sqlx_core_oldapi/mysql/connection/
executor.rs

1use super::MySqlStream;
2use crate::describe::Describe;
3use crate::error::Error;
4use crate::executor::{Execute, Executor};
5use crate::ext::ustr::UStr;
6use crate::logger::QueryLogger;
7use crate::mysql::connection::stream::Waiting;
8use crate::mysql::io::MySqlBufExt;
9use crate::mysql::protocol::response::Status;
10use crate::mysql::protocol::statement::{
11    BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
12};
13use crate::mysql::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
14use crate::mysql::statement::{MySqlStatement, MySqlStatementMetadata};
15use crate::mysql::{
16    MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
17    MySqlValueFormat,
18};
19use crate::HashMap;
20use either::Either;
21use futures_core::future::BoxFuture;
22use futures_core::stream::BoxStream;
23use futures_core::Stream;
24use futures_util::{pin_mut, TryStreamExt};
25use std::{borrow::Cow, sync::Arc};
26
27impl MySqlConnection {
28    async fn get_or_prepare<'c>(
29        &mut self,
30        sql: &str,
31        persistent: bool,
32    ) -> Result<(u32, MySqlStatementMetadata), Error> {
33        if let Some(statement) = self.cache_statement.get_mut(sql) {
34            // <MySqlStatementMetadata> is internally reference-counted
35            return Ok((*statement).clone());
36        }
37
38        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
39        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK
40
41        self.stream.send_packet(Prepare { query: sql }).await?;
42
43        let ok: PrepareOk = self.stream.recv().await?;
44
45        // the parameter definitions are very unreliable so we skip over them
46        // as we have little use
47
48        if ok.params > 0 {
49            for _ in 0..ok.params {
50                let _def: ColumnDefinition = self.stream.recv().await?;
51            }
52
53            self.stream.maybe_recv_eof().await?;
54        }
55
56        // the column definitions are berefit the type information from the
57        // to-be-bound parameters; we will receive the output column definitions
58        // once more on execute so we wait for that
59
60        let mut columns = Vec::new();
61
62        let column_names = if ok.columns > 0 {
63            recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
64        } else {
65            Default::default()
66        };
67
68        let id = ok.statement_id;
69        let metadata = MySqlStatementMetadata {
70            parameters: ok.params as usize,
71            columns: Arc::new(columns),
72            column_names: Arc::new(column_names),
73        };
74
75        if persistent && self.cache_statement.is_enabled() {
76            // in case of the cache being full, close the least recently used statement
77            if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
78                self.stream.send_packet(StmtClose { statement: id }).await?;
79            }
80        }
81
82        Ok((id, metadata))
83    }
84
85    #[allow(clippy::needless_lifetimes)]
86    async fn run<'e, 'c: 'e, 'q: 'e>(
87        &'c mut self,
88        sql: &'q str,
89        arguments: Option<MySqlArguments>,
90        persistent: bool,
91    ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
92    {
93        let mut logger = QueryLogger::new(sql, self.log_settings.clone());
94
95        self.stream.wait_until_ready().await?;
96        self.stream.waiting.push_back(Waiting::Result);
97
98        Ok(Box::pin(try_stream! {
99            // make a slot for the shared column data
100            // as long as a reference to a row is not held past one iteration, this enables us
101            // to re-use this memory freely between result sets
102            let mut columns = Arc::new(Vec::new());
103
104            let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
105                let (id, metadata) = self.get_or_prepare(
106                    sql,
107                    persistent,
108                )
109                .await?;
110
111                // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
112                self.stream
113                    .send_packet(StatementExecute {
114                        statement: id,
115                        arguments: &arguments,
116                    })
117                    .await?;
118
119                (metadata.column_names, MySqlValueFormat::Binary, false)
120            } else {
121                // https://dev.mysql.com/doc/internals/en/com-query.html
122                self.stream.send_packet(Query(sql)).await?;
123
124                (Arc::default(), MySqlValueFormat::Text, true)
125            };
126
127            loop {
128                // query response is a meta-packet which may be one of:
129                //  Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
130                let mut packet = self.stream.recv_packet().await?;
131
132                if packet[0] == 0x00 || packet[0] == 0xff {
133                    // first packet in a query response is OK or ERR
134                    // this indicates either a successful query with no rows at all or a failed query
135                    let ok = packet.ok()?;
136
137                    let rows_affected = ok.affected_rows;
138                    logger.increase_rows_affected(rows_affected);
139                    let done = MySqlQueryResult {
140                        rows_affected,
141                        last_insert_id: ok.last_insert_id,
142                    };
143
144                    r#yield!(Either::Left(done));
145
146                    if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
147                        // more result sets exist, continue to the next one
148                        continue;
149                    }
150
151                    self.stream.waiting.pop_front();
152                    return Ok(());
153                }
154
155                // otherwise, this first packet is the start of the result-set metadata,
156                *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
157
158                let num_columns = packet.get_uint_lenenc() as usize; // column count
159
160                if needs_metadata {
161                    column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
162                } else {
163                    // next time we hit here, it'll be a new result set and we'll need the
164                    // full metadata
165                    needs_metadata = true;
166
167                    recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
168                }
169
170                // finally, there will be none or many result-rows
171                loop {
172                    let packet = self.stream.recv_packet().await?;
173
174                    if packet[0] == 0xfe && packet.len() < 9 {
175                        let eof = packet.eof(self.stream.capabilities)?;
176
177                        r#yield!(Either::Left(MySqlQueryResult {
178                            rows_affected: 0,
179                            last_insert_id: 0,
180                        }));
181
182                        if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
183                            // more result sets exist, continue to the next one
184                            *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
185                            break;
186                        }
187
188                        self.stream.waiting.pop_front();
189                        return Ok(());
190                    }
191
192                    let row = match format {
193                        MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
194                        MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
195                    };
196
197                    let v = Either::Right(MySqlRow {
198                        row,
199                        format,
200                        columns: Arc::clone(&columns),
201                        column_names: Arc::clone(&column_names),
202                    });
203
204                    logger.increment_rows_returned();
205
206                    r#yield!(v);
207                }
208            }
209        }))
210    }
211}
212
213impl<'c> Executor<'c> for &'c mut MySqlConnection {
214    type Database = MySql;
215
216    fn fetch_many<'e, 'q: 'e, E: 'q>(
217        self,
218        mut query: E,
219    ) -> BoxStream<'e, Result<Either<MySqlQueryResult, MySqlRow>, Error>>
220    where
221        'c: 'e,
222        E: Execute<'q, Self::Database>,
223    {
224        let sql = query.sql();
225        let arguments = query.take_arguments();
226        let persistent = query.persistent();
227
228        Box::pin(try_stream! {
229            let s = self.run(sql, arguments, persistent).await?;
230            pin_mut!(s);
231
232            while let Some(v) = s.try_next().await? {
233                r#yield!(v);
234            }
235
236            Ok(())
237        })
238    }
239
240    fn fetch_optional<'e, 'q: 'e, E: 'q>(
241        self,
242        query: E,
243    ) -> BoxFuture<'e, Result<Option<MySqlRow>, Error>>
244    where
245        'c: 'e,
246        E: Execute<'q, Self::Database>,
247    {
248        let mut s = self.fetch_many(query);
249
250        Box::pin(async move {
251            while let Some(v) = s.try_next().await? {
252                if let Either::Right(r) = v {
253                    return Ok(Some(r));
254                }
255            }
256
257            Ok(None)
258        })
259    }
260
261    fn prepare_with<'e, 'q: 'e>(
262        self,
263        sql: &'q str,
264        _parameters: &'e [MySqlTypeInfo],
265    ) -> BoxFuture<'e, Result<MySqlStatement<'q>, Error>>
266    where
267        'c: 'e,
268    {
269        Box::pin(async move {
270            self.stream.wait_until_ready().await?;
271
272            let (_, metadata) = self.get_or_prepare(sql, true).await?;
273
274            Ok(MySqlStatement {
275                sql: Cow::Borrowed(sql),
276                // metadata has internal Arcs for expensive data structures
277                metadata: metadata.clone(),
278            })
279        })
280    }
281
282    #[doc(hidden)]
283    fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<MySql>, Error>>
284    where
285        'c: 'e,
286    {
287        Box::pin(async move {
288            self.stream.wait_until_ready().await?;
289
290            let (_, metadata) = self.get_or_prepare(sql, false).await?;
291
292            let columns = (&*metadata.columns).clone();
293
294            let nullable = columns
295                .iter()
296                .map(|col| {
297                    col.flags
298                        .map(|flags| !flags.contains(ColumnFlags::NOT_NULL))
299                })
300                .collect();
301
302            Ok(Describe {
303                parameters: Some(Either::Right(metadata.parameters)),
304                columns,
305                nullable,
306            })
307        })
308    }
309}
310
311async fn recv_result_columns(
312    stream: &mut MySqlStream,
313    num_columns: usize,
314    columns: &mut Vec<MySqlColumn>,
315) -> Result<(), Error> {
316    columns.clear();
317    columns.reserve(num_columns);
318
319    for ordinal in 0..num_columns {
320        columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
321    }
322
323    if num_columns > 0 {
324        stream.maybe_recv_eof().await?;
325    }
326
327    Ok(())
328}
329
330fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
331    // if the alias is empty, use the alias
332    // only then use the name
333    let name = match (def.name()?, def.alias()?) {
334        (_, alias) if !alias.is_empty() => UStr::new(alias),
335        (name, _) => UStr::new(name),
336    };
337
338    let type_info = MySqlTypeInfo::from_column(&def);
339
340    Ok(MySqlColumn {
341        name,
342        type_info,
343        ordinal,
344        flags: Some(def.flags),
345    })
346}
347
348async fn recv_result_metadata(
349    stream: &mut MySqlStream,
350    num_columns: usize,
351    columns: &mut Vec<MySqlColumn>,
352) -> Result<HashMap<UStr, usize>, Error> {
353    // the result-set metadata is primarily a listing of each output
354    // column in the result-set
355
356    let mut column_names = HashMap::with_capacity(num_columns);
357
358    columns.clear();
359    columns.reserve(num_columns);
360
361    for ordinal in 0..num_columns {
362        let def: ColumnDefinition = stream.recv().await?;
363
364        let column = recv_next_result_column(&def, ordinal)?;
365
366        column_names.insert(column.name.clone(), ordinal);
367        columns.push(column);
368    }
369
370    stream.maybe_recv_eof().await?;
371
372    Ok(column_names)
373}