1use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6 PgConnection, PgError, PgResult, ResultFormat,
7 extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8 is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12#[inline]
13fn rollback_cache_miss_statement_registration(
14 conn: &mut PgConnection,
15 is_cache_miss: bool,
16 sql_hash: u64,
17 stmt_name: &str,
18) {
19 if is_cache_miss {
20 conn.stmt_cache.remove(&sql_hash);
21 conn.prepared_statements.remove(stmt_name);
22 conn.column_info_cache.remove(&sql_hash);
23 }
24}
25
26async fn drain_extended_responses_after_rls_setup_error(conn: &mut PgConnection) -> PgResult<()> {
27 loop {
28 let msg = conn.recv().await?;
29 match msg {
30 crate::protocol::BackendMessage::ReadyForQuery(_) => return Ok(()),
31 crate::protocol::BackendMessage::ErrorResponse(_) => {}
32 msg if is_ignorable_session_message(&msg) => {}
33 _ => {}
35 }
36 }
37}
38
39impl PooledConnection {
40 pub async fn fetch_all_uncached(
43 &mut self,
44 cmd: &qail_core::ast::Qail,
45 ) -> PgResult<Vec<crate::driver::PgRow>> {
46 self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
47 .await
48 }
49
50 pub async fn query_raw_with_params(
58 &mut self,
59 sql: &str,
60 params: &[Option<Vec<u8>>],
61 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
62 let conn = self.conn_mut()?;
63 conn.query(sql, params).await
64 }
65
66 pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
68 self.conn_mut()?.copy_export(cmd).await
69 }
70
71 pub async fn copy_export_stream_raw<F, Fut>(
73 &mut self,
74 cmd: &qail_core::ast::Qail,
75 on_chunk: F,
76 ) -> PgResult<()>
77 where
78 F: FnMut(Vec<u8>) -> Fut,
79 Fut: std::future::Future<Output = PgResult<()>>,
80 {
81 self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
82 }
83
84 pub async fn copy_export_stream_rows<F>(
86 &mut self,
87 cmd: &qail_core::ast::Qail,
88 on_row: F,
89 ) -> PgResult<()>
90 where
91 F: FnMut(Vec<String>) -> PgResult<()>,
92 {
93 self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
94 }
95
96 pub async fn copy_export_table(
98 &mut self,
99 table: &str,
100 columns: &[String],
101 ) -> PgResult<Vec<u8>> {
102 let quote_ident = |ident: &str| -> String {
103 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
104 };
105 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
106 let sql = format!(
107 "COPY {} ({}) TO STDOUT",
108 quote_ident(table),
109 cols.join(", ")
110 );
111 self.conn_mut()?.copy_out_raw(&sql).await
112 }
113
114 pub async fn copy_export_table_stream<F, Fut>(
116 &mut self,
117 table: &str,
118 columns: &[String],
119 on_chunk: F,
120 ) -> PgResult<()>
121 where
122 F: FnMut(Vec<u8>) -> Fut,
123 Fut: std::future::Future<Output = PgResult<()>>,
124 {
125 let quote_ident = |ident: &str| -> String {
126 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
127 };
128 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
129 let sql = format!(
130 "COPY {} ({}) TO STDOUT",
131 quote_ident(table),
132 cols.join(", ")
133 );
134 self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
135 }
136
137 pub async fn fetch_all_uncached_with_format(
139 &mut self,
140 cmd: &qail_core::ast::Qail,
141 result_format: ResultFormat,
142 ) -> PgResult<Vec<crate::driver::PgRow>> {
143 use crate::driver::ColumnInfo;
144 use crate::protocol::AstEncoder;
145
146 let conn = self.conn_mut()?;
147
148 AstEncoder::encode_cmd_reuse_into_with_result_format(
149 cmd,
150 &mut conn.sql_buf,
151 &mut conn.params_buf,
152 &mut conn.write_buf,
153 result_format.as_wire_code(),
154 )
155 .map_err(|e| PgError::Encode(e.to_string()))?;
156
157 conn.flush_write_buf().await?;
158
159 let mut rows: Vec<crate::driver::PgRow> = Vec::new();
160 let mut column_info: Option<Arc<ColumnInfo>> = None;
161 let mut error: Option<PgError> = None;
162 let mut flow =
163 ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
164
165 loop {
166 let msg = conn.recv().await?;
167 flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
168 match msg {
169 crate::protocol::BackendMessage::ParseComplete
170 | crate::protocol::BackendMessage::BindComplete => {}
171 crate::protocol::BackendMessage::RowDescription(fields) => {
172 column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
173 }
174 crate::protocol::BackendMessage::DataRow(data) => {
175 if error.is_none() {
176 rows.push(crate::driver::PgRow {
177 columns: data,
178 column_info: column_info.clone(),
179 });
180 }
181 }
182 crate::protocol::BackendMessage::NoData => {}
183 crate::protocol::BackendMessage::CommandComplete(_) => {}
184 crate::protocol::BackendMessage::ReadyForQuery(_) => {
185 if let Some(err) = error {
186 return Err(err);
187 }
188 return Ok(rows);
189 }
190 crate::protocol::BackendMessage::ErrorResponse(err) => {
191 if error.is_none() {
192 error = Some(PgError::QueryServer(err.into()));
193 }
194 }
195 msg if is_ignorable_session_message(&msg) => {}
196 other => {
197 return Err(unexpected_backend_message("pool fetch_all execute", &other));
198 }
199 }
200 }
201 }
202
203 pub async fn fetch_all_fast(
207 &mut self,
208 cmd: &qail_core::ast::Qail,
209 ) -> PgResult<Vec<crate::driver::PgRow>> {
210 self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
211 .await
212 }
213
214 pub async fn fetch_all_fast_with_format(
216 &mut self,
217 cmd: &qail_core::ast::Qail,
218 result_format: ResultFormat,
219 ) -> PgResult<Vec<crate::driver::PgRow>> {
220 use crate::protocol::AstEncoder;
221
222 let conn = self.conn_mut()?;
223
224 AstEncoder::encode_cmd_reuse_into_with_result_format(
225 cmd,
226 &mut conn.sql_buf,
227 &mut conn.params_buf,
228 &mut conn.write_buf,
229 result_format.as_wire_code(),
230 )
231 .map_err(|e| PgError::Encode(e.to_string()))?;
232
233 conn.flush_write_buf().await?;
234
235 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
236 let mut error: Option<PgError> = None;
237 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
238
239 loop {
240 let res = conn.recv_with_data_fast().await;
241 match res {
242 Ok((msg_type, data)) => {
243 flow.validate_msg_type(
244 msg_type,
245 "pool fetch_all_fast execute",
246 error.is_some(),
247 )?;
248 match msg_type {
249 b'D' => {
250 if error.is_none()
251 && let Some(columns) = data
252 {
253 rows.push(crate::driver::PgRow {
254 columns,
255 column_info: None,
256 });
257 }
258 }
259 b'Z' => {
260 if let Some(err) = error {
261 return Err(err);
262 }
263 return Ok(rows);
264 }
265 _ => {}
266 }
267 }
268 Err(e) => {
269 if matches!(&e, PgError::QueryServer(_)) {
270 if error.is_none() {
271 error = Some(e);
272 }
273 continue;
274 }
275 return Err(e);
276 }
277 }
278 }
279 }
280
281 pub async fn fetch_all_cached(
286 &mut self,
287 cmd: &qail_core::ast::Qail,
288 ) -> PgResult<Vec<crate::driver::PgRow>> {
289 self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
290 .await
291 }
292
293 pub async fn fetch_all_cached_with_format(
295 &mut self,
296 cmd: &qail_core::ast::Qail,
297 result_format: ResultFormat,
298 ) -> PgResult<Vec<crate::driver::PgRow>> {
299 let mut retried = false;
300 loop {
301 match self
302 .fetch_all_cached_with_format_once(cmd, result_format)
303 .await
304 {
305 Ok(rows) => return Ok(rows),
306 Err(err)
307 if !retried
308 && (err.is_prepared_statement_retryable()
309 || err.is_prepared_statement_already_exists()) =>
310 {
311 retried = true;
312 if err.is_prepared_statement_retryable()
313 && let Some(conn) = self.conn.as_mut()
314 {
315 conn.clear_prepared_statement_state();
316 }
317 }
318 Err(err) => return Err(err),
319 }
320 }
321 }
322
323 pub async fn fetch_typed<T: crate::driver::row::QailRow>(
325 &mut self,
326 cmd: &qail_core::ast::Qail,
327 ) -> PgResult<Vec<T>> {
328 self.fetch_typed_with_format(cmd, ResultFormat::Text).await
329 }
330
331 pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
336 &mut self,
337 cmd: &qail_core::ast::Qail,
338 result_format: ResultFormat,
339 ) -> PgResult<Vec<T>> {
340 let rows = self
341 .fetch_all_cached_with_format(cmd, result_format)
342 .await?;
343 Ok(rows.iter().map(T::from_row).collect())
344 }
345
346 pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
348 &mut self,
349 cmd: &qail_core::ast::Qail,
350 ) -> PgResult<Option<T>> {
351 self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
352 .await
353 }
354
355 pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
357 &mut self,
358 cmd: &qail_core::ast::Qail,
359 result_format: ResultFormat,
360 ) -> PgResult<Option<T>> {
361 let rows = self
362 .fetch_all_cached_with_format(cmd, result_format)
363 .await?;
364 Ok(rows.first().map(T::from_row))
365 }
366
367 async fn fetch_all_cached_with_format_once(
368 &mut self,
369 cmd: &qail_core::ast::Qail,
370 result_format: ResultFormat,
371 ) -> PgResult<Vec<crate::driver::PgRow>> {
372 use crate::driver::ColumnInfo;
373 use std::collections::hash_map::DefaultHasher;
374 use std::hash::{Hash, Hasher};
375
376 let conn = self.conn.as_mut().ok_or_else(|| {
377 PgError::Connection("Connection already released back to pool".into())
378 })?;
379
380 conn.sql_buf.clear();
381 conn.params_buf.clear();
382
383 match cmd.action {
385 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
386 crate::protocol::ast_encoder::dml::encode_select(
387 cmd,
388 &mut conn.sql_buf,
389 &mut conn.params_buf,
390 )?;
391 }
392 qail_core::ast::Action::Add => {
393 crate::protocol::ast_encoder::dml::encode_insert(
394 cmd,
395 &mut conn.sql_buf,
396 &mut conn.params_buf,
397 )?;
398 }
399 qail_core::ast::Action::Set => {
400 crate::protocol::ast_encoder::dml::encode_update(
401 cmd,
402 &mut conn.sql_buf,
403 &mut conn.params_buf,
404 )?;
405 }
406 qail_core::ast::Action::Del => {
407 crate::protocol::ast_encoder::dml::encode_delete(
408 cmd,
409 &mut conn.sql_buf,
410 &mut conn.params_buf,
411 )?;
412 }
413 _ => {
414 return self
416 .fetch_all_uncached_with_format(cmd, result_format)
417 .await;
418 }
419 }
420
421 let mut hasher = DefaultHasher::new();
422 conn.sql_buf.hash(&mut hasher);
423 let sql_hash = hasher.finish();
424
425 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
426
427 conn.write_buf.clear();
428
429 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
430 name
431 } else {
432 let name = format!("qail_{:x}", sql_hash);
433
434 conn.evict_prepared_if_full();
435
436 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
437
438 use crate::protocol::PgEncoder;
439 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
440 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
441 conn.write_buf.extend_from_slice(&parse_msg);
442 conn.write_buf.extend_from_slice(&describe_msg);
443
444 conn.stmt_cache.put(sql_hash, name.clone());
445 conn.prepared_statements
446 .insert(name.clone(), sql_str.to_string());
447
448 if let Ok(mut hot) = self.pool.hot_statements.write()
450 && hot.len() < MAX_HOT_STATEMENTS
451 {
452 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
453 }
454
455 name
456 };
457
458 use crate::protocol::PgEncoder;
459 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
460 &mut conn.write_buf,
461 &stmt_name,
462 &conn.params_buf,
463 result_format.as_wire_code(),
464 ) {
465 if is_cache_miss {
466 conn.stmt_cache.remove(&sql_hash);
467 conn.prepared_statements.remove(&stmt_name);
468 conn.column_info_cache.remove(&sql_hash);
469 }
470 return Err(PgError::Encode(e.to_string()));
471 }
472 PgEncoder::encode_execute_to(&mut conn.write_buf);
473 PgEncoder::encode_sync_to(&mut conn.write_buf);
474
475 if let Err(err) = conn.flush_write_buf().await {
476 if is_cache_miss {
477 conn.stmt_cache.remove(&sql_hash);
478 conn.prepared_statements.remove(&stmt_name);
479 conn.column_info_cache.remove(&sql_hash);
480 }
481 return Err(err);
482 }
483
484 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
485
486 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
487 let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
488 let mut error: Option<PgError> = None;
489 let mut flow = ExtendedFlowTracker::new(
490 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
491 );
492
493 loop {
494 let msg = match conn.recv().await {
495 Ok(msg) => msg,
496 Err(err) => {
497 if is_cache_miss && !flow.saw_parse_complete() {
498 conn.stmt_cache.remove(&sql_hash);
499 conn.prepared_statements.remove(&stmt_name);
500 conn.column_info_cache.remove(&sql_hash);
501 }
502 return Err(err);
503 }
504 };
505 if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
506 {
507 if is_cache_miss && !flow.saw_parse_complete() {
508 conn.stmt_cache.remove(&sql_hash);
509 conn.prepared_statements.remove(&stmt_name);
510 conn.column_info_cache.remove(&sql_hash);
511 }
512 return Err(err);
513 }
514 match msg {
515 crate::protocol::BackendMessage::ParseComplete => {}
516 crate::protocol::BackendMessage::BindComplete => {}
517 crate::protocol::BackendMessage::ParameterDescription(_) => {}
518 crate::protocol::BackendMessage::RowDescription(fields) => {
519 let info = Arc::new(ColumnInfo::from_fields(&fields));
520 if is_cache_miss {
521 conn.column_info_cache.insert(sql_hash, info.clone());
522 }
523 column_info = Some(info);
524 }
525 crate::protocol::BackendMessage::DataRow(data) => {
526 if error.is_none() {
527 rows.push(crate::driver::PgRow {
528 columns: data,
529 column_info: column_info.clone(),
530 });
531 }
532 }
533 crate::protocol::BackendMessage::CommandComplete(_) => {}
534 crate::protocol::BackendMessage::ReadyForQuery(_) => {
535 if let Some(err) = error {
536 if is_cache_miss
537 && !flow.saw_parse_complete()
538 && !err.is_prepared_statement_already_exists()
539 {
540 conn.stmt_cache.remove(&sql_hash);
541 conn.prepared_statements.remove(&stmt_name);
542 conn.column_info_cache.remove(&sql_hash);
543 }
544 return Err(err);
545 }
546 if is_cache_miss && !flow.saw_parse_complete() {
547 conn.stmt_cache.remove(&sql_hash);
548 conn.prepared_statements.remove(&stmt_name);
549 conn.column_info_cache.remove(&sql_hash);
550 return Err(PgError::Protocol(
551 "Cache miss query reached ReadyForQuery without ParseComplete"
552 .to_string(),
553 ));
554 }
555 return Ok(rows);
556 }
557 crate::protocol::BackendMessage::ErrorResponse(err) => {
558 if error.is_none() {
559 error = Some(PgError::QueryServer(err.into()));
560 }
561 }
562 msg if is_ignorable_session_message(&msg) => {}
563 other => {
564 if is_cache_miss && !flow.saw_parse_complete() {
565 conn.stmt_cache.remove(&sql_hash);
566 conn.prepared_statements.remove(&stmt_name);
567 conn.column_info_cache.remove(&sql_hash);
568 }
569 return Err(unexpected_backend_message(
570 "pool fetch_all_cached execute",
571 &other,
572 ));
573 }
574 }
575 }
576 }
577
578 pub async fn fetch_all_with_rls(
597 &mut self,
598 cmd: &qail_core::ast::Qail,
599 rls_sql: &str,
600 ) -> PgResult<Vec<crate::driver::PgRow>> {
601 self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
602 .await
603 }
604
605 pub async fn fetch_all_with_rls_with_format(
607 &mut self,
608 cmd: &qail_core::ast::Qail,
609 rls_sql: &str,
610 result_format: ResultFormat,
611 ) -> PgResult<Vec<crate::driver::PgRow>> {
612 let mut retried = false;
613 loop {
614 match self
615 .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
616 .await
617 {
618 Ok(rows) => return Ok(rows),
619 Err(err)
620 if !retried
621 && (err.is_prepared_statement_retryable()
622 || err.is_prepared_statement_already_exists()) =>
623 {
624 retried = true;
625 if let Some(conn) = self.conn.as_mut() {
626 if err.is_prepared_statement_retryable() {
627 conn.clear_prepared_statement_state();
628 }
629 let _ = conn.execute_simple("ROLLBACK").await;
632 }
633 self.rls_dirty = false;
634 }
635 Err(err) => return Err(err),
636 }
637 }
638 }
639
640 async fn fetch_all_with_rls_with_format_once(
641 &mut self,
642 cmd: &qail_core::ast::Qail,
643 rls_sql: &str,
644 result_format: ResultFormat,
645 ) -> PgResult<Vec<crate::driver::PgRow>> {
646 use crate::driver::ColumnInfo;
647 use std::collections::hash_map::DefaultHasher;
648 use std::hash::{Hash, Hasher};
649
650 let conn = self.conn.as_mut().ok_or_else(|| {
651 PgError::Connection("Connection already released back to pool".into())
652 })?;
653
654 conn.sql_buf.clear();
655 conn.params_buf.clear();
656
657 match cmd.action {
659 qail_core::ast::Action::Get | qail_core::ast::Action::With => {
660 crate::protocol::ast_encoder::dml::encode_select(
661 cmd,
662 &mut conn.sql_buf,
663 &mut conn.params_buf,
664 )?;
665 }
666 qail_core::ast::Action::Add => {
667 crate::protocol::ast_encoder::dml::encode_insert(
668 cmd,
669 &mut conn.sql_buf,
670 &mut conn.params_buf,
671 )?;
672 }
673 qail_core::ast::Action::Set => {
674 crate::protocol::ast_encoder::dml::encode_update(
675 cmd,
676 &mut conn.sql_buf,
677 &mut conn.params_buf,
678 )?;
679 }
680 qail_core::ast::Action::Del => {
681 crate::protocol::ast_encoder::dml::encode_delete(
682 cmd,
683 &mut conn.sql_buf,
684 &mut conn.params_buf,
685 )?;
686 }
687 _ => {
688 conn.execute_simple(rls_sql).await?;
690 self.rls_dirty = true;
691 return self
692 .fetch_all_uncached_with_format(cmd, result_format)
693 .await;
694 }
695 }
696
697 let mut hasher = DefaultHasher::new();
698 conn.sql_buf.hash(&mut hasher);
699 let sql_hash = hasher.finish();
700
701 let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
702
703 conn.write_buf.clear();
704
705 let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
710 conn.write_buf.extend_from_slice(&rls_msg);
711
712 let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
714 name
715 } else {
716 let name = format!("qail_{:x}", sql_hash);
717
718 conn.evict_prepared_if_full();
719
720 let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
721
722 use crate::protocol::PgEncoder;
723 let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
724 let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
725 conn.write_buf.extend_from_slice(&parse_msg);
726 conn.write_buf.extend_from_slice(&describe_msg);
727
728 conn.stmt_cache.put(sql_hash, name.clone());
729 conn.prepared_statements
730 .insert(name.clone(), sql_str.to_string());
731
732 if let Ok(mut hot) = self.pool.hot_statements.write()
733 && hot.len() < MAX_HOT_STATEMENTS
734 {
735 hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
736 }
737
738 name
739 };
740
741 use crate::protocol::PgEncoder;
742 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
743 &mut conn.write_buf,
744 &stmt_name,
745 &conn.params_buf,
746 result_format.as_wire_code(),
747 ) {
748 rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
749 return Err(PgError::Encode(e.to_string()));
750 }
751 PgEncoder::encode_execute_to(&mut conn.write_buf);
752 PgEncoder::encode_sync_to(&mut conn.write_buf);
753
754 if let Err(err) = conn.flush_write_buf().await {
756 rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
757 return Err(err);
758 }
759
760 self.rls_dirty = true;
762
763 let mut rls_error: Option<PgError> = None;
767 loop {
768 let msg = match conn.recv().await {
769 Ok(msg) => msg,
770 Err(err) => {
771 rollback_cache_miss_statement_registration(
772 conn,
773 is_cache_miss,
774 sql_hash,
775 &stmt_name,
776 );
777 return Err(err);
778 }
779 };
780 match msg {
781 crate::protocol::BackendMessage::ReadyForQuery(_) => {
782 if let Some(err) = rls_error {
784 rollback_cache_miss_statement_registration(
785 conn,
786 is_cache_miss,
787 sql_hash,
788 &stmt_name,
789 );
790 if let Err(drain_err) =
791 drain_extended_responses_after_rls_setup_error(conn).await
792 {
793 tracing::warn!(
794 error = %drain_err,
795 "failed to drain pipelined extended responses after RLS setup error"
796 );
797 }
798 return Err(err);
799 }
800 break;
801 }
802 crate::protocol::BackendMessage::ErrorResponse(err) => {
803 if rls_error.is_none() {
804 rls_error = Some(PgError::QueryServer(err.into()));
805 }
806 }
807 crate::protocol::BackendMessage::CommandComplete(_)
809 | crate::protocol::BackendMessage::DataRow(_)
810 | crate::protocol::BackendMessage::RowDescription(_)
811 | crate::protocol::BackendMessage::ParseComplete
812 | crate::protocol::BackendMessage::BindComplete => {}
813 msg if is_ignorable_session_message(&msg) => {}
814 other => return Err(unexpected_backend_message("pool rls setup", &other)),
815 }
816 }
817
818 let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
820
821 let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
822 let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
823 let mut error: Option<PgError> = None;
824 let mut flow = ExtendedFlowTracker::new(
825 ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
826 );
827
828 loop {
829 let msg = match conn.recv().await {
830 Ok(msg) => msg,
831 Err(err) => {
832 if is_cache_miss && !flow.saw_parse_complete() {
833 rollback_cache_miss_statement_registration(
834 conn,
835 is_cache_miss,
836 sql_hash,
837 &stmt_name,
838 );
839 }
840 return Err(err);
841 }
842 };
843 if let Err(err) =
844 flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
845 {
846 if is_cache_miss && !flow.saw_parse_complete() {
847 rollback_cache_miss_statement_registration(
848 conn,
849 is_cache_miss,
850 sql_hash,
851 &stmt_name,
852 );
853 }
854 return Err(err);
855 }
856 match msg {
857 crate::protocol::BackendMessage::ParseComplete => {}
858 crate::protocol::BackendMessage::BindComplete => {}
859 crate::protocol::BackendMessage::ParameterDescription(_) => {}
860 crate::protocol::BackendMessage::RowDescription(fields) => {
861 let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
862 if is_cache_miss {
863 conn.column_info_cache.insert(sql_hash, info.clone());
864 }
865 column_info = Some(info);
866 }
867 crate::protocol::BackendMessage::DataRow(data) => {
868 if error.is_none() {
869 rows.push(crate::driver::PgRow {
870 columns: data,
871 column_info: column_info.clone(),
872 });
873 }
874 }
875 crate::protocol::BackendMessage::CommandComplete(_) => {}
876 crate::protocol::BackendMessage::ReadyForQuery(_) => {
877 if let Some(err) = error {
878 if is_cache_miss
879 && !flow.saw_parse_complete()
880 && !err.is_prepared_statement_already_exists()
881 {
882 rollback_cache_miss_statement_registration(
883 conn,
884 is_cache_miss,
885 sql_hash,
886 &stmt_name,
887 );
888 }
889 return Err(err);
890 }
891 if is_cache_miss && !flow.saw_parse_complete() {
892 rollback_cache_miss_statement_registration(
893 conn,
894 is_cache_miss,
895 sql_hash,
896 &stmt_name,
897 );
898 return Err(PgError::Protocol(
899 "Cache miss query reached ReadyForQuery without ParseComplete"
900 .to_string(),
901 ));
902 }
903 return Ok(rows);
904 }
905 crate::protocol::BackendMessage::ErrorResponse(err) => {
906 if error.is_none() {
907 error = Some(PgError::QueryServer(err.into()));
908 }
909 }
910 msg if is_ignorable_session_message(&msg) => {}
911 other => {
912 if is_cache_miss && !flow.saw_parse_complete() {
913 rollback_cache_miss_statement_registration(
914 conn,
915 is_cache_miss,
916 sql_hash,
917 &stmt_name,
918 );
919 }
920 return Err(unexpected_backend_message(
921 "pool fetch_all_with_rls execute",
922 &other,
923 ));
924 }
925 }
926 }
927 }
928}