1use super::MySqlStream;
2use crate::connection::stream::Waiting;
3use crate::connection::MySqlConnection;
4use crate::io::MySqlBufExt;
5use crate::protocol::response::Status;
6use crate::protocol::statement::{
7 BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
8};
9use crate::protocol::text::{ColumnDefinition, Query, TextRow};
10use crate::query::MysqlQuery;
11use crate::query_result::MySqlQueryResult;
12use crate::result_set::{MySqlColumn, MySqlTypeInfo};
13use crate::row::MySqlRow;
14use crate::stmt::{MySqlArguments, MySqlStatement, MySqlStatementMetadata};
15use crate::value::MySqlValueFormat;
16use either::Either;
17use futures_core::future::BoxFuture;
18use futures_core::stream::BoxStream;
19use futures_core::Stream;
20use futures_util::{pin_mut, TryStreamExt};
21use rbdc::ext::ustr::UStr;
22use rbdc::{try_stream, Error};
23use std::collections::HashMap;
24use std::sync::Arc;
25
26impl MySqlConnection {
27 async fn get_or_prepare<'c>(
28 &mut self,
29 sql: &str,
30 persistent: bool,
31 ) -> Result<(u32, MySqlStatementMetadata), Error> {
32 if let Some(statement) = self.cache_statement.get_mut(sql) {
33 return Ok((*statement).clone());
35 }
36
37 self.stream.send_packet(Prepare { query: sql }).await?;
41
42 let ok: PrepareOk = self.stream.recv().await?;
43
44 if ok.params > 0 {
48 for _ in 0..ok.params {
49 let _def: ColumnDefinition = self.stream.recv().await?;
50 }
51
52 self.stream.maybe_recv_eof().await?;
53 }
54
55 let mut columns = Vec::with_capacity(ok.columns as usize);
60
61 let column_names = if ok.columns > 0 {
62 recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
63 } else {
64 Default::default()
65 };
66
67 let id = ok.statement_id;
68 let metadata = MySqlStatementMetadata {
69 parameters: ok.params as usize,
70 columns: Arc::new(columns),
71 column_names: Arc::new(column_names),
72 };
73
74 if persistent && self.cache_statement.is_enabled() {
75 if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
77 self.stream.send_packet(StmtClose { statement: id }).await?;
78 }
79 }
80
81 Ok((id, metadata))
82 }
83
84 #[allow(clippy::needless_lifetimes)]
85 async fn run<'e, 'q: 'e>(
86 &'e mut self,
87 sql: &'q str,
88 arguments: Option<MySqlArguments>,
89 persistent: bool,
90 ) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
91 {
92 self.stream.wait_until_ready().await?;
93 self.stream.waiting.push_back(Waiting::Result);
94
95 Ok(Box::pin(try_stream! {
96 let option = self.option.clone();
97
98 let mut columns = Arc::new(Vec::with_capacity({
102 if let Some(arguments) = &arguments {
103 arguments.len()
104 }else{
105 0
106 }
107 }));
108
109 let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
110 let (id, metadata) = self.get_or_prepare(
111 sql,
112 persistent,
113 )
114 .await?;
115
116 self.stream
118 .send_packet(StatementExecute {
119 statement_id: id,
120 arguments: &arguments,
121 })
122 .await?;
123
124 (metadata.column_names, MySqlValueFormat::Binary, false)
125 } else {
126 self.stream.send_packet(Query(sql)).await?;
128
129 (Arc::default(), MySqlValueFormat::Text, true)
130 };
131
132 loop {
133 let mut packet = self.stream.recv_packet().await?;
136
137 if packet[0] == 0x00 || packet[0] == 0xff {
138 let ok = packet.ok()?;
141
142 let rows_affected = ok.affected_rows;
143 let done = MySqlQueryResult {
144 rows_affected,
145 last_insert_id: ok.last_insert_id,
146 };
147
148 r#yield!(Either::Left(done));
149
150 if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
151 continue;
153 }
154
155 self.stream.waiting.pop_front();
156 return Ok(());
157 }
158
159 *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
161
162 if packet.is_empty() { return Err(Error::protocol("empty packet when expecting column count".to_string())); }
164 let num_columns = packet.get_uint_lenenc() as usize; if needs_metadata {
167 column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
168 } else {
169 needs_metadata = true;
172
173 recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
174 }
175
176 loop {
178 let packet = self.stream.recv_packet().await?;
179
180 if packet[0] == 0xfe && packet.len() < 9 {
181 let eof = packet.eof(self.stream.capabilities)?;
182
183 r#yield!(Either::Left(MySqlQueryResult {
184 rows_affected: 0,
185 last_insert_id: 0,
186 }));
187
188 if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
189 *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
191 break;
192 }
193
194 self.stream.waiting.pop_front();
195 return Ok(());
196 }
197
198 let row = match format {
199 MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
200 MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
201 };
202
203 let v = Either::Right(MySqlRow {
204 row,
205 format,
206 columns: Arc::clone(&columns),
207 column_names: Arc::clone(&column_names),
208 option: option.clone(),
209 });
210
211 r#yield!(v);
212 }
213 }
214 }))
215 }
216}
217
218impl MySqlConnection {
219 pub async fn execute(&mut self, sql: &str) -> Result<Option<MySqlRow>, Error> {
220 self.fetch_optional(MysqlQuery {
221 statement: Either::Left(sql.to_string()),
222 arguments: vec![],
223 persistent: false,
224 })
225 .await
226 }
227
228 pub fn fetch_many(
229 &mut self,
230 query: MysqlQuery,
231 ) -> BoxStream<'_, Result<Either<MySqlQueryResult, MySqlRow>, Error>> {
232 let sql = query.sql().to_owned();
233 let persistent = query.persistent();
234 Box::pin(try_stream! {
235 let arguments = query.take_arguments()?;
236 let s = self.run(&sql, arguments, persistent).await?;
237 pin_mut!(s);
238
239 while let Some(v) = s.try_next().await? {
240 r#yield!(v);
241 }
242
243 Ok(())
244 })
245 }
246
247 pub fn fetch_optional(
248 &mut self,
249 query: MysqlQuery,
250 ) -> BoxFuture<'_, Result<Option<MySqlRow>, Error>> {
251 let mut s = self.fetch_many(query);
252 Box::pin(async move {
253 while let Some(v) = s.try_next().await? {
254 if let Either::Right(r) = v {
255 return Ok(Some(r));
256 }
257 }
258
259 Ok(None)
260 })
261 }
262
263 pub fn prepare_with<'e>(
264 &'e mut self,
265 sql: &'e str,
266 _: &'e [MySqlTypeInfo],
267 ) -> BoxFuture<'e, Result<MySqlStatement, Error>> {
268 let sql = sql.to_string();
269 Box::pin(async move {
270 self.stream.wait_until_ready().await?;
271
272 let (_, metadata) = self.get_or_prepare(&sql, true).await?;
273
274 Ok(MySqlStatement {
275 sql: sql,
276 metadata: metadata.clone(),
278 })
279 })
280 }
281
282 }
307
308async fn recv_result_columns(
309 stream: &mut MySqlStream,
310 num_columns: usize,
311 columns: &mut Vec<MySqlColumn>,
312) -> Result<(), Error> {
313 columns.clear();
314 columns.reserve(num_columns);
315
316 for ordinal in 0..num_columns {
317 columns.push(recv_next_result_column(&stream.recv().await?, ordinal)?);
318 }
319
320 if num_columns > 0 {
321 stream.maybe_recv_eof().await?;
322 }
323
324 Ok(())
325}
326
327fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
328 let name = match (def.name()?, def.alias()?) {
331 (_, alias) if !alias.is_empty() => UStr::new(alias),
332 (name, _) => UStr::new(name),
333 };
334
335 let type_info = MySqlTypeInfo::from_column(&def);
336
337 Ok(MySqlColumn {
338 name,
339 type_info,
340 ordinal,
341 })
343}
344
345async fn recv_result_metadata(
346 stream: &mut MySqlStream,
347 num_columns: usize,
348 columns: &mut Vec<MySqlColumn>,
349) -> Result<HashMap<UStr, (usize, MySqlTypeInfo)>, Error> {
350 let mut column_names = HashMap::with_capacity(num_columns);
354
355 columns.clear();
356 columns.reserve(num_columns);
357
358 for ordinal in 0..num_columns {
359 let def: ColumnDefinition = stream.recv().await?;
360
361 let column = recv_next_result_column(&def, ordinal)?;
362
363 column_names.insert(column.name.clone(), (ordinal, column.type_info.clone()));
364 columns.push(column);
365 }
366
367 stream.maybe_recv_eof().await?;
368
369 Ok(column_names)
370}