Skip to main content

sqlx_mysql/connection/
executor.rs

1use super::MySqlStream;
2use crate::connection::stream::Waiting;
3use crate::error::Error;
4use crate::executor::{Execute, Executor};
5use crate::ext::ustr::UStr;
6use crate::io::MySqlBufExt;
7use crate::logger::QueryLogger;
8use crate::protocol::response::Status;
9use crate::protocol::statement::{
10    BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
11};
12use crate::protocol::text::{ColumnDefinition, Query, TextRow};
13use crate::statement::{MySqlStatement, MySqlStatementMetadata};
14use crate::HashMap;
15use crate::{
16    MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
17    MySqlValueFormat,
18};
19use either::Either;
20use futures_core::future::BoxFuture;
21use futures_core::stream::BoxStream;
22use futures_core::Stream;
23use futures_util::TryStreamExt;
24use sqlx_core::column::{ColumnOrigin, TableColumn};
25use sqlx_core::sql_str::SqlStr;
26use std::{pin::pin, sync::Arc};
27
28impl MySqlConnection {
29    async fn prepare_statement(
30        &mut self,
31        sql: &str,
32    ) -> Result<(u32, MySqlStatementMetadata), Error> {
33        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
34        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK
35
36        self.inner
37            .stream
38            .send_packet(Prepare { query: sql })
39            .await?;
40
41        let ok: PrepareOk = self.inner.stream.recv().await?;
42
43        // the parameter definitions are very unreliable so we skip over them
44        // as we have little use
45
46        if ok.params > 0 {
47            for _ in 0..ok.params {
48                let _def: ColumnDefinition = self.inner.stream.recv().await?;
49            }
50
51            self.inner.stream.maybe_recv_eof().await?;
52        }
53
54        // the column definitions are berefit the type information from the
55        // to-be-bound parameters; we will receive the output column definitions
56        // once more on execute so we wait for that
57
58        let mut columns = Vec::new();
59
60        let column_names = if ok.columns > 0 {
61            recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await?
62        } else {
63            Default::default()
64        };
65
66        let id = ok.statement_id;
67        let metadata = MySqlStatementMetadata {
68            parameters: ok.params as usize,
69            columns: Arc::new(columns),
70            column_names: Arc::new(column_names),
71        };
72
73        Ok((id, metadata))
74    }
75
76    async fn get_or_prepare_statement(
77        &mut self,
78        sql: &str,
79    ) -> Result<(u32, MySqlStatementMetadata), Error> {
80        if let Some(statement) = self.inner.cache_statement.get_mut(sql) {
81            // <MySqlStatementMetadata> is internally reference-counted
82            return Ok((*statement).clone());
83        }
84
85        let (id, metadata) = self.prepare_statement(sql).await?;
86
87        // in case of the cache being full, close the least recently used statement
88        if let Some((id, _)) = self
89            .inner
90            .cache_statement
91            .insert(sql, (id, metadata.clone()))
92        {
93            self.inner
94                .stream
95                .send_packet(StmtClose { statement: id })
96                .await?;
97        }
98
99        Ok((id, metadata))
100    }
101
102    #[allow(clippy::needless_lifetimes)]
103    pub(crate) async fn run<'e, 'c: 'e, 'q: 'e>(
104        &'c mut self,
105        sql: SqlStr,
106        arguments: Option<MySqlArguments>,
107        persistent: bool,
108    ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
109    {
110        let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
111
112        self.inner.stream.wait_until_ready().await?;
113        self.inner.stream.waiting.push_back(Waiting::Result);
114
115        Ok(try_stream! {
116        let sql = logger.sql().as_str();
117
118            // make a slot for the shared column data
119            // as long as a reference to a row is not held past one iteration, this enables us
120            // to re-use this memory freely between result sets
121            let mut columns = Arc::new(Vec::new());
122
123            let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
124                if persistent && self.inner.cache_statement.is_enabled() {
125                    let (id, metadata) = self
126                        .get_or_prepare_statement(sql)
127                        .await?;
128
129                    if arguments.types.len() != metadata.parameters {
130                        return Err(
131                            err_protocol!(
132                                "prepared statement expected {} parameters but {} parameters were provided",
133                                metadata.parameters,
134                                arguments.types.len()
135                            )
136                        );
137                    }
138
139                    // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
140                    self.inner.stream
141                        .send_packet(StatementExecute {
142                            statement: id,
143                            arguments: &arguments,
144                        })
145                        .await?;
146
147                    (metadata.column_names, MySqlValueFormat::Binary, false)
148                } else {
149                    let (id, metadata) = self
150                        .prepare_statement(sql)
151                        .await?;
152
153                    if arguments.types.len() != metadata.parameters {
154                        return Err(
155                            err_protocol!(
156                                "prepared statement expected {} parameters but {} parameters were provided",
157                                metadata.parameters,
158                                arguments.types.len()
159                            )
160                        );
161                    }
162
163                    // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
164                    self.inner.stream
165                        .send_packet(StatementExecute {
166                            statement: id,
167                            arguments: &arguments,
168                        })
169                        .await?;
170
171                    self.inner.stream.send_packet(StmtClose { statement: id }).await?;
172
173                    (metadata.column_names, MySqlValueFormat::Binary, false)
174                }
175            } else {
176                // https://dev.mysql.com/doc/internals/en/com-query.html
177                self.inner.stream.send_packet(Query(sql)).await?;
178
179                (Arc::default(), MySqlValueFormat::Text, true)
180            };
181
182            loop {
183                // query response is a meta-packet which may be one of:
184                //  Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
185                let mut packet = self.inner.stream.recv_packet().await?;
186
187                if packet[0] == 0x00 || packet[0] == 0xff {
188                    // first packet in a query response is OK or ERR
189                    // this indicates either a successful query with no rows at all or a failed query
190                    let ok = packet.ok()?;
191
192                    self.inner.status_flags = ok.status;
193
194                    let rows_affected = ok.affected_rows;
195                    logger.increase_rows_affected(rows_affected);
196                    let done = MySqlQueryResult {
197                        rows_affected,
198                        last_insert_id: ok.last_insert_id,
199                    };
200
201                    r#yield!(Either::Left(done));
202
203                    if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
204                        // more result sets exist, continue to the next one
205                        continue;
206                    }
207
208                    self.inner.stream.waiting.pop_front();
209                    return Ok(());
210                }
211
212                // otherwise, this first packet is the start of the result-set metadata,
213                *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row;
214
215                let num_columns = packet.get_uint_lenenc(); // column count
216                let num_columns = usize::try_from(num_columns)
217                    .map_err(|_| err_protocol!("column count overflows usize: {num_columns}"))?;
218
219                if needs_metadata {
220                    column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?);
221                } else {
222                    // next time we hit here, it'll be a new result set and we'll need the
223                    // full metadata
224                    needs_metadata = true;
225
226                    recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?;
227                }
228
229                // finally, there will be none or many result-rows
230                loop {
231                    let packet = self.inner.stream.recv_packet().await?;
232
233                    if packet[0] == 0xfe {
234                        let (rows_affected, last_insert_id, status) = if packet.len() < 9 {
235                            // EOF packet
236                            let eof = packet.eof(self.inner.stream.capabilities)?;
237                            (0, 0, eof.status)
238                        } else {
239                            // OK packet
240                            let ok = packet.ok()?;
241                            (ok.affected_rows, ok.last_insert_id, ok.status)
242                        };
243
244                        self.inner.status_flags = status;
245                        r#yield!(Either::Left(MySqlQueryResult {
246                            rows_affected,
247                            last_insert_id,
248                        }));
249
250                        if status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
251                            *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result;
252                            break;
253                        }
254                        self.inner.stream.waiting.pop_front();
255                        return Ok(());
256                    }
257
258                    let row = match format {
259                        MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
260                        MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
261                    };
262
263                    let v = Either::Right(MySqlRow {
264                        row,
265                        format,
266                        columns: Arc::clone(&columns),
267                        column_names: Arc::clone(&column_names),
268                    });
269
270                    logger.increment_rows_returned();
271
272                    r#yield!(v);
273                }
274            }
275        })
276    }
277}
278
279impl<'c> Executor<'c> for &'c mut MySqlConnection {
280    type Database = MySql;
281
282    fn fetch_many<'e, 'q, E>(
283        self,
284        mut query: E,
285    ) -> BoxStream<'e, Result<Either<MySqlQueryResult, MySqlRow>, Error>>
286    where
287        'c: 'e,
288        E: Execute<'q, Self::Database>,
289        'q: 'e,
290        E: 'q,
291    {
292        let arguments = query.take_arguments().map_err(Error::Encode);
293        let persistent = query.persistent();
294
295        Box::pin(try_stream! {
296        let sql = query.sql();
297            let arguments = arguments?;
298            let mut s = pin!(self.run(sql, arguments, persistent).await?);
299
300            while let Some(v) = s.try_next().await? {
301                r#yield!(v);
302            }
303
304            Ok(())
305        })
306    }
307
308    fn fetch_optional<'e, 'q, E>(self, query: E) -> BoxFuture<'e, Result<Option<MySqlRow>, Error>>
309    where
310        'c: 'e,
311        E: Execute<'q, Self::Database>,
312        'q: 'e,
313        E: 'q,
314    {
315        let mut s = self.fetch_many(query);
316
317        Box::pin(async move {
318            while let Some(v) = s.try_next().await? {
319                if let Either::Right(r) = v {
320                    return Ok(Some(r));
321                }
322            }
323
324            Ok(None)
325        })
326    }
327
328    fn prepare_with<'e>(
329        self,
330        sql: SqlStr,
331        _parameters: &'e [MySqlTypeInfo],
332    ) -> BoxFuture<'e, Result<MySqlStatement, Error>>
333    where
334        'c: 'e,
335    {
336        Box::pin(async move {
337            self.inner.stream.wait_until_ready().await?;
338
339            let metadata = if self.inner.cache_statement.is_enabled() {
340                self.get_or_prepare_statement(sql.as_str()).await?.1
341            } else {
342                let (id, metadata) = self.prepare_statement(sql.as_str()).await?;
343
344                self.inner
345                    .stream
346                    .send_packet(StmtClose { statement: id })
347                    .await?;
348
349                metadata
350            };
351
352            Ok(MySqlStatement {
353                sql,
354                // metadata has internal Arcs for expensive data structures
355                metadata: metadata.clone(),
356            })
357        })
358    }
359
360    #[doc(hidden)]
361    #[cfg(feature = "offline")]
362    fn describe<'e>(
363        self,
364        sql: SqlStr,
365    ) -> BoxFuture<'e, Result<crate::describe::Describe<MySql>, Error>>
366    where
367        'c: 'e,
368    {
369        Box::pin(async move {
370            self.inner.stream.wait_until_ready().await?;
371
372            let (id, metadata) = self.prepare_statement(sql.as_str()).await?;
373
374            self.inner
375                .stream
376                .send_packet(StmtClose { statement: id })
377                .await?;
378
379            let columns = (*metadata.columns).clone();
380
381            let nullable = columns
382                .iter()
383                .map(|col| {
384                    col.flags
385                        .map(|flags| !flags.contains(crate::protocol::text::ColumnFlags::NOT_NULL))
386                })
387                .collect();
388
389            Ok(crate::describe::Describe {
390                parameters: Some(Either::Right(metadata.parameters)),
391                columns,
392                nullable,
393            })
394        })
395    }
396}
397
398async fn recv_result_columns(
399    stream: &mut MySqlStream,
400    num_columns: usize,
401    columns: &mut Vec<MySqlColumn>,
402) -> Result<(), Error> {
403    columns.clear();
404    columns.reserve(num_columns);
405
406    for ordinal in 0..num_columns {
407        columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
408    }
409
410    if num_columns > 0 {
411        stream.maybe_recv_eof().await?;
412    }
413
414    Ok(())
415}
416
417fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
418    // if the alias is empty, use the alias
419    // only then use the name
420    let column_name = def.name()?;
421
422    let name = match (def.name()?, def.alias()?) {
423        (_, alias) if !alias.is_empty() => UStr::new(alias),
424        (name, _) => UStr::new(name),
425    };
426
427    let table = def.table()?;
428
429    let origin = if table.is_empty() {
430        ColumnOrigin::Expression
431    } else {
432        let schema = def.schema()?;
433
434        ColumnOrigin::Table(TableColumn {
435            table: if !schema.is_empty() {
436                format!("{schema}.{table}").into()
437            } else {
438                table.into()
439            },
440            name: column_name.into(),
441        })
442    };
443
444    let type_info = MySqlTypeInfo::from_column(def);
445
446    Ok(MySqlColumn {
447        name,
448        type_info,
449        ordinal,
450        flags: Some(def.flags),
451        origin,
452    })
453}
454
455async fn recv_result_metadata(
456    stream: &mut MySqlStream,
457    num_columns: usize,
458    columns: &mut Vec<MySqlColumn>,
459) -> Result<HashMap<UStr, usize>, Error> {
460    // the result-set metadata is primarily a listing of each output
461    // column in the result-set
462
463    let mut column_names = HashMap::with_capacity(num_columns);
464
465    columns.clear();
466    columns.reserve(num_columns);
467
468    for ordinal in 0..num_columns {
469        let def: ColumnDefinition = stream.recv().await?;
470
471        let column = recv_next_result_column(&def, ordinal)?;
472
473        column_names.insert(column.name.clone(), ordinal);
474        columns.push(column);
475    }
476
477    stream.maybe_recv_eof().await?;
478
479    Ok(column_names)
480}