1use super::MySqlStream;
2use crate::connection::stream::Waiting;
3use crate::error::Error;
4use crate::executor::{Execute, Executor};
5use crate::ext::ustr::UStr;
6use crate::io::MySqlBufExt;
7use crate::logger::QueryLogger;
8use crate::protocol::response::Status;
9use crate::protocol::statement::{
10 BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
11};
12use crate::protocol::text::{ColumnDefinition, Query, TextRow};
13use crate::statement::{MySqlStatement, MySqlStatementMetadata};
14use crate::HashMap;
15use crate::{
16 MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
17 MySqlValueFormat,
18};
19use either::Either;
20use futures_core::future::BoxFuture;
21use futures_core::stream::BoxStream;
22use futures_core::Stream;
23use futures_util::TryStreamExt;
24use sqlx_core::column::{ColumnOrigin, TableColumn};
25use sqlx_core::sql_str::SqlStr;
26use std::{pin::pin, sync::Arc};
27
28impl MySqlConnection {
29 async fn prepare_statement(
30 &mut self,
31 sql: &str,
32 ) -> Result<(u32, MySqlStatementMetadata), Error> {
33 self.inner
37 .stream
38 .send_packet(Prepare { query: sql })
39 .await?;
40
41 let ok: PrepareOk = self.inner.stream.recv().await?;
42
43 if ok.params > 0 {
47 for _ in 0..ok.params {
48 let _def: ColumnDefinition = self.inner.stream.recv().await?;
49 }
50
51 self.inner.stream.maybe_recv_eof().await?;
52 }
53
54 let mut columns = Vec::new();
59
60 let column_names = if ok.columns > 0 {
61 recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await?
62 } else {
63 Default::default()
64 };
65
66 let id = ok.statement_id;
67 let metadata = MySqlStatementMetadata {
68 parameters: ok.params as usize,
69 columns: Arc::new(columns),
70 column_names: Arc::new(column_names),
71 };
72
73 Ok((id, metadata))
74 }
75
76 async fn get_or_prepare_statement(
77 &mut self,
78 sql: &str,
79 ) -> Result<(u32, MySqlStatementMetadata), Error> {
80 if let Some(statement) = self.inner.cache_statement.get_mut(sql) {
81 return Ok((*statement).clone());
83 }
84
85 let (id, metadata) = self.prepare_statement(sql).await?;
86
87 if let Some((id, _)) = self
89 .inner
90 .cache_statement
91 .insert(sql, (id, metadata.clone()))
92 {
93 self.inner
94 .stream
95 .send_packet(StmtClose { statement: id })
96 .await?;
97 }
98
99 Ok((id, metadata))
100 }
101
102 #[allow(clippy::needless_lifetimes)]
103 pub(crate) async fn run<'e, 'c: 'e, 'q: 'e>(
104 &'c mut self,
105 sql: SqlStr,
106 arguments: Option<MySqlArguments>,
107 persistent: bool,
108 ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
109 {
110 let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
111
112 self.inner.stream.wait_until_ready().await?;
113 self.inner.stream.waiting.push_back(Waiting::Result);
114
115 Ok(try_stream! {
116 let sql = logger.sql().as_str();
117
118 let mut columns = Arc::new(Vec::new());
122
123 let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
124 if persistent && self.inner.cache_statement.is_enabled() {
125 let (id, metadata) = self
126 .get_or_prepare_statement(sql)
127 .await?;
128
129 if arguments.types.len() != metadata.parameters {
130 return Err(
131 err_protocol!(
132 "prepared statement expected {} parameters but {} parameters were provided",
133 metadata.parameters,
134 arguments.types.len()
135 )
136 );
137 }
138
139 self.inner.stream
141 .send_packet(StatementExecute {
142 statement: id,
143 arguments: &arguments,
144 })
145 .await?;
146
147 (metadata.column_names, MySqlValueFormat::Binary, false)
148 } else {
149 let (id, metadata) = self
150 .prepare_statement(sql)
151 .await?;
152
153 if arguments.types.len() != metadata.parameters {
154 return Err(
155 err_protocol!(
156 "prepared statement expected {} parameters but {} parameters were provided",
157 metadata.parameters,
158 arguments.types.len()
159 )
160 );
161 }
162
163 self.inner.stream
165 .send_packet(StatementExecute {
166 statement: id,
167 arguments: &arguments,
168 })
169 .await?;
170
171 self.inner.stream.send_packet(StmtClose { statement: id }).await?;
172
173 (metadata.column_names, MySqlValueFormat::Binary, false)
174 }
175 } else {
176 self.inner.stream.send_packet(Query(sql)).await?;
178
179 (Arc::default(), MySqlValueFormat::Text, true)
180 };
181
182 loop {
183 let mut packet = self.inner.stream.recv_packet().await?;
186
187 if packet[0] == 0x00 || packet[0] == 0xff {
188 let ok = packet.ok()?;
191
192 self.inner.status_flags = ok.status;
193
194 let rows_affected = ok.affected_rows;
195 logger.increase_rows_affected(rows_affected);
196 let done = MySqlQueryResult {
197 rows_affected,
198 last_insert_id: ok.last_insert_id,
199 };
200
201 r#yield!(Either::Left(done));
202
203 if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
204 continue;
206 }
207
208 self.inner.stream.waiting.pop_front();
209 return Ok(());
210 }
211
212 *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row;
214
215 let num_columns = packet.get_uint_lenenc(); let num_columns = usize::try_from(num_columns)
217 .map_err(|_| err_protocol!("column count overflows usize: {num_columns}"))?;
218
219 if needs_metadata {
220 column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?);
221 } else {
222 needs_metadata = true;
225
226 recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?;
227 }
228
229 loop {
231 let packet = self.inner.stream.recv_packet().await?;
232
233 if packet[0] == 0xfe {
234 let (rows_affected, last_insert_id, status) = if packet.len() < 9 {
235 let eof = packet.eof(self.inner.stream.capabilities)?;
237 (0, 0, eof.status)
238 } else {
239 let ok = packet.ok()?;
241 (ok.affected_rows, ok.last_insert_id, ok.status)
242 };
243
244 self.inner.status_flags = status;
245 r#yield!(Either::Left(MySqlQueryResult {
246 rows_affected,
247 last_insert_id,
248 }));
249
250 if status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
251 *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result;
252 break;
253 }
254 self.inner.stream.waiting.pop_front();
255 return Ok(());
256 }
257
258 let row = match format {
259 MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
260 MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
261 };
262
263 let v = Either::Right(MySqlRow {
264 row,
265 format,
266 columns: Arc::clone(&columns),
267 column_names: Arc::clone(&column_names),
268 });
269
270 logger.increment_rows_returned();
271
272 r#yield!(v);
273 }
274 }
275 })
276 }
277}
278
279impl<'c> Executor<'c> for &'c mut MySqlConnection {
280 type Database = MySql;
281
282 fn fetch_many<'e, 'q, E>(
283 self,
284 mut query: E,
285 ) -> BoxStream<'e, Result<Either<MySqlQueryResult, MySqlRow>, Error>>
286 where
287 'c: 'e,
288 E: Execute<'q, Self::Database>,
289 'q: 'e,
290 E: 'q,
291 {
292 let arguments = query.take_arguments().map_err(Error::Encode);
293 let persistent = query.persistent();
294
295 Box::pin(try_stream! {
296 let sql = query.sql();
297 let arguments = arguments?;
298 let mut s = pin!(self.run(sql, arguments, persistent).await?);
299
300 while let Some(v) = s.try_next().await? {
301 r#yield!(v);
302 }
303
304 Ok(())
305 })
306 }
307
308 fn fetch_optional<'e, 'q, E>(self, query: E) -> BoxFuture<'e, Result<Option<MySqlRow>, Error>>
309 where
310 'c: 'e,
311 E: Execute<'q, Self::Database>,
312 'q: 'e,
313 E: 'q,
314 {
315 let mut s = self.fetch_many(query);
316
317 Box::pin(async move {
318 while let Some(v) = s.try_next().await? {
319 if let Either::Right(r) = v {
320 return Ok(Some(r));
321 }
322 }
323
324 Ok(None)
325 })
326 }
327
328 fn prepare_with<'e>(
329 self,
330 sql: SqlStr,
331 _parameters: &'e [MySqlTypeInfo],
332 ) -> BoxFuture<'e, Result<MySqlStatement, Error>>
333 where
334 'c: 'e,
335 {
336 Box::pin(async move {
337 self.inner.stream.wait_until_ready().await?;
338
339 let metadata = if self.inner.cache_statement.is_enabled() {
340 self.get_or_prepare_statement(sql.as_str()).await?.1
341 } else {
342 let (id, metadata) = self.prepare_statement(sql.as_str()).await?;
343
344 self.inner
345 .stream
346 .send_packet(StmtClose { statement: id })
347 .await?;
348
349 metadata
350 };
351
352 Ok(MySqlStatement {
353 sql,
354 metadata: metadata.clone(),
356 })
357 })
358 }
359
360 #[doc(hidden)]
361 #[cfg(feature = "offline")]
362 fn describe<'e>(
363 self,
364 sql: SqlStr,
365 ) -> BoxFuture<'e, Result<crate::describe::Describe<MySql>, Error>>
366 where
367 'c: 'e,
368 {
369 Box::pin(async move {
370 self.inner.stream.wait_until_ready().await?;
371
372 let (id, metadata) = self.prepare_statement(sql.as_str()).await?;
373
374 self.inner
375 .stream
376 .send_packet(StmtClose { statement: id })
377 .await?;
378
379 let columns = (*metadata.columns).clone();
380
381 let nullable = columns
382 .iter()
383 .map(|col| {
384 col.flags
385 .map(|flags| !flags.contains(crate::protocol::text::ColumnFlags::NOT_NULL))
386 })
387 .collect();
388
389 Ok(crate::describe::Describe {
390 parameters: Some(Either::Right(metadata.parameters)),
391 columns,
392 nullable,
393 })
394 })
395 }
396}
397
398async fn recv_result_columns(
399 stream: &mut MySqlStream,
400 num_columns: usize,
401 columns: &mut Vec<MySqlColumn>,
402) -> Result<(), Error> {
403 columns.clear();
404 columns.reserve(num_columns);
405
406 for ordinal in 0..num_columns {
407 columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
408 }
409
410 if num_columns > 0 {
411 stream.maybe_recv_eof().await?;
412 }
413
414 Ok(())
415}
416
417fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
418 let column_name = def.name()?;
421
422 let name = match (def.name()?, def.alias()?) {
423 (_, alias) if !alias.is_empty() => UStr::new(alias),
424 (name, _) => UStr::new(name),
425 };
426
427 let table = def.table()?;
428
429 let origin = if table.is_empty() {
430 ColumnOrigin::Expression
431 } else {
432 let schema = def.schema()?;
433
434 ColumnOrigin::Table(TableColumn {
435 table: if !schema.is_empty() {
436 format!("{schema}.{table}").into()
437 } else {
438 table.into()
439 },
440 name: column_name.into(),
441 })
442 };
443
444 let type_info = MySqlTypeInfo::from_column(def);
445
446 Ok(MySqlColumn {
447 name,
448 type_info,
449 ordinal,
450 flags: Some(def.flags),
451 origin,
452 })
453}
454
455async fn recv_result_metadata(
456 stream: &mut MySqlStream,
457 num_columns: usize,
458 columns: &mut Vec<MySqlColumn>,
459) -> Result<HashMap<UStr, usize>, Error> {
460 let mut column_names = HashMap::with_capacity(num_columns);
464
465 columns.clear();
466 columns.reserve(num_columns);
467
468 for ordinal in 0..num_columns {
469 let def: ColumnDefinition = stream.recv().await?;
470
471 let column = recv_next_result_column(&def, ordinal)?;
472
473 column_names.insert(column.name.clone(), ordinal);
474 columns.push(column);
475 }
476
477 stream.maybe_recv_eof().await?;
478
479 Ok(column_names)
480}