1use super::MySqlStream;
2use cdbc::describe::Describe;
3use cdbc::error::Error;
4use cdbc::executor::{Execute, Executor};
5use crate::connection::stream::Waiting;
6use crate::io::MySqlBufExt;
7use crate::protocol::response::Status;
8use crate::protocol::statement::{
9 BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
10};
11use crate::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
12use crate::statement::{MySqlStatement, MySqlStatementMetadata};
13use crate::{
14 MySql, MySqlArguments, MySqlColumn, MySqlConnection, MySqlQueryResult, MySqlRow, MySqlTypeInfo,
15 MySqlValueFormat,
16};
17use cdbc::{chan_stream, HashMap};
18use either::Either;
19use std::{borrow::Cow, sync::Arc};
20use cdbc::database::{Database, HasStatement};
21use cdbc::io::chan_stream::{ChanStream, Stream, TryStream};
22use cdbc::utils::ustr::UStr;
23
24impl MySqlConnection {
25 fn get_or_prepare<'c>(
26 &mut self,
27 sql: &str,
28 persistent: bool,
29 ) -> Result<(u32, MySqlStatementMetadata), Error> {
30 if let Some(statement) = self.cache_statement.get_mut(sql) {
31 return Ok((*statement).clone());
33 }
34
35 self.stream.send_packet(Prepare { query: sql })?;
39
40 let ok: PrepareOk = self.stream.recv()?;
41
42 if ok.params > 0 {
46 for _ in 0..ok.params {
47 let _def: ColumnDefinition = self.stream.recv()?;
48 }
49
50 self.stream.maybe_recv_eof()?;
51 }
52
53 let mut columns = Vec::new();
58
59 let column_names = if ok.columns > 0 {
60 recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns)?
61 } else {
62 Default::default()
63 };
64
65 let id = ok.statement_id;
66 let metadata = MySqlStatementMetadata {
67 parameters: ok.params as usize,
68 columns: Arc::new(columns),
69 column_names: Arc::new(column_names),
70 };
71
72 if persistent && self.cache_statement.is_enabled() {
73 if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
75 self.stream.send_packet(StmtClose { statement: id })?;
76 }
77 }
78
79 Ok((id, metadata))
80 }
81
82 #[allow(clippy::needless_lifetimes)]
83 fn run<'e, 'c: 'e, 'q: 'e>(
84 &'c mut self,
85 sql: &'q str,
86 arguments: Option<MySqlArguments>,
87 persistent: bool,
88 ) -> Result<ChanStream<Either<MySqlQueryResult, MySqlRow>>, Error>
89 {
90 self.stream.wait_until_ready()?;
91 self.stream.waiting.push_back(Waiting::Result);
92
93 Ok(chan_stream!({
94 let mut columns = Arc::new(Vec::new());
98
99 let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
100 let (id, metadata) = self.get_or_prepare(
101 sql,
102 persistent,
103 )
104 ?;
105
106 self.stream
108 .send_packet(StatementExecute {
109 statement: id,
110 arguments: &arguments,
111 })
112 ?;
113
114 (metadata.column_names, MySqlValueFormat::Binary, false)
115 } else {
116 self.stream.send_packet(Query(sql))?;
118
119 (Arc::default(), MySqlValueFormat::Text, true)
120 };
121
122 loop {
123 let mut packet = self.stream.recv_packet()?;
126
127 if packet[0] == 0x00 || packet[0] == 0xff {
128 let ok = packet.ok()?;
131
132 let done = MySqlQueryResult {
133 rows_affected: ok.affected_rows,
134 last_insert_id: ok.last_insert_id,
135 };
136
137 r#yield!(Either::Left(done));
138
139 if ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
140 continue;
142 }
143
144 self.stream.waiting.pop_front();
145 return Ok(());
146 }
147
148 *self.stream.waiting.front_mut().unwrap() = Waiting::Row;
150
151 let num_columns = packet.get_uint_lenenc() as usize; if needs_metadata {
154 column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns))?);
155 } else {
156 needs_metadata = true;
159
160 recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns))?;
161 }
162
163 loop {
165 let packet = self.stream.recv_packet()?;
166
167 if packet[0] == 0xfe && packet.len() < 9 {
168 let eof = packet.eof(self.stream.capabilities)?;
169
170 r#yield!(Either::Left(MySqlQueryResult {
171 rows_affected: 0,
172 last_insert_id: 0,
173 }));
174
175 if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
176 *self.stream.waiting.front_mut().unwrap() = Waiting::Result;
178 break;
179 }
180
181 self.stream.waiting.pop_front();
182 return Ok(());
183 }
184
185 let row = match format {
186 MySqlValueFormat::Binary => packet.decode_with::<BinaryRow, _>(&columns)?.0,
187 MySqlValueFormat::Text => packet.decode_with::<TextRow, _>(&columns)?.0,
188 };
189
190 let v = Either::Right(MySqlRow {
191 row,
192 format,
193 columns: Arc::clone(&columns),
194 column_names: Arc::clone(&column_names),
195 });
196 r#yield!(v);
197 }
198 }
199 }))
200 }
201}
202
203impl Executor for MySqlConnection {
204 type Database = MySql;
205
206 fn fetch_many<'q, E: 'q>(
207 &mut self,
208 mut query: E,
209 ) -> ChanStream<Either<MySqlQueryResult, MySqlRow>>
210 where E: Execute<'q, Self::Database>,
211 {
212 let arguments = query.take_arguments();
213 let persistent = query.persistent();
214
215 chan_stream! {
216 let mut s = self.run(query.sql(), arguments, persistent)?;
217
218 while let Some(v) = s.try_next()? {
219 r#yield!(v);
220 }
221
222 Ok(())
223 }
224 }
225
226 fn fetch_optional<'q, E: 'q>(
227 &mut self,
228 query: E,
229 ) -> Result<Option<MySqlRow>, Error>
230 where E: Execute<'q, Self::Database>,
231 {
232 let mut s = self.fetch_many(query);
233 while let Some(v) = s.try_next()? {
234 if let Either::Right(r) = v {
235 return Ok(Some(r));
236 }
237 }
238 Ok(None)
239 }
240
241 fn prepare_with<'q>(
242 &mut self,
243 sql: &'q str,
244 _parameters: &'q [MySqlTypeInfo],
245 ) -> Result<MySqlStatement, Error>
246 {
247 self.stream.wait_until_ready()?;
248 let (_, metadata) = self.get_or_prepare(sql, true)?;
249 Ok(MySqlStatement {
250 sql: sql.to_string(),
251 metadata: metadata.clone(),
253 })
254 }
255
256 #[doc(hidden)]
257 fn describe<'q>( &mut self, sql: &'q str) ->Result<Describe<MySql>, Error> {
258 self.stream.wait_until_ready()?;
259 let (_, metadata) = self.get_or_prepare(sql, false)?;
260 let columns = (&*metadata.columns).clone();
261 let nullable = columns
262 .iter()
263 .map(|col| {
264 col.flags
265 .map(|flags| !flags.contains(ColumnFlags::NOT_NULL))
266 })
267 .collect();
268 Ok(Describe {
269 parameters: Some(Either::Right(metadata.parameters)),
270 columns,
271 nullable,
272 })
273 }
274}
275
276fn recv_result_columns(
277 stream: &mut MySqlStream,
278 num_columns: usize,
279 columns: &mut Vec<MySqlColumn>,
280) -> Result<(), Error> {
281 columns.clear();
282 columns.reserve(num_columns);
283
284 for ordinal in 0..num_columns {
285 columns.push(recv_next_result_column(&stream.recv()?, ordinal)?);
286 }
287
288 if num_columns > 0 {
289 stream.maybe_recv_eof()?;
290 }
291
292 Ok(())
293}
294
295fn recv_next_result_column(def: &ColumnDefinition, ordinal: usize) -> Result<MySqlColumn, Error> {
296 let name = match (def.name()?, def.alias()?) {
299 (_, alias) if !alias.is_empty() => UStr::new(alias),
300 (name, _) => UStr::new(name),
301 };
302
303 let type_info = MySqlTypeInfo::from_column(&def);
304
305 Ok(MySqlColumn {
306 name,
307 type_info,
308 ordinal,
309 flags: Some(def.flags),
310 })
311}
312
313fn recv_result_metadata(
314 stream: &mut MySqlStream,
315 num_columns: usize,
316 columns: &mut Vec<MySqlColumn>,
317) -> Result<HashMap<UStr, usize>, Error> {
318 let mut column_names = HashMap::with_capacity(num_columns);
322
323 columns.clear();
324 columns.reserve(num_columns);
325
326 for ordinal in 0..num_columns {
327 let def: ColumnDefinition = stream.recv()?;
328
329 let column = recv_next_result_column(&def, ordinal)?;
330
331 column_names.insert(column.name.clone(), ordinal);
332 columns.push(column);
333 }
334
335 stream.maybe_recv_eof()?;
336
337 Ok(column_names)
338}
339
340
341
342impl Executor for &mut MySqlConnection{
343 type Database = MySql;
344
345 fn fetch_many<'q, E: 'q>(&mut self, query: E) -> ChanStream<Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>> where E: Execute<'q, Self::Database> {
346 MySqlConnection::fetch_many(self,query)
347 }
348
349 fn fetch_optional<'q, E: 'q>(&mut self, query: E) -> Result<Option<<Self::Database as Database>::Row>, Error> where E: Execute<'q, Self::Database> {
350 MySqlConnection::fetch_optional(self,query)
351 }
352
353 fn prepare_with<'q>(&mut self, sql: &'q str, parameters: &'q [<Self::Database as Database>::TypeInfo]) -> Result<<Self::Database as HasStatement>::Statement, Error> {
354 MySqlConnection::prepare_with(self,sql,parameters)
355 }
356
357 fn describe(&mut self, sql: &str) -> Result<Describe<Self::Database>, Error> {
358 MySqlConnection::describe(self,sql)
359 }
360}