1use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6 PgError, PgResult, ResultFormat,
7 extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8 is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12impl PooledConnection {
13 pub async fn fetch_all_uncached(
16 &mut self,
17 cmd: &qail_core::ast::Qail,
18 ) -> PgResult<Vec<crate::driver::PgRow>> {
19 self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
20 .await
21 }
22
23 pub async fn query_raw_with_params(
31 &mut self,
32 sql: &str,
33 params: &[Option<Vec<u8>>],
34 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
35 let conn = self.conn_mut()?;
36 conn.query(sql, params).await
37 }
38
39 pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
41 self.conn_mut()?.copy_export(cmd).await
42 }
43
44 pub async fn copy_export_stream_raw<F, Fut>(
46 &mut self,
47 cmd: &qail_core::ast::Qail,
48 on_chunk: F,
49 ) -> PgResult<()>
50 where
51 F: FnMut(Vec<u8>) -> Fut,
52 Fut: std::future::Future<Output = PgResult<()>>,
53 {
54 self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
55 }
56
57 pub async fn copy_export_stream_rows<F>(
59 &mut self,
60 cmd: &qail_core::ast::Qail,
61 on_row: F,
62 ) -> PgResult<()>
63 where
64 F: FnMut(Vec<String>) -> PgResult<()>,
65 {
66 self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
67 }
68
69 pub async fn copy_export_table(
71 &mut self,
72 table: &str,
73 columns: &[String],
74 ) -> PgResult<Vec<u8>> {
75 let quote_ident = |ident: &str| -> String {
76 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
77 };
78 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
79 let sql = format!(
80 "COPY {} ({}) TO STDOUT",
81 quote_ident(table),
82 cols.join(", ")
83 );
84 self.conn_mut()?.copy_out_raw(&sql).await
85 }
86
87 pub async fn copy_export_table_stream<F, Fut>(
89 &mut self,
90 table: &str,
91 columns: &[String],
92 on_chunk: F,
93 ) -> PgResult<()>
94 where
95 F: FnMut(Vec<u8>) -> Fut,
96 Fut: std::future::Future<Output = PgResult<()>>,
97 {
98 let quote_ident = |ident: &str| -> String {
99 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
100 };
101 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
102 let sql = format!(
103 "COPY {} ({}) TO STDOUT",
104 quote_ident(table),
105 cols.join(", ")
106 );
107 self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
108 }
109
110 pub async fn fetch_all_uncached_with_format(
112 &mut self,
113 cmd: &qail_core::ast::Qail,
114 result_format: ResultFormat,
115 ) -> PgResult<Vec<crate::driver::PgRow>> {
116 use crate::driver::ColumnInfo;
117 use crate::protocol::AstEncoder;
118
119 let conn = self.conn_mut()?;
120
121 AstEncoder::encode_cmd_reuse_into_with_result_format(
122 cmd,
123 &mut conn.sql_buf,
124 &mut conn.params_buf,
125 &mut conn.write_buf,
126 result_format.as_wire_code(),
127 )
128 .map_err(|e| PgError::Encode(e.to_string()))?;
129
130 conn.flush_write_buf().await?;
131
132 let mut rows: Vec<crate::driver::PgRow> = Vec::new();
133 let mut column_info: Option<Arc<ColumnInfo>> = None;
134 let mut error: Option<PgError> = None;
135 let mut flow =
136 ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
137
138 loop {
139 let msg = conn.recv().await?;
140 flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
141 match msg {
142 crate::protocol::BackendMessage::ParseComplete
143 | crate::protocol::BackendMessage::BindComplete => {}
144 crate::protocol::BackendMessage::RowDescription(fields) => {
145 column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
146 }
147 crate::protocol::BackendMessage::DataRow(data) => {
148 if error.is_none() {
149 rows.push(crate::driver::PgRow {
150 columns: data,
151 column_info: column_info.clone(),
152 });
153 }
154 }
155 crate::protocol::BackendMessage::NoData => {}
156 crate::protocol::BackendMessage::CommandComplete(_) => {}
157 crate::protocol::BackendMessage::ReadyForQuery(_) => {
158 if let Some(err) = error {
159 return Err(err);
160 }
161 return Ok(rows);
162 }
163 crate::protocol::BackendMessage::ErrorResponse(err) => {
164 if error.is_none() {
165 error = Some(PgError::QueryServer(err.into()));
166 }
167 }
168 msg if is_ignorable_session_message(&msg) => {}
169 other => {
170 return Err(unexpected_backend_message("pool fetch_all execute", &other));
171 }
172 }
173 }
174 }
175
176 pub async fn fetch_all_fast(
180 &mut self,
181 cmd: &qail_core::ast::Qail,
182 ) -> PgResult<Vec<crate::driver::PgRow>> {
183 self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
184 .await
185 }
186
187 pub async fn fetch_all_fast_with_format(
189 &mut self,
190 cmd: &qail_core::ast::Qail,
191 result_format: ResultFormat,
192 ) -> PgResult<Vec<crate::driver::PgRow>> {
193 use crate::protocol::AstEncoder;
194
195 let conn = self.conn_mut()?;
196
197 AstEncoder::encode_cmd_reuse_into_with_result_format(
198 cmd,
199 &mut conn.sql_buf,
200 &mut conn.params_buf,
201 &mut conn.write_buf,
202 result_format.as_wire_code(),
203 )
204 .map_err(|e| PgError::Encode(e.to_string()))?;
205
206 conn.flush_write_buf().await?;
207
208 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
209 let mut error: Option<PgError> = None;
210 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
211
212 loop {
213 let res = conn.recv_with_data_fast().await;
214 match res {
215 Ok((msg_type, data)) => {
216 flow.validate_msg_type(
217 msg_type,
218 "pool fetch_all_fast execute",
219 error.is_some(),
220 )?;
221 match msg_type {
222 b'D' => {
223 if error.is_none()
224 && let Some(columns) = data
225 {
226 rows.push(crate::driver::PgRow {
227 columns,
228 column_info: None,
229 });
230 }
231 }
232 b'Z' => {
233 if let Some(err) = error {
234 return Err(err);
235 }
236 return Ok(rows);
237 }
238 _ => {}
239 }
240 }
241 Err(e) => {
242 if matches!(&e, PgError::QueryServer(_)) {
243 if error.is_none() {
244 error = Some(e);
245 }
246 continue;
247 }
248 return Err(e);
249 }
250 }
251 }
252 }
253
254 pub async fn fetch_all_cached(
259 &mut self,
260 cmd: &qail_core::ast::Qail,
261 ) -> PgResult<Vec<crate::driver::PgRow>> {
262 self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
263 .await
264 }
265
266 pub async fn fetch_all_cached_with_format(
268 &mut self,
269 cmd: &qail_core::ast::Qail,
270 result_format: ResultFormat,
271 ) -> PgResult<Vec<crate::driver::PgRow>> {
272 let mut retried = false;
273 loop {
274 match self
275 .fetch_all_cached_with_format_once(cmd, result_format)
276 .await
277 {
278 Ok(rows) => return Ok(rows),
279 Err(err)
280 if !retried
281 && (err.is_prepared_statement_retryable()
282 || err.is_prepared_statement_already_exists()) =>
283 {
284 retried = true;
285 if err.is_prepared_statement_retryable()
286 && let Some(conn) = self.conn.as_mut()
287 {
288 conn.clear_prepared_statement_state();
289 }
290 }
291 Err(err) => return Err(err),
292 }
293 }
294 }
295
296 pub async fn fetch_typed<T: crate::driver::row::QailRow>(
298 &mut self,
299 cmd: &qail_core::ast::Qail,
300 ) -> PgResult<Vec<T>> {
301 self.fetch_typed_with_format(cmd, ResultFormat::Text).await
302 }
303
304 pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
309 &mut self,
310 cmd: &qail_core::ast::Qail,
311 result_format: ResultFormat,
312 ) -> PgResult<Vec<T>> {
313 let rows = self
314 .fetch_all_cached_with_format(cmd, result_format)
315 .await?;
316 Ok(rows.iter().map(T::from_row).collect())
317 }
318
319 pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
321 &mut self,
322 cmd: &qail_core::ast::Qail,
323 ) -> PgResult<Option<T>> {
324 self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
325 .await
326 }
327
328 pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
330 &mut self,
331 cmd: &qail_core::ast::Qail,
332 result_format: ResultFormat,
333 ) -> PgResult<Option<T>> {
334 let rows = self
335 .fetch_all_cached_with_format(cmd, result_format)
336 .await?;
337 Ok(rows.first().map(T::from_row))
338 }
339
340 async fn fetch_all_cached_with_format_once(
341 &mut self,
342 cmd: &qail_core::ast::Qail,
343 result_format: ResultFormat,
344 ) -> PgResult<Vec<crate::driver::PgRow>> {
345 use crate::driver::ColumnInfo;
346 use std::collections::hash_map::DefaultHasher;
347 use std::hash::{Hash, Hasher};
348
349 let conn = self.conn.as_mut().ok_or_else(|| {
350 PgError::Connection("Connection already released back to pool".into())
351 })?;
352
353 conn.sql_buf.clear();
354 conn.params_buf.clear();
355
356 match cmd.action {
358 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
359 crate::protocol::ast_encoder::dml::encode_select(
360 cmd,
361 &mut conn.sql_buf,
362 &mut conn.params_buf,
363 )?;
364 }
365 qail_core::ast::Action::Add => {
366 crate::protocol::ast_encoder::dml::encode_insert(
367 cmd,
368 &mut conn.sql_buf,
369 &mut conn.params_buf,
370 )?;
371 }
372 qail_core::ast::Action::Set => {
373 crate::protocol::ast_encoder::dml::encode_update(
374 cmd,
375 &mut conn.sql_buf,
376 &mut conn.params_buf,
377 )?;
378 }
379 qail_core::ast::Action::Del => {
380 crate::protocol::ast_encoder::dml::encode_delete(
381 cmd,
382 &mut conn.sql_buf,
383 &mut conn.params_buf,
384 )?;
385 }
386 _ => {
387 return self
389 .fetch_all_uncached_with_format(cmd, result_format)
390 .await;
391 }
392 }
393
394 let mut hasher = DefaultHasher::new();
395 conn.sql_buf.hash(&mut hasher);
396 let sql_hash = hasher.finish();
397
398 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
399
400 conn.write_buf.clear();
401
402 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
403 name
404 } else {
405 let name = format!("qail_{:x}", sql_hash);
406
407 conn.evict_prepared_if_full();
408
409 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
410
411 use crate::protocol::PgEncoder;
412 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
413 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
414 conn.write_buf.extend_from_slice(&parse_msg);
415 conn.write_buf.extend_from_slice(&describe_msg);
416
417 conn.stmt_cache.put(sql_hash, name.clone());
418 conn.prepared_statements
419 .insert(name.clone(), sql_str.to_string());
420
421 if let Ok(mut hot) = self.pool.hot_statements.write()
423 && hot.len() < MAX_HOT_STATEMENTS
424 {
425 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
426 }
427
428 name
429 };
430
431 use crate::protocol::PgEncoder;
432 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
433 &mut conn.write_buf,
434 &stmt_name,
435 &conn.params_buf,
436 result_format.as_wire_code(),
437 ) {
438 if is_cache_miss {
439 conn.stmt_cache.remove(&sql_hash);
440 conn.prepared_statements.remove(&stmt_name);
441 conn.column_info_cache.remove(&sql_hash);
442 }
443 return Err(PgError::Encode(e.to_string()));
444 }
445 PgEncoder::encode_execute_to(&mut conn.write_buf);
446 PgEncoder::encode_sync_to(&mut conn.write_buf);
447
448 if let Err(err) = conn.flush_write_buf().await {
449 if is_cache_miss {
450 conn.stmt_cache.remove(&sql_hash);
451 conn.prepared_statements.remove(&stmt_name);
452 conn.column_info_cache.remove(&sql_hash);
453 }
454 return Err(err);
455 }
456
457 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
458
459 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
460 let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
461 let mut error: Option<PgError> = None;
462 let mut flow = ExtendedFlowTracker::new(
463 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
464 );
465
466 loop {
467 let msg = match conn.recv().await {
468 Ok(msg) => msg,
469 Err(err) => {
470 if is_cache_miss && !flow.saw_parse_complete() {
471 conn.stmt_cache.remove(&sql_hash);
472 conn.prepared_statements.remove(&stmt_name);
473 conn.column_info_cache.remove(&sql_hash);
474 }
475 return Err(err);
476 }
477 };
478 if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
479 {
480 if is_cache_miss && !flow.saw_parse_complete() {
481 conn.stmt_cache.remove(&sql_hash);
482 conn.prepared_statements.remove(&stmt_name);
483 conn.column_info_cache.remove(&sql_hash);
484 }
485 return Err(err);
486 }
487 match msg {
488 crate::protocol::BackendMessage::ParseComplete => {}
489 crate::protocol::BackendMessage::BindComplete => {}
490 crate::protocol::BackendMessage::ParameterDescription(_) => {}
491 crate::protocol::BackendMessage::RowDescription(fields) => {
492 let info = Arc::new(ColumnInfo::from_fields(&fields));
493 if is_cache_miss {
494 conn.column_info_cache.insert(sql_hash, info.clone());
495 }
496 column_info = Some(info);
497 }
498 crate::protocol::BackendMessage::DataRow(data) => {
499 if error.is_none() {
500 rows.push(crate::driver::PgRow {
501 columns: data,
502 column_info: column_info.clone(),
503 });
504 }
505 }
506 crate::protocol::BackendMessage::CommandComplete(_) => {}
507 crate::protocol::BackendMessage::ReadyForQuery(_) => {
508 if let Some(err) = error {
509 if is_cache_miss
510 && !flow.saw_parse_complete()
511 && !err.is_prepared_statement_already_exists()
512 {
513 conn.stmt_cache.remove(&sql_hash);
514 conn.prepared_statements.remove(&stmt_name);
515 conn.column_info_cache.remove(&sql_hash);
516 }
517 return Err(err);
518 }
519 if is_cache_miss && !flow.saw_parse_complete() {
520 conn.stmt_cache.remove(&sql_hash);
521 conn.prepared_statements.remove(&stmt_name);
522 conn.column_info_cache.remove(&sql_hash);
523 return Err(PgError::Protocol(
524 "Cache miss query reached ReadyForQuery without ParseComplete"
525 .to_string(),
526 ));
527 }
528 return Ok(rows);
529 }
530 crate::protocol::BackendMessage::ErrorResponse(err) => {
531 if error.is_none() {
532 error = Some(PgError::QueryServer(err.into()));
533 }
534 }
535 msg if is_ignorable_session_message(&msg) => {}
536 other => {
537 if is_cache_miss && !flow.saw_parse_complete() {
538 conn.stmt_cache.remove(&sql_hash);
539 conn.prepared_statements.remove(&stmt_name);
540 conn.column_info_cache.remove(&sql_hash);
541 }
542 return Err(unexpected_backend_message(
543 "pool fetch_all_cached execute",
544 &other,
545 ));
546 }
547 }
548 }
549 }
550
551 pub async fn fetch_all_with_rls(
570 &mut self,
571 cmd: &qail_core::ast::Qail,
572 rls_sql: &str,
573 ) -> PgResult<Vec<crate::driver::PgRow>> {
574 self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
575 .await
576 }
577
578 pub async fn fetch_all_with_rls_with_format(
580 &mut self,
581 cmd: &qail_core::ast::Qail,
582 rls_sql: &str,
583 result_format: ResultFormat,
584 ) -> PgResult<Vec<crate::driver::PgRow>> {
585 let mut retried = false;
586 loop {
587 match self
588 .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
589 .await
590 {
591 Ok(rows) => return Ok(rows),
592 Err(err)
593 if !retried
594 && (err.is_prepared_statement_retryable()
595 || err.is_prepared_statement_already_exists()) =>
596 {
597 retried = true;
598 if err.is_prepared_statement_retryable()
599 && let Some(conn) = self.conn.as_mut()
600 {
601 conn.clear_prepared_statement_state();
602 let _ = conn.execute_simple("ROLLBACK").await;
603 }
604 self.rls_dirty = false;
605 }
606 Err(err) => return Err(err),
607 }
608 }
609 }
610
611 async fn fetch_all_with_rls_with_format_once(
612 &mut self,
613 cmd: &qail_core::ast::Qail,
614 rls_sql: &str,
615 result_format: ResultFormat,
616 ) -> PgResult<Vec<crate::driver::PgRow>> {
617 use crate::driver::ColumnInfo;
618 use std::collections::hash_map::DefaultHasher;
619 use std::hash::{Hash, Hasher};
620
621 let conn = self.conn.as_mut().ok_or_else(|| {
622 PgError::Connection("Connection already released back to pool".into())
623 })?;
624
625 conn.sql_buf.clear();
626 conn.params_buf.clear();
627
628 if cmd.is_raw_sql() {
630 conn.sql_buf.clear();
632 conn.params_buf.clear();
633 conn.sql_buf.extend_from_slice(cmd.table.as_bytes());
634 } else {
635 match cmd.action {
636 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
637 crate::protocol::ast_encoder::dml::encode_select(
638 cmd,
639 &mut conn.sql_buf,
640 &mut conn.params_buf,
641 )?;
642 }
643 qail_core::ast::Action::Add => {
644 crate::protocol::ast_encoder::dml::encode_insert(
645 cmd,
646 &mut conn.sql_buf,
647 &mut conn.params_buf,
648 )?;
649 }
650 qail_core::ast::Action::Set => {
651 crate::protocol::ast_encoder::dml::encode_update(
652 cmd,
653 &mut conn.sql_buf,
654 &mut conn.params_buf,
655 )?;
656 }
657 qail_core::ast::Action::Del => {
658 crate::protocol::ast_encoder::dml::encode_delete(
659 cmd,
660 &mut conn.sql_buf,
661 &mut conn.params_buf,
662 )?;
663 }
664 _ => {
665 conn.execute_simple(rls_sql).await?;
667 self.rls_dirty = true;
668 return self
669 .fetch_all_uncached_with_format(cmd, result_format)
670 .await;
671 }
672 }
673 }
674
675 let mut hasher = DefaultHasher::new();
676 conn.sql_buf.hash(&mut hasher);
677 let sql_hash = hasher.finish();
678
679 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
680
681 conn.write_buf.clear();
682
683 let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
687 conn.write_buf.extend_from_slice(&rls_msg);
688
689 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
691 name
692 } else {
693 let name = format!("qail_{:x}", sql_hash);
694
695 conn.evict_prepared_if_full();
696
697 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
698
699 use crate::protocol::PgEncoder;
700 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
701 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
702 conn.write_buf.extend_from_slice(&parse_msg);
703 conn.write_buf.extend_from_slice(&describe_msg);
704
705 conn.stmt_cache.put(sql_hash, name.clone());
706 conn.prepared_statements
707 .insert(name.clone(), sql_str.to_string());
708
709 if let Ok(mut hot) = self.pool.hot_statements.write()
710 && hot.len() < MAX_HOT_STATEMENTS
711 {
712 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
713 }
714
715 name
716 };
717
718 use crate::protocol::PgEncoder;
719 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
720 &mut conn.write_buf,
721 &stmt_name,
722 &conn.params_buf,
723 result_format.as_wire_code(),
724 ) {
725 if is_cache_miss {
726 conn.stmt_cache.remove(&sql_hash);
727 conn.prepared_statements.remove(&stmt_name);
728 conn.column_info_cache.remove(&sql_hash);
729 }
730 return Err(PgError::Encode(e.to_string()));
731 }
732 PgEncoder::encode_execute_to(&mut conn.write_buf);
733 PgEncoder::encode_sync_to(&mut conn.write_buf);
734
735 if let Err(err) = conn.flush_write_buf().await {
737 if is_cache_miss {
738 conn.stmt_cache.remove(&sql_hash);
739 conn.prepared_statements.remove(&stmt_name);
740 conn.column_info_cache.remove(&sql_hash);
741 }
742 return Err(err);
743 }
744
745 self.rls_dirty = true;
747
748 let mut rls_error: Option<PgError> = None;
752 loop {
753 let msg = match conn.recv().await {
754 Ok(msg) => msg,
755 Err(err) => {
756 if is_cache_miss {
757 conn.stmt_cache.remove(&sql_hash);
758 conn.prepared_statements.remove(&stmt_name);
759 conn.column_info_cache.remove(&sql_hash);
760 }
761 return Err(err);
762 }
763 };
764 match msg {
765 crate::protocol::BackendMessage::ReadyForQuery(_) => {
766 if let Some(err) = rls_error {
768 return Err(err);
769 }
770 break;
771 }
772 crate::protocol::BackendMessage::ErrorResponse(err) => {
773 if rls_error.is_none() {
774 rls_error = Some(PgError::QueryServer(err.into()));
775 }
776 }
777 crate::protocol::BackendMessage::CommandComplete(_)
779 | crate::protocol::BackendMessage::DataRow(_)
780 | crate::protocol::BackendMessage::RowDescription(_)
781 | crate::protocol::BackendMessage::ParseComplete
782 | crate::protocol::BackendMessage::BindComplete => {}
783 msg if is_ignorable_session_message(&msg) => {}
784 other => return Err(unexpected_backend_message("pool rls setup", &other)),
785 }
786 }
787
788 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
790
791 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
792 let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
793 let mut error: Option<PgError> = None;
794 let mut flow = ExtendedFlowTracker::new(
795 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
796 );
797
798 loop {
799 let msg = match conn.recv().await {
800 Ok(msg) => msg,
801 Err(err) => {
802 if is_cache_miss && !flow.saw_parse_complete() {
803 conn.stmt_cache.remove(&sql_hash);
804 conn.prepared_statements.remove(&stmt_name);
805 conn.column_info_cache.remove(&sql_hash);
806 }
807 return Err(err);
808 }
809 };
810 if let Err(err) =
811 flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
812 {
813 if is_cache_miss && !flow.saw_parse_complete() {
814 conn.stmt_cache.remove(&sql_hash);
815 conn.prepared_statements.remove(&stmt_name);
816 conn.column_info_cache.remove(&sql_hash);
817 }
818 return Err(err);
819 }
820 match msg {
821 crate::protocol::BackendMessage::ParseComplete => {}
822 crate::protocol::BackendMessage::BindComplete => {}
823 crate::protocol::BackendMessage::ParameterDescription(_) => {}
824 crate::protocol::BackendMessage::RowDescription(fields) => {
825 let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
826 if is_cache_miss {
827 conn.column_info_cache.insert(sql_hash, info.clone());
828 }
829 column_info = Some(info);
830 }
831 crate::protocol::BackendMessage::DataRow(data) => {
832 if error.is_none() {
833 rows.push(crate::driver::PgRow {
834 columns: data,
835 column_info: column_info.clone(),
836 });
837 }
838 }
839 crate::protocol::BackendMessage::CommandComplete(_) => {}
840 crate::protocol::BackendMessage::ReadyForQuery(_) => {
841 if let Some(err) = error {
842 if is_cache_miss
843 && !flow.saw_parse_complete()
844 && !err.is_prepared_statement_already_exists()
845 {
846 conn.stmt_cache.remove(&sql_hash);
847 conn.prepared_statements.remove(&stmt_name);
848 conn.column_info_cache.remove(&sql_hash);
849 }
850 return Err(err);
851 }
852 if is_cache_miss && !flow.saw_parse_complete() {
853 conn.stmt_cache.remove(&sql_hash);
854 conn.prepared_statements.remove(&stmt_name);
855 conn.column_info_cache.remove(&sql_hash);
856 return Err(PgError::Protocol(
857 "Cache miss query reached ReadyForQuery without ParseComplete"
858 .to_string(),
859 ));
860 }
861 return Ok(rows);
862 }
863 crate::protocol::BackendMessage::ErrorResponse(err) => {
864 if error.is_none() {
865 error = Some(PgError::QueryServer(err.into()));
866 }
867 }
868 msg if is_ignorable_session_message(&msg) => {}
869 other => {
870 if is_cache_miss && !flow.saw_parse_complete() {
871 conn.stmt_cache.remove(&sql_hash);
872 conn.prepared_statements.remove(&stmt_name);
873 conn.column_info_cache.remove(&sql_hash);
874 }
875 return Err(unexpected_backend_message(
876 "pool fetch_all_with_rls execute",
877 &other,
878 ));
879 }
880 }
881 }
882 }
883}