1use super::core::PgDriver;
5use super::types::*;
6use qail_core::ast::Qail;
7use std::sync::Arc;
8
9impl PgDriver {
10 pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
16 self.fetch_all_with_format(cmd, ResultFormat::Text).await
17 }
18
19 pub async fn fetch_all_with_format(
25 &mut self,
26 cmd: &Qail,
27 result_format: ResultFormat,
28 ) -> PgResult<Vec<PgRow>> {
29 self.fetch_all_cached_with_format(cmd, result_format).await
31 }
32
33 pub async fn fetch_typed<T: super::row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
41 self.fetch_typed_with_format(cmd, ResultFormat::Text).await
42 }
43
44 pub async fn fetch_typed_with_format<T: super::row::QailRow>(
49 &mut self,
50 cmd: &Qail,
51 result_format: ResultFormat,
52 ) -> PgResult<Vec<T>> {
53 let rows = self.fetch_all_with_format(cmd, result_format).await?;
54 Ok(rows.iter().map(T::from_row).collect())
55 }
56
57 pub async fn fetch_one_typed<T: super::row::QailRow>(
60 &mut self,
61 cmd: &Qail,
62 ) -> PgResult<Option<T>> {
63 self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
64 .await
65 }
66
67 pub async fn fetch_one_typed_with_format<T: super::row::QailRow>(
69 &mut self,
70 cmd: &Qail,
71 result_format: ResultFormat,
72 ) -> PgResult<Option<T>> {
73 let rows = self.fetch_all_with_format(cmd, result_format).await?;
74 Ok(rows.first().map(T::from_row))
75 }
76
77 pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
83 self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
84 .await
85 }
86
87 pub async fn fetch_all_uncached_with_format(
89 &mut self,
90 cmd: &Qail,
91 result_format: ResultFormat,
92 ) -> PgResult<Vec<PgRow>> {
93 use crate::protocol::AstEncoder;
94
95 AstEncoder::encode_cmd_reuse_into_with_result_format(
96 cmd,
97 &mut self.connection.sql_buf,
98 &mut self.connection.params_buf,
99 &mut self.connection.write_buf,
100 result_format.as_wire_code(),
101 )
102 .map_err(|e| PgError::Encode(e.to_string()))?;
103
104 self.connection.flush_write_buf().await?;
105
106 let mut rows: Vec<PgRow> = Vec::with_capacity(32);
107 let mut column_info: Option<Arc<ColumnInfo>> = None;
108
109 let mut error: Option<PgError> = None;
110 let mut flow = super::extended_flow::ExtendedFlowTracker::new(
111 super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
112 );
113
114 loop {
115 let msg = self.connection.recv().await?;
116 flow.validate(&msg, "driver fetch_all execute", error.is_some())?;
117 match msg {
118 crate::protocol::BackendMessage::ParseComplete
119 | crate::protocol::BackendMessage::BindComplete => {}
120 crate::protocol::BackendMessage::RowDescription(fields) => {
121 column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
122 }
123 crate::protocol::BackendMessage::DataRow(data) => {
124 if error.is_none() {
125 rows.push(PgRow {
126 columns: data,
127 column_info: column_info.clone(),
128 });
129 }
130 }
131 crate::protocol::BackendMessage::NoData => {}
132 crate::protocol::BackendMessage::CommandComplete(_) => {}
133 crate::protocol::BackendMessage::ReadyForQuery(_) => {
134 if let Some(err) = error {
135 return Err(err);
136 }
137 return Ok(rows);
138 }
139 crate::protocol::BackendMessage::ErrorResponse(err) => {
140 if error.is_none() {
141 error = Some(PgError::QueryServer(err.into()));
142 }
143 }
144 msg if is_ignorable_session_message(&msg) => {}
145 other => {
146 return Err(unexpected_backend_message(
147 "driver fetch_all execute",
148 &other,
149 ));
150 }
151 }
152 }
153 }
154
155 pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
159 self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
160 .await
161 }
162
163 pub async fn fetch_all_fast_with_format(
165 &mut self,
166 cmd: &Qail,
167 result_format: ResultFormat,
168 ) -> PgResult<Vec<PgRow>> {
169 use crate::protocol::AstEncoder;
170
171 AstEncoder::encode_cmd_reuse_into_with_result_format(
172 cmd,
173 &mut self.connection.sql_buf,
174 &mut self.connection.params_buf,
175 &mut self.connection.write_buf,
176 result_format.as_wire_code(),
177 )
178 .map_err(|e| PgError::Encode(e.to_string()))?;
179
180 self.connection.flush_write_buf().await?;
181
182 let mut rows: Vec<PgRow> = Vec::with_capacity(32);
184 let mut error: Option<PgError> = None;
185 let mut flow = super::extended_flow::ExtendedFlowTracker::new(
186 super::extended_flow::ExtendedFlowConfig::parse_bind_execute(true),
187 );
188
189 loop {
190 let res = self.connection.recv_with_data_fast().await;
191 match res {
192 Ok((msg_type, data)) => {
193 flow.validate_msg_type(
194 msg_type,
195 "driver fetch_all_fast execute",
196 error.is_some(),
197 )?;
198 match msg_type {
199 b'D' => {
200 if error.is_none()
201 && let Some(columns) = data
202 {
203 rows.push(PgRow {
204 columns,
205 column_info: None,
206 });
207 }
208 }
209 b'Z' => {
210 if let Some(err) = error {
211 return Err(err);
212 }
213 return Ok(rows);
214 }
215 _ => {}
216 }
217 }
218 Err(e) => {
219 if matches!(&e, PgError::QueryServer(_)) {
221 if error.is_none() {
222 error = Some(e);
223 }
224 continue;
225 }
226 return Err(e);
227 }
228 }
229 }
230 }
231
232 pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
234 let rows = self.fetch_all(cmd).await?;
235 rows.into_iter().next().ok_or(PgError::NoRows)
236 }
237
238 pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
247 self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
248 .await
249 }
250
251 pub async fn fetch_all_cached_with_format(
253 &mut self,
254 cmd: &Qail,
255 result_format: ResultFormat,
256 ) -> PgResult<Vec<PgRow>> {
257 let mut retried = false;
258 loop {
259 match self
260 .fetch_all_cached_with_format_once(cmd, result_format)
261 .await
262 {
263 Ok(rows) => return Ok(rows),
264 Err(err)
265 if !retried
266 && (err.is_prepared_statement_retryable()
267 || err.is_prepared_statement_already_exists()) =>
268 {
269 retried = true;
270 if err.is_prepared_statement_retryable() {
271 self.connection.clear_prepared_statement_state();
272 }
273 }
274 Err(err) => return Err(err),
275 }
276 }
277 }
278
279 async fn fetch_all_cached_with_format_once(
280 &mut self,
281 cmd: &Qail,
282 result_format: ResultFormat,
283 ) -> PgResult<Vec<PgRow>> {
284 use crate::protocol::AstEncoder;
285 use std::collections::hash_map::DefaultHasher;
286 use std::hash::{Hash, Hasher};
287
288 self.connection.sql_buf.clear();
289 self.connection.params_buf.clear();
290
291 match cmd.action {
293 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
294 crate::protocol::ast_encoder::dml::encode_select(
295 cmd,
296 &mut self.connection.sql_buf,
297 &mut self.connection.params_buf,
298 )?;
299 }
300 qail_core::ast::Action::Add => {
301 crate::protocol::ast_encoder::dml::encode_insert(
302 cmd,
303 &mut self.connection.sql_buf,
304 &mut self.connection.params_buf,
305 )?;
306 }
307 qail_core::ast::Action::Set => {
308 crate::protocol::ast_encoder::dml::encode_update(
309 cmd,
310 &mut self.connection.sql_buf,
311 &mut self.connection.params_buf,
312 )?;
313 }
314 qail_core::ast::Action::Del => {
315 crate::protocol::ast_encoder::dml::encode_delete(
316 cmd,
317 &mut self.connection.sql_buf,
318 &mut self.connection.params_buf,
319 )?;
320 }
321 _ => {
322 let (sql, params) =
324 AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
325 let raw_rows = self
326 .connection
327 .query_cached_with_result_format(&sql, ¶ms, result_format.as_wire_code())
328 .await?;
329 return Ok(raw_rows
330 .into_iter()
331 .map(|data| PgRow {
332 columns: data,
333 column_info: None,
334 })
335 .collect());
336 }
337 }
338
339 let mut hasher = DefaultHasher::new();
340 self.connection.sql_buf.hash(&mut hasher);
341 let sql_hash = hasher.finish();
342
343 let is_cache_miss = !self.connection.stmt_cache.contains(&sql_hash);
344
345 self.connection.write_buf.clear();
347
348 let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
349 name
350 } else {
351 let name = format!("qail_{:x}", sql_hash);
352
353 self.connection.evict_prepared_if_full();
355
356 let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
357
358 use crate::protocol::PgEncoder;
360 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
361 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
362 self.connection.write_buf.extend_from_slice(&parse_msg);
363 self.connection.write_buf.extend_from_slice(&describe_msg);
364
365 self.connection.stmt_cache.put(sql_hash, name.clone());
366 self.connection
367 .prepared_statements
368 .insert(name.clone(), sql_str.to_string());
369
370 name
371 };
372
373 use crate::protocol::PgEncoder;
375 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
376 &mut self.connection.write_buf,
377 &stmt_name,
378 &self.connection.params_buf,
379 result_format.as_wire_code(),
380 ) {
381 if is_cache_miss {
382 self.connection.stmt_cache.remove(&sql_hash);
383 self.connection.prepared_statements.remove(&stmt_name);
384 self.connection.column_info_cache.remove(&sql_hash);
385 }
386 return Err(PgError::Encode(e.to_string()));
387 }
388 PgEncoder::encode_execute_to(&mut self.connection.write_buf);
389 PgEncoder::encode_sync_to(&mut self.connection.write_buf);
390
391 if let Err(err) = self.connection.flush_write_buf().await {
393 if is_cache_miss {
394 self.connection.stmt_cache.remove(&sql_hash);
395 self.connection.prepared_statements.remove(&stmt_name);
396 self.connection.column_info_cache.remove(&sql_hash);
397 }
398 return Err(err);
399 }
400
401 let cached_column_info = self.connection.column_info_cache.get(&sql_hash).cloned();
403
404 let mut rows: Vec<PgRow> = Vec::with_capacity(32);
405 let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
406 let mut error: Option<PgError> = None;
407 let mut flow = super::extended_flow::ExtendedFlowTracker::new(
408 super::extended_flow::ExtendedFlowConfig::parse_describe_statement_bind_execute(
409 is_cache_miss,
410 ),
411 );
412
413 loop {
414 let msg = match self.connection.recv().await {
415 Ok(msg) => msg,
416 Err(err) => {
417 if is_cache_miss && !flow.saw_parse_complete() {
418 self.connection.stmt_cache.remove(&sql_hash);
419 self.connection.prepared_statements.remove(&stmt_name);
420 self.connection.column_info_cache.remove(&sql_hash);
421 }
422 return Err(err);
423 }
424 };
425 if let Err(err) =
426 flow.validate(&msg, "driver fetch_all_cached execute", error.is_some())
427 {
428 if is_cache_miss && !flow.saw_parse_complete() {
429 self.connection.stmt_cache.remove(&sql_hash);
430 self.connection.prepared_statements.remove(&stmt_name);
431 self.connection.column_info_cache.remove(&sql_hash);
432 }
433 return Err(err);
434 }
435 match msg {
436 crate::protocol::BackendMessage::ParseComplete => {}
437 crate::protocol::BackendMessage::BindComplete => {}
438 crate::protocol::BackendMessage::ParameterDescription(_) => {
439 }
441 crate::protocol::BackendMessage::RowDescription(fields) => {
442 let info = Arc::new(ColumnInfo::from_fields(&fields));
444 if is_cache_miss {
445 self.connection
446 .column_info_cache
447 .insert(sql_hash, info.clone());
448 }
449 column_info = Some(info);
450 }
451 crate::protocol::BackendMessage::DataRow(data) => {
452 if error.is_none() {
453 rows.push(PgRow {
454 columns: data,
455 column_info: column_info.clone(),
456 });
457 }
458 }
459 crate::protocol::BackendMessage::CommandComplete(_) => {}
460 crate::protocol::BackendMessage::NoData => {
461 }
463 crate::protocol::BackendMessage::ReadyForQuery(_) => {
464 if let Some(err) = error {
465 if is_cache_miss
466 && !flow.saw_parse_complete()
467 && !err.is_prepared_statement_already_exists()
468 {
469 self.connection.stmt_cache.remove(&sql_hash);
470 self.connection.prepared_statements.remove(&stmt_name);
471 self.connection.column_info_cache.remove(&sql_hash);
472 }
473 return Err(err);
474 }
475 if is_cache_miss && !flow.saw_parse_complete() {
476 self.connection.stmt_cache.remove(&sql_hash);
477 self.connection.prepared_statements.remove(&stmt_name);
478 self.connection.column_info_cache.remove(&sql_hash);
479 return Err(PgError::Protocol(
480 "Cache miss query reached ReadyForQuery without ParseComplete"
481 .to_string(),
482 ));
483 }
484 return Ok(rows);
485 }
486 crate::protocol::BackendMessage::ErrorResponse(err) => {
487 if error.is_none() {
488 let query_err = PgError::QueryServer(err.into());
489 if query_err.is_prepared_statement_retryable() {
490 self.connection.clear_prepared_statement_state();
491 }
492 error = Some(query_err);
493 }
494 }
495 msg if is_ignorable_session_message(&msg) => {}
496 other => {
497 if is_cache_miss && !flow.saw_parse_complete() {
498 self.connection.stmt_cache.remove(&sql_hash);
499 self.connection.prepared_statements.remove(&stmt_name);
500 self.connection.column_info_cache.remove(&sql_hash);
501 }
502 return Err(unexpected_backend_message(
503 "driver fetch_all_cached execute",
504 &other,
505 ));
506 }
507 }
508 }
509 }
510
511 pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
513 use crate::protocol::AstEncoder;
514
515 let wire_bytes = AstEncoder::encode_cmd_reuse(
516 cmd,
517 &mut self.connection.sql_buf,
518 &mut self.connection.params_buf,
519 )
520 .map_err(|e| PgError::Encode(e.to_string()))?;
521
522 self.connection.send_bytes(&wire_bytes).await?;
523
524 let mut affected = 0u64;
525 let mut error: Option<PgError> = None;
526 let mut flow = super::extended_flow::ExtendedFlowTracker::new(
527 super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
528 );
529
530 loop {
531 let msg = self.connection.recv().await?;
532 flow.validate(&msg, "driver execute mutation", error.is_some())?;
533 match msg {
534 crate::protocol::BackendMessage::ParseComplete
535 | crate::protocol::BackendMessage::BindComplete => {}
536 crate::protocol::BackendMessage::RowDescription(_) => {}
537 crate::protocol::BackendMessage::DataRow(_) => {}
538 crate::protocol::BackendMessage::NoData => {}
539 crate::protocol::BackendMessage::CommandComplete(tag) => {
540 if error.is_none()
541 && let Some(n) = tag.split_whitespace().last()
542 {
543 affected = n.parse().unwrap_or(0);
544 }
545 }
546 crate::protocol::BackendMessage::ReadyForQuery(_) => {
547 if let Some(err) = error {
548 return Err(err);
549 }
550 return Ok(affected);
551 }
552 crate::protocol::BackendMessage::ErrorResponse(err) => {
553 if error.is_none() {
554 error = Some(PgError::QueryServer(err.into()));
555 }
556 }
557 msg if is_ignorable_session_message(&msg) => {}
558 other => {
559 return Err(unexpected_backend_message(
560 "driver execute mutation",
561 &other,
562 ));
563 }
564 }
565 }
566 }
567
568 pub async fn query_ast(&mut self, cmd: &Qail) -> PgResult<QueryResult> {
572 self.query_ast_with_format(cmd, ResultFormat::Text).await
573 }
574
575 pub async fn query_ast_with_format(
577 &mut self,
578 cmd: &Qail,
579 result_format: ResultFormat,
580 ) -> PgResult<QueryResult> {
581 use crate::protocol::AstEncoder;
582
583 let wire_bytes = AstEncoder::encode_cmd_reuse_with_result_format(
584 cmd,
585 &mut self.connection.sql_buf,
586 &mut self.connection.params_buf,
587 result_format.as_wire_code(),
588 )
589 .map_err(|e| PgError::Encode(e.to_string()))?;
590
591 self.connection.send_bytes(&wire_bytes).await?;
592
593 let mut columns: Vec<String> = Vec::new();
594 let mut rows: Vec<Vec<Option<String>>> = Vec::new();
595 let mut error: Option<PgError> = None;
596 let mut flow = super::extended_flow::ExtendedFlowTracker::new(
597 super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
598 );
599
600 loop {
601 let msg = self.connection.recv().await?;
602 flow.validate(&msg, "driver query_ast", error.is_some())?;
603 match msg {
604 crate::protocol::BackendMessage::ParseComplete
605 | crate::protocol::BackendMessage::BindComplete => {}
606 crate::protocol::BackendMessage::RowDescription(fields) => {
607 columns = fields.into_iter().map(|f| f.name).collect();
608 }
609 crate::protocol::BackendMessage::DataRow(data) => {
610 if error.is_none() {
611 let row: Vec<Option<String>> = data
612 .into_iter()
613 .map(|col| col.map(|bytes| String::from_utf8_lossy(&bytes).to_string()))
614 .collect();
615 rows.push(row);
616 }
617 }
618 crate::protocol::BackendMessage::CommandComplete(_) => {}
619 crate::protocol::BackendMessage::NoData => {}
620 crate::protocol::BackendMessage::ReadyForQuery(_) => {
621 if let Some(err) = error {
622 return Err(err);
623 }
624 return Ok(QueryResult { columns, rows });
625 }
626 crate::protocol::BackendMessage::ErrorResponse(err) => {
627 if error.is_none() {
628 error = Some(PgError::QueryServer(err.into()));
629 }
630 }
631 msg if is_ignorable_session_message(&msg) => {}
632 other => return Err(unexpected_backend_message("driver query_ast", &other)),
633 }
634 }
635 }
636}