mysql_connector/connection/
query.rs1use {
2 super::{
3 types::{Column, TextProtocol},
4 Command, Connection, ParseBuf, ResultSet, BUFFER_POOL,
5 },
6 crate::{
7 model::FromQueryResult,
8 packets::{ColumnDef, OkPacket},
9 types::Value,
10 Deserialize, Error,
11 },
12};
13
14impl Connection {
15 pub async fn query<R: FromQueryResult>(
16 &mut self,
17 query: &str,
18 ) -> Result<ResultSet<'_, TextProtocol, R>, Error> {
19 self.execute_command(Command::Query, query).await?;
20 ResultSet::read(self).await
21 }
22
23 pub async fn execute_query(&mut self, query: &str) -> Result<OkPacket, Error> {
24 self.execute_command(Command::Query, query).await?;
25 self.read_response().await?.map_err(Into::into)
26 }
27
28 pub(super) async fn execute_command<D>(&mut self, cmd: Command, data: D) -> Result<(), Error>
29 where
30 D: AsRef<[u8]>,
31 {
32 let mut buf = BUFFER_POOL.get();
33 let body: &mut Vec<u8> = buf.as_mut();
34 body.push(cmd as u8);
35 body.extend_from_slice(data.as_ref());
36 self.cleanup().await?;
37 self.seq_id = 0;
38 self.write_packet(&buf).await
39 }
40
41 pub(super) async fn read_column_defs(&mut self, count: usize) -> Result<Vec<Column>, Error> {
42 let mut columns: Vec<Column> = Vec::with_capacity(count);
43 for _ in 0..count {
44 let packet = self.read_packet().await?;
45 let def = ColumnDef::deserialize(&mut ParseBuf(&packet), ())?;
46 columns.push(def.try_into()?);
47 }
48 Ok(columns)
49 }
50
51 pub(super) async fn read_settings(&mut self) -> Result<(), Error> {
52 if self.options.max_allowed_packet().is_none() {
53 let mut res = self
54 .query::<Vec<Value>>("select @@max_allowed_packet")
55 .await?;
56 let row = res.next().await?;
57 let columns = res.into_columns();
58
59 if let Some(mut row) = row {
60 if let Some(i) = columns
61 .iter()
62 .position(|x| x.name() == "@@max_allowed_packet")
63 {
64 self.data.max_allowed_packet =
65 <Value as TryInto<u64>>::try_into(row[i].take())? as usize;
66 }
67 }
68 }
69 Ok(())
70 }
71}