cdbc_mysql/connection/
executor.rs

1use super::MySqlStream;
2use cdbc::describe::Describe;
3use cdbc::error::Error;
4use cdbc::executor::{Execute, Executor};
5use crate::connection::stream::Waiting;
6use crate::io::MySqlBufExt;
7use crate::protocol::response::Status;
8use crate::protocol::statement::{
9    BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
10};
11use crate::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
12use crate::statement::{MySqlStatement, MySqlStatementMetadata};
13use crate::{
14    MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
15    MySqlValueFormat,
16};
17use cdbc::{chan_stream, HashMap};
18use either::Either;
19use std::{borrow::Cow, sync::Arc};
20use cdbc::database::{Database, HasStatement};
21use cdbc::io::chan_stream::{ChanStream, Stream, TryStream};
22use cdbc::utils::ustr::UStr;
23
24impl MySqlConnection {
25    fn get_or_prepare<'c>(
26        &mut self,
27        sql: &str,
28        persistent: bool,
29    ) -> Result<(u32, MySqlStatementMetadata), Error> {
30        if let Some(statement) = self.cache_statement.get_mut(sql) {
31            // <MySqlStatementMetadata> is internally reference-counted
32            return Ok((*statement).clone());
33        }
34
35        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
36        // https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK
37
38        self.stream.send_packet(Prepare { query: sql })?;
39
40        let ok: PrepareOk = self.stream.recv()?;
41
42        // the parameter definitions are very unreliable so we skip over them
43        // as we have little use
44
45        if ok.params > 0 {
46            for _ in 0..ok.params {
47                let _def: ColumnDefinition = self.stream.recv()?;
48            }
49
50            self.stream.maybe_recv_eof()?;
51        }
52
53        // the column definitions are berefit the type information from the
54        // to-be-bound parameters; we will receive the output column definitions
55        // once more on execute so we wait for that
56
57        let mut columns = Vec::new();
58
59        let column_names = if ok.columns > 0 {
60            recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns)?
61        } else {
62            Default::default()
63        };
64
65        let id = ok.statement_id;
66        let metadata = MySqlStatementMetadata {
67            parameters: ok.params as usize,
68            columns: Arc::new(columns),
69            column_names: Arc::new(column_names),
70        };
71
72        if persistent && self.cache_statement.is_enabled() {
73            // in case of the cache being full, close the least recently used statement
74            if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
75                self.stream.send_packet(StmtClose { statement: id })?;
76            }
77        }
78
79        Ok((id, metadata))
80    }
81
82    #[allow(clippy::needless_lifetimes)]
83    fn run<'e, 'c: 'e, 'q: 'e>(
84        &'c mut self,
85        sql: &'q str,
86        arguments: Option<MySqlArguments>,
87        persistent: bool,
88    ) -> Result<ChanStream<Either<MySqlQueryResult, MySqlRow>>, Error>
89    {
90        self.stream.wait_until_ready()?;
91        self.stream.waiting.push_back(Waiting::Result);
92
93        Ok(chan_stream!({
94            // make a slot for the shared column data
95            // as long as a reference to a row is not held past one iteration, this enables us
96            // to re-use this memory freely between result sets
97            let mut columns = Arc::new(Vec::new());
98
99            let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
100                let (id, metadata) = self.get_or_prepare(
101                    sql,
102                    persistent,
103                )
104                ?;
105
106                // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
107                self.stream
108                    .send_packet(StatementExecute {
109                        statement: id,
110                        arguments: &arguments,
111                    })
112                    ?;
113
114                (metadata.column_names, MySqlValueFormat::Binary, false)
115            } else {
116                // https://dev.mysql.com/doc/internals/en/com-query.html
117                self.stream.send_packet(Query(sql))?;
118
119                (Arc::default(), MySqlValueFormat::Text, true)
120            };
121
122            loop {
123                // query response is a meta-packet which may be one of:
124                //  Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
125                let mut packet = self.stream.recv_packet()?;
126
127                if packet[0] == 0x00 || packet[0] == 0xff {
128                    // first packet in a query response is OK or ERR
129                    // this indicates either a successful query with no rows at all or a failed query
130                    let ok = packet.ok()?;
131
132                    let done = MySqlQueryResult {
133                        rows_affected: ok.affected_rows,
134                        last_insert_id: ok.last_insert_id,
135                    };
136
137                    r#yield!(Either::Left(done));
138
139                    if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
140                        // more result sets exist, continue to the next one
141                        continue;
142                    }
143
144                    self.stream.waiting.pop_front();
145                    return Ok(());
146                }
147
148                // otherwise, this first packet is the start of the result-set metadata,
149                *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
150
151                let num_columns = packet.get_uint_lenenc() as usize; // column count
152
153                if needs_metadata {
154                    column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns))?);
155                } else {
156                    // next time we hit here, it'll be a new result set and we'll need the
157                    // full metadata
158                    needs_metadata = true;
159
160                    recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns))?;
161                }
162
163                // finally, there will be none or many result-rows
164                loop {
165                    let packet = self.stream.recv_packet()?;
166
167                    if packet[0] == 0xfe && packet.len() < 9 {
168                        let eof = packet.eof(self.stream.capabilities)?;
169
170                        r#yield!(Either::Left(MySqlQueryResult {
171                            rows_affected: 0,
172                            last_insert_id: 0,
173                        }));
174
175                        if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
176                            // more result sets exist, continue to the next one
177                            *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
178                            break;
179                        }
180
181                        self.stream.waiting.pop_front();
182                        return Ok(());
183                    }
184
185                    let row = match format {
186                        MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
187                        MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
188                    };
189
190                    let v = Either::Right(MySqlRow {
191                        row,
192                        format,
193                        columns: Arc::clone(&columns),
194                        column_names: Arc::clone(&column_names),
195                    });
196                    r#yield!(v);
197                }
198            }
199        }))
200    }
201}
202
203impl Executor for MySqlConnection {
204    type Database = MySql;
205
206    fn fetch_many<'q, E: 'q>(
207        &mut self,
208        mut query: E,
209    ) -> ChanStream<Either<MySqlQueryResult, MySqlRow>>
210    where E: Execute<'q, Self::Database>,
211    {
212        let arguments = query.take_arguments();
213        let persistent = query.persistent();
214
215        chan_stream! {
216            let mut s = self.run(query.sql(), arguments, persistent)?;
217
218            while let Some(v) = s.try_next()? {
219                r#yield!(v);
220            }
221
222            Ok(())
223        }
224    }
225
226    fn fetch_optional<'q, E: 'q>(
227        &mut self,
228        query: E,
229    ) -> Result<Option<MySqlRow>, Error>
230    where E: Execute<'q, Self::Database>,
231    {
232        let mut s = self.fetch_many(query);
233            while let Some(v) = s.try_next()? {
234                if let Either::Right(r) = v {
235                    return Ok(Some(r));
236                }
237            }
238            Ok(None)
239    }
240
241    fn prepare_with<'q>(
242        &mut self,
243        sql: &'q str,
244        _parameters: &'q [MySqlTypeInfo],
245    ) -> Result<MySqlStatement, Error>
246    {
247            self.stream.wait_until_ready()?;
248            let (_, metadata) = self.get_or_prepare(sql, true)?;
249            Ok(MySqlStatement {
250                sql: sql.to_string(),
251                // metadata has internal Arcs for expensive data structures
252                metadata: metadata.clone(),
253            })
254    }
255
256    #[doc(hidden)]
257    fn describe<'q>( &mut self, sql: &'q str) ->Result<Describe<MySql>, Error> {
258            self.stream.wait_until_ready()?;
259            let (_, metadata) = self.get_or_prepare(sql, false)?;
260            let columns = (&*metadata.columns).clone();
261            let nullable = columns
262                .iter()
263                .map(|col| {
264                    col.flags
265                        .map(|flags| !flags.contains(ColumnFlags::NOT_NULL))
266                })
267                .collect();
268            Ok(Describe {
269                parameters: Some(Either::Right(metadata.parameters)),
270                columns,
271                nullable,
272            })
273    }
274}
275
276fn recv_result_columns(
277    stream: &mut MySqlStream,
278    num_columns: usize,
279    columns: &mut Vec<MySqlColumn>,
280) -> Result<(), Error> {
281    columns.clear();
282    columns.reserve(num_columns);
283
284    for ordinal in 0..num_columns {
285        columns.push(recv_next_result_column(&stream.recv()?, ordinal)?);
286    }
287
288    if num_columns > 0 {
289        stream.maybe_recv_eof()?;
290    }
291
292    Ok(())
293}
294
295fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
296    // if the alias is empty, use the alias
297    // only then use the name
298    let name = match (def.name()?, def.alias()?) {
299        (_, alias) if !alias.is_empty() => UStr::new(alias),
300        (name, _) => UStr::new(name),
301    };
302
303    let type_info = MySqlTypeInfo::from_column(&def);
304
305    Ok(MySqlColumn {
306        name,
307        type_info,
308        ordinal,
309        flags: Some(def.flags),
310    })
311}
312
313fn recv_result_metadata(
314    stream: &mut MySqlStream,
315    num_columns: usize,
316    columns: &mut Vec<MySqlColumn>,
317) -> Result<HashMap<UStr, usize>, Error> {
318    // the result-set metadata is primarily a listing of each output
319    // column in the result-set
320
321    let mut column_names = HashMap::with_capacity(num_columns);
322
323    columns.clear();
324    columns.reserve(num_columns);
325
326    for ordinal in 0..num_columns {
327        let def: ColumnDefinition = stream.recv()?;
328
329        let column = recv_next_result_column(&def, ordinal)?;
330
331        column_names.insert(column.name.clone(), ordinal);
332        columns.push(column);
333    }
334
335    stream.maybe_recv_eof()?;
336
337    Ok(column_names)
338}
339
340
341
342impl Executor for &mut MySqlConnection{
343    type Database = MySql;
344
345    fn fetch_many<'q, E: 'q>(&mut self, query: E) -> ChanStream<Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>> where E: Execute<'q, Self::Database> {
346        MySqlConnection::fetch_many(self,query)
347    }
348
349    fn fetch_optional<'q, E: 'q>(&mut self, query: E) -> Result<Option<<Self::Database as Database>::Row>, Error> where E: Execute<'q, Self::Database> {
350        MySqlConnection::fetch_optional(self,query)
351    }
352
353    fn prepare_with<'q>(&mut self, sql: &'q str, parameters: &'q [<Self::Database as Database>::TypeInfo]) -> Result<<Self::Database as HasStatement>::Statement, Error> {
354        MySqlConnection::prepare_with(self,sql,parameters)
355    }
356
357    fn describe(&mut self, sql: &str) -> Result<Describe<Self::Database>, Error> {
358        MySqlConnection::describe(self,sql)
359    }
360}