sqlx_core_oldapi/mysql/connection/
executor.rs1use super::MySqlStream;
2use crate::describe::Describe;
3use crate::error::Error;
4use crate::executor::{Execute, Executor};
5use crate::ext::ustr::UStr;
6use crate::logger::QueryLogger;
7use crate::mysql::connection::stream::Waiting;
8use crate::mysql::io::MySqlBufExt;
9use crate::mysql::protocol::response::Status;
10use crate::mysql::protocol::statement::{
11 BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
12};
13use crate::mysql::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
14use crate::mysql::statement::{MySqlStatement, MySqlStatementMetadata};
15use crate::mysql::{
16 MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
17 MySqlValueFormat,
18};
19use crate::HashMap;
20use either::Either;
21use futures_core::future::BoxFuture;
22use futures_core::stream::BoxStream;
23use futures_core::Stream;
24use futures_util::{pin_mut, TryStreamExt};
25use std::{borrow::Cow, sync::Arc};
26
27impl MySqlConnection {
28 async fn get_or_prepare<'c>(
29 &mut self,
30 sql: &str,
31 persistent: bool,
32 ) -> Result<(u32, MySqlStatementMetadata), Error> {
33 if let Some(statement) = self.cache_statement.get_mut(sql) {
34 return Ok((*statement).clone());
36 }
37
38 self.stream.send_packet(Prepare { query: sql }).await?;
42
43 let ok: PrepareOk = self.stream.recv().await?;
44
45 if ok.params > 0 {
49 for _ in 0..ok.params {
50 let _def: ColumnDefinition = self.stream.recv().await?;
51 }
52
53 self.stream.maybe_recv_eof().await?;
54 }
55
56 let mut columns = Vec::new();
61
62 let column_names = if ok.columns > 0 {
63 recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
64 } else {
65 Default::default()
66 };
67
68 let id = ok.statement_id;
69 let metadata = MySqlStatementMetadata {
70 parameters: ok.params as usize,
71 columns: Arc::new(columns),
72 column_names: Arc::new(column_names),
73 };
74
75 if persistent && self.cache_statement.is_enabled() {
76 if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
78 self.stream.send_packet(StmtClose { statement: id }).await?;
79 }
80 }
81
82 Ok((id, metadata))
83 }
84
85 #[allow(clippy::needless_lifetimes)]
86 async fn run<'e, 'c: 'e, 'q: 'e>(
87 &'c mut self,
88 sql: &'q str,
89 arguments: Option<MySqlArguments>,
90 persistent: bool,
91 ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
92 {
93 let mut logger = QueryLogger::new(sql, self.log_settings.clone());
94
95 self.stream.wait_until_ready().await?;
96 self.stream.waiting.push_back(Waiting::Result);
97
98 Ok(Box::pin(try_stream! {
99 let mut columns = Arc::new(Vec::new());
103
104 let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
105 let (id, metadata) = self.get_or_prepare(
106 sql,
107 persistent,
108 )
109 .await?;
110
111 self.stream
113 .send_packet(StatementExecute {
114 statement: id,
115 arguments: &arguments,
116 })
117 .await?;
118
119 (metadata.column_names, MySqlValueFormat::Binary, false)
120 } else {
121 self.stream.send_packet(Query(sql)).await?;
123
124 (Arc::default(), MySqlValueFormat::Text, true)
125 };
126
127 loop {
128 let mut packet = self.stream.recv_packet().await?;
131
132 if packet[0] == 0x00 || packet[0] == 0xff {
133 let ok = packet.ok()?;
136
137 let rows_affected = ok.affected_rows;
138 logger.increase_rows_affected(rows_affected);
139 let done = MySqlQueryResult {
140 rows_affected,
141 last_insert_id: ok.last_insert_id,
142 };
143
144 r#yield!(Either::Left(done));
145
146 if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
147 continue;
149 }
150
151 self.stream.waiting.pop_front();
152 return Ok(());
153 }
154
155 *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
157
158 let num_columns = packet.get_uint_lenenc() as usize; if needs_metadata {
161 column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
162 } else {
163 needs_metadata = true;
166
167 recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
168 }
169
170 loop {
172 let packet = self.stream.recv_packet().await?;
173
174 if packet[0] == 0xfe && packet.len() < 9 {
175 let eof = packet.eof(self.stream.capabilities)?;
176
177 r#yield!(Either::Left(MySqlQueryResult {
178 rows_affected: 0,
179 last_insert_id: 0,
180 }));
181
182 if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
183 *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
185 break;
186 }
187
188 self.stream.waiting.pop_front();
189 return Ok(());
190 }
191
192 let row = match format {
193 MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
194 MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
195 };
196
197 let v = Either::Right(MySqlRow {
198 row,
199 format,
200 columns: Arc::clone(&columns),
201 column_names: Arc::clone(&column_names),
202 });
203
204 logger.increment_rows_returned();
205
206 r#yield!(v);
207 }
208 }
209 }))
210 }
211}
212
213impl<'c> Executor<'c> for &'c mut MySqlConnection {
214 type Database = MySql;
215
216 fn fetch_many<'e, 'q: 'e, E: 'q>(
217 self,
218 mut query: E,
219 ) -> BoxStream<'e, Result<Either<MySqlQueryResult, MySqlRow>, Error>>
220 where
221 'c: 'e,
222 E: Execute<'q, Self::Database>,
223 {
224 let sql = query.sql();
225 let arguments = query.take_arguments();
226 let persistent = query.persistent();
227
228 Box::pin(try_stream! {
229 let s = self.run(sql, arguments, persistent).await?;
230 pin_mut!(s);
231
232 while let Some(v) = s.try_next().await? {
233 r#yield!(v);
234 }
235
236 Ok(())
237 })
238 }
239
240 fn fetch_optional<'e, 'q: 'e, E: 'q>(
241 self,
242 query: E,
243 ) -> BoxFuture<'e, Result<Option<MySqlRow>, Error>>
244 where
245 'c: 'e,
246 E: Execute<'q, Self::Database>,
247 {
248 let mut s = self.fetch_many(query);
249
250 Box::pin(async move {
251 while let Some(v) = s.try_next().await? {
252 if let Either::Right(r) = v {
253 return Ok(Some(r));
254 }
255 }
256
257 Ok(None)
258 })
259 }
260
261 fn prepare_with<'e, 'q: 'e>(
262 self,
263 sql: &'q str,
264 _parameters: &'e [MySqlTypeInfo],
265 ) -> BoxFuture<'e, Result<MySqlStatement<'q>, Error>>
266 where
267 'c: 'e,
268 {
269 Box::pin(async move {
270 self.stream.wait_until_ready().await?;
271
272 let (_, metadata) = self.get_or_prepare(sql, true).await?;
273
274 Ok(MySqlStatement {
275 sql: Cow::Borrowed(sql),
276 metadata: metadata.clone(),
278 })
279 })
280 }
281
282 #[doc(hidden)]
283 fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<MySql>, Error>>
284 where
285 'c: 'e,
286 {
287 Box::pin(async move {
288 self.stream.wait_until_ready().await?;
289
290 let (_, metadata) = self.get_or_prepare(sql, false).await?;
291
292 let columns = (&*metadata.columns).clone();
293
294 let nullable = columns
295 .iter()
296 .map(|col| {
297 col.flags
298 .map(|flags| !flags.contains(ColumnFlags::NOT_NULL))
299 })
300 .collect();
301
302 Ok(Describe {
303 parameters: Some(Either::Right(metadata.parameters)),
304 columns,
305 nullable,
306 })
307 })
308 }
309}
310
311async fn recv_result_columns(
312 stream: &mut MySqlStream,
313 num_columns: usize,
314 columns: &mut Vec<MySqlColumn>,
315) -> Result<(), Error> {
316 columns.clear();
317 columns.reserve(num_columns);
318
319 for ordinal in 0..num_columns {
320 columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
321 }
322
323 if num_columns > 0 {
324 stream.maybe_recv_eof().await?;
325 }
326
327 Ok(())
328}
329
330fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
331 let name = match (def.name()?, def.alias()?) {
334 (_, alias) if !alias.is_empty() => UStr::new(alias),
335 (name, _) => UStr::new(name),
336 };
337
338 let type_info = MySqlTypeInfo::from_column(&def);
339
340 Ok(MySqlColumn {
341 name,
342 type_info,
343 ordinal,
344 flags: Some(def.flags),
345 })
346}
347
348async fn recv_result_metadata(
349 stream: &mut MySqlStream,
350 num_columns: usize,
351 columns: &mut Vec<MySqlColumn>,
352) -> Result<HashMap<UStr, usize>, Error> {
353 let mut column_names = HashMap::with_capacity(num_columns);
357
358 columns.clear();
359 columns.reserve(num_columns);
360
361 for ordinal in 0..num_columns {
362 let def: ColumnDefinition = stream.recv().await?;
363
364 let column = recv_next_result_column(&def, ordinal)?;
365
366 column_names.insert(column.name.clone(), ordinal);
367 columns.push(column);
368 }
369
370 stream.maybe_recv_eof().await?;
371
372 Ok(column_names)
373}