rbdc_mysql/connection/
executor.rs

1use super::MySqlStream;
2use crate::connection::stream::Waiting;
3use crate::connection::MySqlConnection;
4use crate::io::MySqlBufExt;
5use crate::protocol::response::Status;
6use crate::protocol::statement::{
7    BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
8};
9use crate::protocol::text::{ColumnDefinition, Query, TextRow};
10use crate::query::MysqlQuery;
11use crate::query_result::MySqlQueryResult;
12use crate::result_set::{MySqlColumn, MySqlTypeInfo};
13use crate::row::MySqlRow;
14use crate::stmt::{MySqlArguments, MySqlStatement, MySqlStatementMetadata};
15use crate::value::MySqlValueFormat;
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_core::Stream;
20use futures_util::{pin_mut, TryStreamExt};
21use rbdc::ext::ustr::UStr;
22use rbdc::{try_stream, Error};
23use std::collections::HashMap;
24use std::sync::Arc;
25
26impl MySqlConnection {
27    async fn get_or_prepare<'c>(
28        &mut self,
29        sql: &str,
30        persistent: bool,
31    ) -> Result<(u32, MySqlStatementMetadata), Error> {
32        if let Some(statement) = self.cache_statement.get_mut(sql) {
33            // <MySqlStatementMetadata> is internally reference-counted
34            return Ok((*statement).clone());
35        }
36
37        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
38        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK
39
40        self.stream.send_packet(Prepare { query: sql }).await?;
41
42        let ok: PrepareOk = self.stream.recv().await?;
43
44        // the parameter definitions are very unreliable so we skip over them
45        // as we have little use
46
47        if ok.params > 0 {
48            for _ in 0..ok.params {
49                let _def: ColumnDefinition = self.stream.recv().await?;
50            }
51
52            self.stream.maybe_recv_eof().await?;
53        }
54
55        // the column definitions are berefit the type information from the
56        // to-be-bound parameters; we will receive the output column definitions
57        // once more on execute so we wait for that
58
59        let mut columns = Vec::with_capacity(ok.columns as usize);
60
61        let column_names = if ok.columns > 0 {
62            recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
63        } else {
64            Default::default()
65        };
66
67        let id = ok.statement_id;
68        let metadata = MySqlStatementMetadata {
69            parameters: ok.params as usize,
70            columns: Arc::new(columns),
71            column_names: Arc::new(column_names),
72        };
73
74        if persistent && self.cache_statement.is_enabled() {
75            // in case of the cache being full, close the least recently used statement
76            if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
77                self.stream.send_packet(StmtClose { statement: id }).await?;
78            }
79        }
80
81        Ok((id, metadata))
82    }
83
84    #[allow(clippy::needless_lifetimes)]
85    async fn run<'e, 'q: 'e>(
86        &'e mut self,
87        sql: &'q str,
88        arguments: Option<MySqlArguments>,
89        persistent: bool,
90    ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
91    {
92        self.stream.wait_until_ready().await?;
93        self.stream.waiting.push_back(Waiting::Result);
94
95        Ok(Box::pin(try_stream! {
96            let option = self.option.clone();
97
98            // make a slot for the shared column data
99            // as long as a reference to a row is not held past one iteration, this enables us
100            // to re-use this memory freely between result sets
101            let mut columns = Arc::new(Vec::with_capacity({
102                if let Some(arguments) = &arguments {
103                     arguments.len()
104                }else{
105                     0
106                }
107            }));
108
109            let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
110                let (id, metadata) = self.get_or_prepare(
111                    sql,
112                    persistent,
113                )
114                .await?;
115
116                // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
117                self.stream
118                    .send_packet(StatementExecute {
119                        statement_id: id,
120                        arguments: &arguments,
121                    })
122                    .await?;
123
124                (metadata.column_names, MySqlValueFormat::Binary, false)
125            } else {
126                // https://dev.mysql.com/doc/internals/en/com-query.html
127                self.stream.send_packet(Query(sql)).await?;
128
129                (Arc::default(), MySqlValueFormat::Text, true)
130            };
131
132            loop {
133                // query response is a meta-packet which may be one of:
134                //  Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
135                let mut packet = self.stream.recv_packet().await?;
136
137                if packet[0] == 0x00 || packet[0] == 0xff {
138                    // first packet in a query response is OK or ERR
139                    // this indicates either a successful query with no rows at all or a failed query
140                    let ok = packet.ok()?;
141
142                    let rows_affected = ok.affected_rows;
143                    let done = MySqlQueryResult {
144                        rows_affected,
145                        last_insert_id: ok.last_insert_id,
146                    };
147
148                    r#yield!(Either::Left(done));
149
150                    if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
151                        // more result sets exist, continue to the next one
152                        continue;
153                    }
154
155                    self.stream.waiting.pop_front();
156                    return Ok(());
157                }
158
159                // otherwise, this first packet is the start of the result-set metadata,
160                *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
161
162                let num_columns = packet.get_uint_lenenc() as usize; // column count
163
164                if needs_metadata {
165                    column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
166                } else {
167                    // next time we hit here, it'll be a new result set and we'll need the
168                    // full metadata
169                    needs_metadata = true;
170
171                    recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
172                }
173
174                // finally, there will be none or many result-rows
175                loop {
176                    let packet = self.stream.recv_packet().await?;
177
178                    if packet[0] == 0xfe && packet.len() < 9 {
179                        let eof = packet.eof(self.stream.capabilities)?;
180
181                        r#yield!(Either::Left(MySqlQueryResult {
182                            rows_affected: 0,
183                            last_insert_id: 0,
184                        }));
185
186                        if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
187                            // more result sets exist, continue to the next one
188                            *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
189                            break;
190                        }
191
192                        self.stream.waiting.pop_front();
193                        return Ok(());
194                    }
195
196                    let row = match format {
197                        MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
198                        MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
199                    };
200
201                    let v = Either::Right(MySqlRow {
202                        row,
203                        format,
204                        columns: Arc::clone(&columns),
205                        column_names: Arc::clone(&column_names),
206                        option: option.clone(),
207                    });
208
209                    r#yield!(v);
210                }
211            }
212        }))
213    }
214}
215
216impl MySqlConnection {
217    pub async fn execute(&mut self, sql: &str) -> Result<Option<MySqlRow>, Error> {
218        self.fetch_optional(MysqlQuery {
219            statement: Either::Left(sql.to_string()),
220            arguments: vec![],
221            persistent: false,
222        })
223        .await
224    }
225
226    pub fn fetch_many(
227        &mut self,
228        query: MysqlQuery,
229    ) -> BoxStream<'_, Result<Either<MySqlQueryResult, MySqlRow>, Error>> {
230        let sql = query.sql().to_owned();
231        let persistent = query.persistent();
232        Box::pin(try_stream! {
233            let arguments = query.take_arguments()?;
234            let s = self.run(&sql, arguments, persistent).await?;
235            pin_mut!(s);
236
237            while let Some(v) = s.try_next().await? {
238                r#yield!(v);
239            }
240
241            Ok(())
242        })
243    }
244
245    pub fn fetch_optional(
246        &mut self,
247        query: MysqlQuery,
248    ) -> BoxFuture<'_, Result<Option<MySqlRow>, Error>> {
249        let mut s = self.fetch_many(query);
250        Box::pin(async move {
251            while let Some(v) = s.try_next().await? {
252                if let Either::Right(r) = v {
253                    return Ok(Some(r));
254                }
255            }
256
257            Ok(None)
258        })
259    }
260
261    pub fn prepare_with<'e>(
262        &'e mut self,
263        sql: &'e str,
264        _: &'e [MySqlTypeInfo],
265    ) -> BoxFuture<'e, Result<MySqlStatement, Error>> {
266        let sql = sql.to_string();
267        Box::pin(async move {
268            self.stream.wait_until_ready().await?;
269
270            let (_, metadata) = self.get_or_prepare(&sql, true).await?;
271
272            Ok(MySqlStatement {
273                sql: sql,
274                // metadata has internal Arcs for expensive data structures
275                metadata: metadata.clone(),
276            })
277        })
278    }
279
280    // #[doc(hidden)]
281    // pub fn describe<'e, 'q: 'e>(mut self, sql: &'q str) -> BoxFuture<'e, Result<Describe, Error>> {
282    //     Box::pin(async move {
283    //         self.stream.wait_until_ready().await?;
284    //
285    //         let (_, metadata) = self.get_or_prepare(sql, false).await?;
286    //
287    //         let columns = (&*metadata.columns).clone();
288    //
289    //         let nullable = columns
290    //             .iter()
291    //             // .map(|col| {
292    //             //     col.flags
293    //             //         .map(|flags| !flags.contains(ColumnFlags::NOT_NULL))
294    //             // })
295    //             .collect();
296    //
297    //         Ok(Describe {
298    //             parameters: Some(Either::Right(metadata.parameters)),
299    //             columns,
300    //             nullable,
301    //         })
302    //     })
303    // }
304}
305
306async fn recv_result_columns(
307    stream: &mut MySqlStream,
308    num_columns: usize,
309    columns: &mut Vec<MySqlColumn>,
310) -> Result<(), Error> {
311    columns.clear();
312    columns.reserve(num_columns);
313
314    for ordinal in 0..num_columns {
315        columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
316    }
317
318    if num_columns > 0 {
319        stream.maybe_recv_eof().await?;
320    }
321
322    Ok(())
323}
324
325fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
326    // if the alias is empty, use the alias
327    // only then use the name
328    let name = match (def.name()?, def.alias()?) {
329        (_, alias) if !alias.is_empty() => UStr::new(alias),
330        (name, _) => UStr::new(name),
331    };
332
333    let type_info = MySqlTypeInfo::from_column(&def);
334
335    Ok(MySqlColumn {
336        name,
337        type_info,
338        ordinal,
339        // flags: Some(def.flags),
340    })
341}
342
343async fn recv_result_metadata(
344    stream: &mut MySqlStream,
345    num_columns: usize,
346    columns: &mut Vec<MySqlColumn>,
347) -> Result<HashMap<UStr, (usize, MySqlTypeInfo)>, Error> {
348    // the result-set metadata is primarily a listing of each output
349    // column in the result-set
350
351    let mut column_names = HashMap::with_capacity(num_columns);
352
353    columns.clear();
354    columns.reserve(num_columns);
355
356    for ordinal in 0..num_columns {
357        let def: ColumnDefinition = stream.recv().await?;
358
359        let column = recv_next_result_column(&def, ordinal)?;
360
361        column_names.insert(column.name.clone(), (ordinal, column.type_info.clone()));
362        columns.push(column);
363    }
364
365    stream.maybe_recv_eof().await?;
366
367    Ok(column_names)
368}