1use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6 PgError, PgResult, ResultFormat,
7 extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8 is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12impl PooledConnection {
13 pub async fn fetch_all_uncached(
16 &mut self,
17 cmd: &qail_core::ast::Qail,
18 ) -> PgResult<Vec<crate::driver::PgRow>> {
19 self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
20 .await
21 }
22
23 pub async fn query_raw_with_params(
31 &mut self,
32 sql: &str,
33 params: &[Option<Vec<u8>>],
34 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
35 let conn = self.conn_mut()?;
36 conn.query(sql, params).await
37 }
38
39 pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
41 self.conn_mut()?.copy_export(cmd).await
42 }
43
44 pub async fn copy_export_stream_raw<F, Fut>(
46 &mut self,
47 cmd: &qail_core::ast::Qail,
48 on_chunk: F,
49 ) -> PgResult<()>
50 where
51 F: FnMut(Vec<u8>) -> Fut,
52 Fut: std::future::Future<Output = PgResult<()>>,
53 {
54 self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
55 }
56
57 pub async fn copy_export_stream_rows<F>(
59 &mut self,
60 cmd: &qail_core::ast::Qail,
61 on_row: F,
62 ) -> PgResult<()>
63 where
64 F: FnMut(Vec<String>) -> PgResult<()>,
65 {
66 self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
67 }
68
69 pub async fn copy_export_table(
71 &mut self,
72 table: &str,
73 columns: &[String],
74 ) -> PgResult<Vec<u8>> {
75 let quote_ident = |ident: &str| -> String {
76 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
77 };
78 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
79 let sql = format!(
80 "COPY {} ({}) TO STDOUT",
81 quote_ident(table),
82 cols.join(", ")
83 );
84 self.conn_mut()?.copy_out_raw(&sql).await
85 }
86
87 pub async fn copy_export_table_stream<F, Fut>(
89 &mut self,
90 table: &str,
91 columns: &[String],
92 on_chunk: F,
93 ) -> PgResult<()>
94 where
95 F: FnMut(Vec<u8>) -> Fut,
96 Fut: std::future::Future<Output = PgResult<()>>,
97 {
98 let quote_ident = |ident: &str| -> String {
99 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
100 };
101 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
102 let sql = format!(
103 "COPY {} ({}) TO STDOUT",
104 quote_ident(table),
105 cols.join(", ")
106 );
107 self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
108 }
109
110 pub async fn fetch_all_uncached_with_format(
112 &mut self,
113 cmd: &qail_core::ast::Qail,
114 result_format: ResultFormat,
115 ) -> PgResult<Vec<crate::driver::PgRow>> {
116 use crate::driver::ColumnInfo;
117 use crate::protocol::AstEncoder;
118
119 let conn = self.conn_mut()?;
120
121 AstEncoder::encode_cmd_reuse_into_with_result_format(
122 cmd,
123 &mut conn.sql_buf,
124 &mut conn.params_buf,
125 &mut conn.write_buf,
126 result_format.as_wire_code(),
127 )
128 .map_err(|e| PgError::Encode(e.to_string()))?;
129
130 conn.flush_write_buf().await?;
131
132 let mut rows: Vec<crate::driver::PgRow> = Vec::new();
133 let mut column_info: Option<Arc<ColumnInfo>> = None;
134 let mut error: Option<PgError> = None;
135 let mut flow =
136 ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
137
138 loop {
139 let msg = conn.recv().await?;
140 flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
141 match msg {
142 crate::protocol::BackendMessage::ParseComplete
143 | crate::protocol::BackendMessage::BindComplete => {}
144 crate::protocol::BackendMessage::RowDescription(fields) => {
145 column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
146 }
147 crate::protocol::BackendMessage::DataRow(data) => {
148 if error.is_none() {
149 rows.push(crate::driver::PgRow {
150 columns: data,
151 column_info: column_info.clone(),
152 });
153 }
154 }
155 crate::protocol::BackendMessage::NoData => {}
156 crate::protocol::BackendMessage::CommandComplete(_) => {}
157 crate::protocol::BackendMessage::ReadyForQuery(_) => {
158 if let Some(err) = error {
159 return Err(err);
160 }
161 return Ok(rows);
162 }
163 crate::protocol::BackendMessage::ErrorResponse(err) => {
164 if error.is_none() {
165 error = Some(PgError::QueryServer(err.into()));
166 }
167 }
168 msg if is_ignorable_session_message(&msg) => {}
169 other => {
170 return Err(unexpected_backend_message("pool fetch_all execute", &other));
171 }
172 }
173 }
174 }
175
176 pub async fn fetch_all_fast(
180 &mut self,
181 cmd: &qail_core::ast::Qail,
182 ) -> PgResult<Vec<crate::driver::PgRow>> {
183 self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
184 .await
185 }
186
187 pub async fn fetch_all_fast_with_format(
189 &mut self,
190 cmd: &qail_core::ast::Qail,
191 result_format: ResultFormat,
192 ) -> PgResult<Vec<crate::driver::PgRow>> {
193 use crate::protocol::AstEncoder;
194
195 let conn = self.conn_mut()?;
196
197 AstEncoder::encode_cmd_reuse_into_with_result_format(
198 cmd,
199 &mut conn.sql_buf,
200 &mut conn.params_buf,
201 &mut conn.write_buf,
202 result_format.as_wire_code(),
203 )
204 .map_err(|e| PgError::Encode(e.to_string()))?;
205
206 conn.flush_write_buf().await?;
207
208 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
209 let mut error: Option<PgError> = None;
210 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
211
212 loop {
213 let res = conn.recv_with_data_fast().await;
214 match res {
215 Ok((msg_type, data)) => {
216 flow.validate_msg_type(
217 msg_type,
218 "pool fetch_all_fast execute",
219 error.is_some(),
220 )?;
221 match msg_type {
222 b'D' => {
223 if error.is_none()
224 && let Some(columns) = data
225 {
226 rows.push(crate::driver::PgRow {
227 columns,
228 column_info: None,
229 });
230 }
231 }
232 b'Z' => {
233 if let Some(err) = error {
234 return Err(err);
235 }
236 return Ok(rows);
237 }
238 _ => {}
239 }
240 }
241 Err(e) => {
242 if matches!(&e, PgError::QueryServer(_)) {
243 if error.is_none() {
244 error = Some(e);
245 }
246 continue;
247 }
248 return Err(e);
249 }
250 }
251 }
252 }
253
254 pub async fn fetch_all_cached(
259 &mut self,
260 cmd: &qail_core::ast::Qail,
261 ) -> PgResult<Vec<crate::driver::PgRow>> {
262 self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
263 .await
264 }
265
266 pub async fn fetch_all_cached_with_format(
268 &mut self,
269 cmd: &qail_core::ast::Qail,
270 result_format: ResultFormat,
271 ) -> PgResult<Vec<crate::driver::PgRow>> {
272 let mut retried = false;
273 loop {
274 match self
275 .fetch_all_cached_with_format_once(cmd, result_format)
276 .await
277 {
278 Ok(rows) => return Ok(rows),
279 Err(err)
280 if !retried
281 && (err.is_prepared_statement_retryable()
282 || err.is_prepared_statement_already_exists()) =>
283 {
284 retried = true;
285 if err.is_prepared_statement_retryable()
286 && let Some(conn) = self.conn.as_mut()
287 {
288 conn.clear_prepared_statement_state();
289 }
290 }
291 Err(err) => return Err(err),
292 }
293 }
294 }
295
296 pub async fn fetch_typed<T: crate::driver::row::QailRow>(
298 &mut self,
299 cmd: &qail_core::ast::Qail,
300 ) -> PgResult<Vec<T>> {
301 self.fetch_typed_with_format(cmd, ResultFormat::Text).await
302 }
303
304 pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
309 &mut self,
310 cmd: &qail_core::ast::Qail,
311 result_format: ResultFormat,
312 ) -> PgResult<Vec<T>> {
313 let rows = self
314 .fetch_all_cached_with_format(cmd, result_format)
315 .await?;
316 Ok(rows.iter().map(T::from_row).collect())
317 }
318
319 pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
321 &mut self,
322 cmd: &qail_core::ast::Qail,
323 ) -> PgResult<Option<T>> {
324 self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
325 .await
326 }
327
328 pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
330 &mut self,
331 cmd: &qail_core::ast::Qail,
332 result_format: ResultFormat,
333 ) -> PgResult<Option<T>> {
334 let rows = self
335 .fetch_all_cached_with_format(cmd, result_format)
336 .await?;
337 Ok(rows.first().map(T::from_row))
338 }
339
340 async fn fetch_all_cached_with_format_once(
341 &mut self,
342 cmd: &qail_core::ast::Qail,
343 result_format: ResultFormat,
344 ) -> PgResult<Vec<crate::driver::PgRow>> {
345 use crate::driver::ColumnInfo;
346 use std::collections::hash_map::DefaultHasher;
347 use std::hash::{Hash, Hasher};
348
349 let conn = self.conn.as_mut().ok_or_else(|| {
350 PgError::Connection("Connection already released back to pool".into())
351 })?;
352
353 conn.sql_buf.clear();
354 conn.params_buf.clear();
355
356 match cmd.action {
358 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
359 crate::protocol::ast_encoder::dml::encode_select(
360 cmd,
361 &mut conn.sql_buf,
362 &mut conn.params_buf,
363 )?;
364 }
365 qail_core::ast::Action::Add => {
366 crate::protocol::ast_encoder::dml::encode_insert(
367 cmd,
368 &mut conn.sql_buf,
369 &mut conn.params_buf,
370 )?;
371 }
372 qail_core::ast::Action::Set => {
373 crate::protocol::ast_encoder::dml::encode_update(
374 cmd,
375 &mut conn.sql_buf,
376 &mut conn.params_buf,
377 )?;
378 }
379 qail_core::ast::Action::Del => {
380 crate::protocol::ast_encoder::dml::encode_delete(
381 cmd,
382 &mut conn.sql_buf,
383 &mut conn.params_buf,
384 )?;
385 }
386 _ => {
387 return self
389 .fetch_all_uncached_with_format(cmd, result_format)
390 .await;
391 }
392 }
393
394 let mut hasher = DefaultHasher::new();
395 conn.sql_buf.hash(&mut hasher);
396 let sql_hash = hasher.finish();
397
398 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
399
400 conn.write_buf.clear();
401
402 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
403 name
404 } else {
405 let name = format!("qail_{:x}", sql_hash);
406
407 conn.evict_prepared_if_full();
408
409 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
410
411 use crate::protocol::PgEncoder;
412 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
413 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
414 conn.write_buf.extend_from_slice(&parse_msg);
415 conn.write_buf.extend_from_slice(&describe_msg);
416
417 conn.stmt_cache.put(sql_hash, name.clone());
418 conn.prepared_statements
419 .insert(name.clone(), sql_str.to_string());
420
421 if let Ok(mut hot) = self.pool.hot_statements.write()
423 && hot.len() < MAX_HOT_STATEMENTS
424 {
425 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
426 }
427
428 name
429 };
430
431 use crate::protocol::PgEncoder;
432 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
433 &mut conn.write_buf,
434 &stmt_name,
435 &conn.params_buf,
436 result_format.as_wire_code(),
437 ) {
438 if is_cache_miss {
439 conn.stmt_cache.remove(&sql_hash);
440 conn.prepared_statements.remove(&stmt_name);
441 conn.column_info_cache.remove(&sql_hash);
442 }
443 return Err(PgError::Encode(e.to_string()));
444 }
445 PgEncoder::encode_execute_to(&mut conn.write_buf);
446 PgEncoder::encode_sync_to(&mut conn.write_buf);
447
448 if let Err(err) = conn.flush_write_buf().await {
449 if is_cache_miss {
450 conn.stmt_cache.remove(&sql_hash);
451 conn.prepared_statements.remove(&stmt_name);
452 conn.column_info_cache.remove(&sql_hash);
453 }
454 return Err(err);
455 }
456
457 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
458
459 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
460 let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
461 let mut error: Option<PgError> = None;
462 let mut flow = ExtendedFlowTracker::new(
463 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
464 );
465
466 loop {
467 let msg = match conn.recv().await {
468 Ok(msg) => msg,
469 Err(err) => {
470 if is_cache_miss && !flow.saw_parse_complete() {
471 conn.stmt_cache.remove(&sql_hash);
472 conn.prepared_statements.remove(&stmt_name);
473 conn.column_info_cache.remove(&sql_hash);
474 }
475 return Err(err);
476 }
477 };
478 if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
479 {
480 if is_cache_miss && !flow.saw_parse_complete() {
481 conn.stmt_cache.remove(&sql_hash);
482 conn.prepared_statements.remove(&stmt_name);
483 conn.column_info_cache.remove(&sql_hash);
484 }
485 return Err(err);
486 }
487 match msg {
488 crate::protocol::BackendMessage::ParseComplete => {}
489 crate::protocol::BackendMessage::BindComplete => {}
490 crate::protocol::BackendMessage::ParameterDescription(_) => {}
491 crate::protocol::BackendMessage::RowDescription(fields) => {
492 let info = Arc::new(ColumnInfo::from_fields(&fields));
493 if is_cache_miss {
494 conn.column_info_cache.insert(sql_hash, info.clone());
495 }
496 column_info = Some(info);
497 }
498 crate::protocol::BackendMessage::DataRow(data) => {
499 if error.is_none() {
500 rows.push(crate::driver::PgRow {
501 columns: data,
502 column_info: column_info.clone(),
503 });
504 }
505 }
506 crate::protocol::BackendMessage::CommandComplete(_) => {}
507 crate::protocol::BackendMessage::ReadyForQuery(_) => {
508 if let Some(err) = error {
509 if is_cache_miss
510 && !flow.saw_parse_complete()
511 && !err.is_prepared_statement_already_exists()
512 {
513 conn.stmt_cache.remove(&sql_hash);
514 conn.prepared_statements.remove(&stmt_name);
515 conn.column_info_cache.remove(&sql_hash);
516 }
517 return Err(err);
518 }
519 if is_cache_miss && !flow.saw_parse_complete() {
520 conn.stmt_cache.remove(&sql_hash);
521 conn.prepared_statements.remove(&stmt_name);
522 conn.column_info_cache.remove(&sql_hash);
523 return Err(PgError::Protocol(
524 "Cache miss query reached ReadyForQuery without ParseComplete"
525 .to_string(),
526 ));
527 }
528 return Ok(rows);
529 }
530 crate::protocol::BackendMessage::ErrorResponse(err) => {
531 if error.is_none() {
532 error = Some(PgError::QueryServer(err.into()));
533 }
534 }
535 msg if is_ignorable_session_message(&msg) => {}
536 other => {
537 if is_cache_miss && !flow.saw_parse_complete() {
538 conn.stmt_cache.remove(&sql_hash);
539 conn.prepared_statements.remove(&stmt_name);
540 conn.column_info_cache.remove(&sql_hash);
541 }
542 return Err(unexpected_backend_message(
543 "pool fetch_all_cached execute",
544 &other,
545 ));
546 }
547 }
548 }
549 }
550
551 pub async fn fetch_all_with_rls(
570 &mut self,
571 cmd: &qail_core::ast::Qail,
572 rls_sql: &str,
573 ) -> PgResult<Vec<crate::driver::PgRow>> {
574 self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
575 .await
576 }
577
578 pub async fn fetch_all_with_rls_with_format(
580 &mut self,
581 cmd: &qail_core::ast::Qail,
582 rls_sql: &str,
583 result_format: ResultFormat,
584 ) -> PgResult<Vec<crate::driver::PgRow>> {
585 let mut retried = false;
586 loop {
587 match self
588 .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
589 .await
590 {
591 Ok(rows) => return Ok(rows),
592 Err(err)
593 if !retried
594 && (err.is_prepared_statement_retryable()
595 || err.is_prepared_statement_already_exists()) =>
596 {
597 retried = true;
598 if err.is_prepared_statement_retryable()
599 && let Some(conn) = self.conn.as_mut()
600 {
601 conn.clear_prepared_statement_state();
602 let _ = conn.execute_simple("ROLLBACK").await;
603 }
604 self.rls_dirty = false;
605 }
606 Err(err) => return Err(err),
607 }
608 }
609 }
610
611 async fn fetch_all_with_rls_with_format_once(
612 &mut self,
613 cmd: &qail_core::ast::Qail,
614 rls_sql: &str,
615 result_format: ResultFormat,
616 ) -> PgResult<Vec<crate::driver::PgRow>> {
617 use crate::driver::ColumnInfo;
618 use std::collections::hash_map::DefaultHasher;
619 use std::hash::{Hash, Hasher};
620
621 let conn = self.conn.as_mut().ok_or_else(|| {
622 PgError::Connection("Connection already released back to pool".into())
623 })?;
624
625 conn.sql_buf.clear();
626 conn.params_buf.clear();
627
628 match cmd.action {
630 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
631 crate::protocol::ast_encoder::dml::encode_select(
632 cmd,
633 &mut conn.sql_buf,
634 &mut conn.params_buf,
635 )?;
636 }
637 qail_core::ast::Action::Add => {
638 crate::protocol::ast_encoder::dml::encode_insert(
639 cmd,
640 &mut conn.sql_buf,
641 &mut conn.params_buf,
642 )?;
643 }
644 qail_core::ast::Action::Set => {
645 crate::protocol::ast_encoder::dml::encode_update(
646 cmd,
647 &mut conn.sql_buf,
648 &mut conn.params_buf,
649 )?;
650 }
651 qail_core::ast::Action::Del => {
652 crate::protocol::ast_encoder::dml::encode_delete(
653 cmd,
654 &mut conn.sql_buf,
655 &mut conn.params_buf,
656 )?;
657 }
658 _ => {
659 conn.execute_simple(rls_sql).await?;
661 self.rls_dirty = true;
662 return self
663 .fetch_all_uncached_with_format(cmd, result_format)
664 .await;
665 }
666 }
667
668 let mut hasher = DefaultHasher::new();
669 conn.sql_buf.hash(&mut hasher);
670 let sql_hash = hasher.finish();
671
672 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
673
674 conn.write_buf.clear();
675
676 let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
680 conn.write_buf.extend_from_slice(&rls_msg);
681
682 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
684 name
685 } else {
686 let name = format!("qail_{:x}", sql_hash);
687
688 conn.evict_prepared_if_full();
689
690 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
691
692 use crate::protocol::PgEncoder;
693 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
694 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
695 conn.write_buf.extend_from_slice(&parse_msg);
696 conn.write_buf.extend_from_slice(&describe_msg);
697
698 conn.stmt_cache.put(sql_hash, name.clone());
699 conn.prepared_statements
700 .insert(name.clone(), sql_str.to_string());
701
702 if let Ok(mut hot) = self.pool.hot_statements.write()
703 && hot.len() < MAX_HOT_STATEMENTS
704 {
705 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
706 }
707
708 name
709 };
710
711 use crate::protocol::PgEncoder;
712 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
713 &mut conn.write_buf,
714 &stmt_name,
715 &conn.params_buf,
716 result_format.as_wire_code(),
717 ) {
718 if is_cache_miss {
719 conn.stmt_cache.remove(&sql_hash);
720 conn.prepared_statements.remove(&stmt_name);
721 conn.column_info_cache.remove(&sql_hash);
722 }
723 return Err(PgError::Encode(e.to_string()));
724 }
725 PgEncoder::encode_execute_to(&mut conn.write_buf);
726 PgEncoder::encode_sync_to(&mut conn.write_buf);
727
728 if let Err(err) = conn.flush_write_buf().await {
730 if is_cache_miss {
731 conn.stmt_cache.remove(&sql_hash);
732 conn.prepared_statements.remove(&stmt_name);
733 conn.column_info_cache.remove(&sql_hash);
734 }
735 return Err(err);
736 }
737
738 self.rls_dirty = true;
740
741 let mut rls_error: Option<PgError> = None;
745 loop {
746 let msg = match conn.recv().await {
747 Ok(msg) => msg,
748 Err(err) => {
749 if is_cache_miss {
750 conn.stmt_cache.remove(&sql_hash);
751 conn.prepared_statements.remove(&stmt_name);
752 conn.column_info_cache.remove(&sql_hash);
753 }
754 return Err(err);
755 }
756 };
757 match msg {
758 crate::protocol::BackendMessage::ReadyForQuery(_) => {
759 if let Some(err) = rls_error {
761 return Err(err);
762 }
763 break;
764 }
765 crate::protocol::BackendMessage::ErrorResponse(err) => {
766 if rls_error.is_none() {
767 rls_error = Some(PgError::QueryServer(err.into()));
768 }
769 }
770 crate::protocol::BackendMessage::CommandComplete(_)
772 | crate::protocol::BackendMessage::DataRow(_)
773 | crate::protocol::BackendMessage::RowDescription(_)
774 | crate::protocol::BackendMessage::ParseComplete
775 | crate::protocol::BackendMessage::BindComplete => {}
776 msg if is_ignorable_session_message(&msg) => {}
777 other => return Err(unexpected_backend_message("pool rls setup", &other)),
778 }
779 }
780
781 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
783
784 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
785 let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
786 let mut error: Option<PgError> = None;
787 let mut flow = ExtendedFlowTracker::new(
788 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
789 );
790
791 loop {
792 let msg = match conn.recv().await {
793 Ok(msg) => msg,
794 Err(err) => {
795 if is_cache_miss && !flow.saw_parse_complete() {
796 conn.stmt_cache.remove(&sql_hash);
797 conn.prepared_statements.remove(&stmt_name);
798 conn.column_info_cache.remove(&sql_hash);
799 }
800 return Err(err);
801 }
802 };
803 if let Err(err) =
804 flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
805 {
806 if is_cache_miss && !flow.saw_parse_complete() {
807 conn.stmt_cache.remove(&sql_hash);
808 conn.prepared_statements.remove(&stmt_name);
809 conn.column_info_cache.remove(&sql_hash);
810 }
811 return Err(err);
812 }
813 match msg {
814 crate::protocol::BackendMessage::ParseComplete => {}
815 crate::protocol::BackendMessage::BindComplete => {}
816 crate::protocol::BackendMessage::ParameterDescription(_) => {}
817 crate::protocol::BackendMessage::RowDescription(fields) => {
818 let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
819 if is_cache_miss {
820 conn.column_info_cache.insert(sql_hash, info.clone());
821 }
822 column_info = Some(info);
823 }
824 crate::protocol::BackendMessage::DataRow(data) => {
825 if error.is_none() {
826 rows.push(crate::driver::PgRow {
827 columns: data,
828 column_info: column_info.clone(),
829 });
830 }
831 }
832 crate::protocol::BackendMessage::CommandComplete(_) => {}
833 crate::protocol::BackendMessage::ReadyForQuery(_) => {
834 if let Some(err) = error {
835 if is_cache_miss
836 && !flow.saw_parse_complete()
837 && !err.is_prepared_statement_already_exists()
838 {
839 conn.stmt_cache.remove(&sql_hash);
840 conn.prepared_statements.remove(&stmt_name);
841 conn.column_info_cache.remove(&sql_hash);
842 }
843 return Err(err);
844 }
845 if is_cache_miss && !flow.saw_parse_complete() {
846 conn.stmt_cache.remove(&sql_hash);
847 conn.prepared_statements.remove(&stmt_name);
848 conn.column_info_cache.remove(&sql_hash);
849 return Err(PgError::Protocol(
850 "Cache miss query reached ReadyForQuery without ParseComplete"
851 .to_string(),
852 ));
853 }
854 return Ok(rows);
855 }
856 crate::protocol::BackendMessage::ErrorResponse(err) => {
857 if error.is_none() {
858 error = Some(PgError::QueryServer(err.into()));
859 }
860 }
861 msg if is_ignorable_session_message(&msg) => {}
862 other => {
863 if is_cache_miss && !flow.saw_parse_complete() {
864 conn.stmt_cache.remove(&sql_hash);
865 conn.prepared_statements.remove(&stmt_name);
866 conn.column_info_cache.remove(&sql_hash);
867 }
868 return Err(unexpected_backend_message(
869 "pool fetch_all_with_rls execute",
870 &other,
871 ));
872 }
873 }
874 }
875 }
876}