1use super::{
6 PgConnection, PgError, PgResult,
7 extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8 is_ignorable_session_message, is_ignorable_session_msg_type, unexpected_backend_message,
9 unexpected_backend_msg_type,
10};
11use crate::protocol::{BackendMessage, PgEncoder};
12use bytes::{Bytes, BytesMut};
13use std::time::{Duration, Instant};
14
15#[inline]
16fn capture_query_server_error(conn: &mut PgConnection, slot: &mut Option<PgError>, err: PgError) {
17 if slot.is_some() {
18 return;
19 }
20 if err.is_prepared_statement_retryable() {
21 conn.clear_prepared_statement_state();
22 }
23 *slot = Some(err);
24}
25
26#[inline]
27fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
28 if matches!(
29 err,
30 PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
31 ) {
32 conn.mark_io_desynced();
33 }
34 Err(err)
35}
36
37#[inline]
38fn return_callback_error_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
39 conn.mark_io_desynced();
40 Err(err)
41}
42
43#[inline]
44fn prepared_bind_execute_sync_wire_len(
45 statement: &str,
46 params: &[Option<Vec<u8>>],
47 result_format: i16,
48) -> PgResult<usize> {
49 let needed = PgEncoder::bind_execute_sync_wire_len_with_formats(
50 statement,
51 params,
52 PgEncoder::FORMAT_TEXT,
53 result_format,
54 )
55 .map_err(|e| PgError::Encode(e.to_string()))?;
56 Ok(needed)
57}
58
59#[inline]
60fn reserve_prepared_single_write_buf(
61 conn: &mut PgConnection,
62 stmt: &super::PreparedStatement,
63 params: &[Option<Vec<u8>>],
64 result_format: i16,
65) -> PgResult<()> {
66 conn.write_buf.clear();
67 let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
68 conn.write_buf.reserve(needed);
69 Ok(())
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73enum SimpleStatementState {
74 AwaitingResult,
75 InRowStream,
76}
77
78#[derive(Debug, Clone, Copy)]
79struct SimpleFlowTracker {
80 state: SimpleStatementState,
81 saw_completion: bool,
82}
83
84impl SimpleFlowTracker {
85 fn new() -> Self {
86 Self {
87 state: SimpleStatementState::AwaitingResult,
88 saw_completion: false,
89 }
90 }
91
92 fn on_row_description(&mut self, context: &'static str) -> PgResult<()> {
93 if self.state == SimpleStatementState::InRowStream {
94 return Err(PgError::Protocol(format!(
95 "{}: duplicate RowDescription before statement completion",
96 context
97 )));
98 }
99 self.state = SimpleStatementState::InRowStream;
100 self.saw_completion = false;
101 Ok(())
102 }
103
104 fn on_data_row(&self, context: &'static str) -> PgResult<()> {
105 if self.state != SimpleStatementState::InRowStream {
106 return Err(PgError::Protocol(format!(
107 "{}: DataRow before RowDescription",
108 context
109 )));
110 }
111 Ok(())
112 }
113
114 fn on_command_complete(&mut self) {
115 self.state = SimpleStatementState::AwaitingResult;
116 self.saw_completion = true;
117 }
118
119 fn on_empty_query_response(&mut self, context: &'static str) -> PgResult<()> {
120 if self.state == SimpleStatementState::InRowStream {
121 return Err(PgError::Protocol(format!(
122 "{}: EmptyQueryResponse during active row stream",
123 context
124 )));
125 }
126 self.saw_completion = true;
127 Ok(())
128 }
129
130 fn on_ready_for_query(&self, context: &'static str, error_pending: bool) -> PgResult<()> {
131 if error_pending {
132 return Ok(());
133 }
134 if self.state == SimpleStatementState::InRowStream {
135 return Err(PgError::Protocol(format!(
136 "{}: ReadyForQuery before CommandComplete",
137 context
138 )));
139 }
140 if !self.saw_completion {
141 return Err(PgError::Protocol(format!(
142 "{}: ReadyForQuery before completion",
143 context
144 )));
145 }
146 Ok(())
147 }
148}
149
150impl PgConnection {
151 fn validate_param_type_arity(params: &[Option<Vec<u8>>], param_types: &[u32]) -> PgResult<()> {
152 if !param_types.is_empty() && param_types.len() != params.len() {
153 return Err(PgError::Encode(format!(
154 "parameter type count {} does not match parameter count {}",
155 param_types.len(),
156 params.len()
157 )));
158 }
159 Ok(())
160 }
161
162 pub(crate) async fn query(
168 &mut self,
169 sql: &str,
170 params: &[Option<Vec<u8>>],
171 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
172 self.query_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
173 .await
174 }
175
176 pub(crate) async fn query_with_result_format(
178 &mut self,
179 sql: &str,
180 params: &[Option<Vec<u8>>],
181 result_format: i16,
182 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
183 let bytes = PgEncoder::encode_extended_query_with_result_format(sql, params, result_format)
184 .map_err(|e| PgError::Encode(e.to_string()))?;
185 self.write_all_with_timeout(&bytes, "stream write").await?;
186
187 let mut rows = Vec::new();
188
189 let mut error: Option<PgError> = None;
190 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
191
192 loop {
193 let msg = self.recv().await?;
194 if let Err(err) = flow.validate(&msg, "extended-query execute", error.is_some()) {
195 return return_with_desync(self, err);
196 }
197 match msg {
198 BackendMessage::ParseComplete => {}
199 BackendMessage::BindComplete => {}
200 BackendMessage::RowDescription(_) => {}
201 BackendMessage::DataRow(data) => {
202 if error.is_none() {
204 rows.push(data);
205 }
206 }
207 BackendMessage::CommandComplete(_) => {}
208 BackendMessage::NoData => {}
209 BackendMessage::ReadyForQuery(_) => {
210 if let Some(err) = error {
211 return Err(err);
212 }
213 return Ok(rows);
214 }
215 BackendMessage::ErrorResponse(err) => {
216 if error.is_none() {
217 error = Some(PgError::QueryServer(err.into()));
218 }
219 }
220 msg if is_ignorable_session_message(&msg) => {}
221 other => {
222 return return_with_desync(
223 self,
224 unexpected_backend_message("extended-query execute", &other),
225 );
226 }
227 }
228 }
229 }
230
231 pub async fn query_count(&mut self, sql: &str, params: &[Option<Vec<u8>>]) -> PgResult<()> {
233 self.query_count_with_param_types(sql, &[], params).await
234 }
235
236 pub async fn query_count_with_param_types(
239 &mut self,
240 sql: &str,
241 param_types: &[u32],
242 params: &[Option<Vec<u8>>],
243 ) -> PgResult<()> {
244 Self::validate_param_type_arity(params, param_types)?;
245
246 self.write_buf.clear();
247 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
248 .map_err(|e| PgError::Encode(e.to_string()))?;
249 PgEncoder::encode_bind_to(&mut self.write_buf, "", params)
250 .map_err(|e| PgError::Encode(e.to_string()))?;
251 PgEncoder::encode_execute_to(&mut self.write_buf);
252 PgEncoder::encode_sync_to(&mut self.write_buf);
253
254 self.flush_write_buf().await?;
255
256 let mut error: Option<PgError> = None;
257 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
258
259 loop {
260 match self.recv_msg_type_fast().await {
261 Ok(msg_type) => {
262 if let Err(err) = flow.validate_msg_type(
263 msg_type,
264 "extended-query count execute",
265 error.is_some(),
266 ) {
267 return return_with_desync(self, err);
268 }
269 match msg_type {
270 b'1' | b'2' | b'T' | b'D' | b'C' | b'n' => {}
271 b'Z' => {
272 if let Some(err) = error {
273 return Err(err);
274 }
275 return Ok(());
276 }
277 msg_type if is_ignorable_session_msg_type(msg_type) => {}
278 other => {
279 return return_with_desync(
280 self,
281 unexpected_backend_msg_type("extended-query count execute", other),
282 );
283 }
284 }
285 }
286 Err(e) => {
287 if matches!(&e, PgError::QueryServer(_)) {
288 capture_query_server_error(self, &mut error, e);
289 continue;
290 }
291 return Err(e);
292 }
293 }
294 }
295 }
296
297 pub async fn query_rows(
303 &mut self,
304 sql: &str,
305 params: &[Option<Vec<u8>>],
306 ) -> PgResult<Vec<super::PgRow>> {
307 self.query_rows_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
308 .await
309 }
310
311 pub async fn query_rows_with_result_format(
314 &mut self,
315 sql: &str,
316 params: &[Option<Vec<u8>>],
317 result_format: i16,
318 ) -> PgResult<Vec<super::PgRow>> {
319 self.query_rows_with_param_types_and_result_format(sql, &[], params, result_format)
320 .await
321 }
322
323 pub async fn query_visit_bytes_rows_with_result_format<F>(
328 &mut self,
329 sql: &str,
330 params: &[Option<Vec<u8>>],
331 result_format: i16,
332 on_row: F,
333 ) -> PgResult<usize>
334 where
335 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
336 {
337 self.query_visit_bytes_rows_with_param_types_and_result_format(
338 sql,
339 &[],
340 params,
341 result_format,
342 on_row,
343 )
344 .await
345 }
346
347 pub async fn query_visit_first_column_bytes_with_result_format<F>(
349 &mut self,
350 sql: &str,
351 params: &[Option<Vec<u8>>],
352 result_format: i16,
353 on_value: F,
354 ) -> PgResult<usize>
355 where
356 F: FnMut(Option<&[u8]>) -> PgResult<()>,
357 {
358 self.query_visit_first_column_bytes_with_param_types_and_result_format(
359 sql,
360 &[],
361 params,
362 result_format,
363 on_value,
364 )
365 .await
366 }
367
368 pub async fn query_rows_with_param_types_and_result_format(
371 &mut self,
372 sql: &str,
373 param_types: &[u32],
374 params: &[Option<Vec<u8>>],
375 result_format: i16,
376 ) -> PgResult<Vec<super::PgRow>> {
377 use std::sync::Arc;
378
379 Self::validate_param_type_arity(params, param_types)?;
380
381 self.write_buf.clear();
382 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
383 .map_err(|e| PgError::Encode(e.to_string()))?;
384 PgEncoder::encode_bind_to_with_result_format(
385 &mut self.write_buf,
386 "",
387 params,
388 result_format,
389 )
390 .map_err(|e| PgError::Encode(e.to_string()))?;
391 let describe_msg =
392 PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
393 self.write_buf.extend_from_slice(&describe_msg);
394 PgEncoder::encode_execute_to(&mut self.write_buf);
395 PgEncoder::encode_sync_to(&mut self.write_buf);
396 self.flush_write_buf().await?;
397
398 let mut rows: Vec<super::PgRow> = Vec::new();
399 let mut column_info: Option<Arc<super::ColumnInfo>> = None;
400 let mut error: Option<PgError> = None;
401 let mut flow =
402 ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
403
404 loop {
405 let msg = self.recv().await?;
406 if let Err(err) = flow.validate(&msg, "extended-query rows execute", error.is_some()) {
407 return return_with_desync(self, err);
408 }
409 match msg {
410 BackendMessage::ParseComplete => {}
411 BackendMessage::BindComplete => {}
412 BackendMessage::RowDescription(fields) => {
413 column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
414 }
415 BackendMessage::DataRow(data) => {
416 if error.is_none() {
417 rows.push(super::PgRow {
418 columns: data,
419 column_info: column_info.clone(),
420 });
421 }
422 }
423 BackendMessage::CommandComplete(_) => {}
424 BackendMessage::NoData => {}
425 BackendMessage::ReadyForQuery(_) => {
426 if let Some(err) = error {
427 return Err(err);
428 }
429 return Ok(rows);
430 }
431 BackendMessage::ErrorResponse(err) => {
432 if error.is_none() {
433 error = Some(PgError::QueryServer(err.into()));
434 }
435 }
436 msg if is_ignorable_session_message(&msg) => {}
437 other => {
438 return return_with_desync(
439 self,
440 unexpected_backend_message("extended-query rows execute", &other),
441 );
442 }
443 }
444 }
445 }
446
447 pub async fn query_visit_bytes_rows_with_param_types_and_result_format<F>(
453 &mut self,
454 sql: &str,
455 param_types: &[u32],
456 params: &[Option<Vec<u8>>],
457 result_format: i16,
458 mut on_row: F,
459 ) -> PgResult<usize>
460 where
461 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
462 {
463 Self::validate_param_type_arity(params, param_types)?;
464
465 self.write_buf.clear();
466 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
467 .map_err(|e| PgError::Encode(e.to_string()))?;
468 PgEncoder::encode_bind_to_with_result_format(
469 &mut self.write_buf,
470 "",
471 params,
472 result_format,
473 )
474 .map_err(|e| PgError::Encode(e.to_string()))?;
475 PgEncoder::encode_execute_to(&mut self.write_buf);
476 PgEncoder::encode_sync_to(&mut self.write_buf);
477
478 self.flush_write_buf().await?;
479
480 let mut row_count = 0usize;
481 let mut row = super::PgBytesRow::default();
482 let mut error: Option<PgError> = None;
483 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
484
485 loop {
486 match self.recv_fill_zerocopy_row_fast(&mut row).await {
487 Ok(msg_type) => {
488 if let Err(err) = flow.validate_msg_type(
489 msg_type,
490 "extended-query visit bytes execute",
491 error.is_some(),
492 ) {
493 return return_with_desync(self, err);
494 }
495 match msg_type {
496 b'1' | b'2' | b'T' | b'n' => {}
497 b'D' => {
498 if error.is_none() {
499 if let Err(err) = on_row(&row) {
500 return return_callback_error_with_desync(self, err);
501 }
502 row_count += 1;
503 row.release_payload();
504 }
505 }
506 b'C' => {}
507 b'Z' => {
508 if let Some(err) = error {
509 return Err(err);
510 }
511 return Ok(row_count);
512 }
513 msg_type if is_ignorable_session_msg_type(msg_type) => {}
514 other => {
515 return return_with_desync(
516 self,
517 unexpected_backend_msg_type(
518 "extended-query visit bytes execute",
519 other,
520 ),
521 );
522 }
523 }
524 }
525 Err(e) => {
526 if matches!(&e, PgError::QueryServer(_)) {
527 capture_query_server_error(self, &mut error, e);
528 continue;
529 }
530 return Err(e);
531 }
532 }
533 }
534 }
535
536 pub async fn query_visit_first_column_bytes_with_param_types_and_result_format<F>(
539 &mut self,
540 sql: &str,
541 param_types: &[u32],
542 params: &[Option<Vec<u8>>],
543 result_format: i16,
544 mut on_value: F,
545 ) -> PgResult<usize>
546 where
547 F: FnMut(Option<&[u8]>) -> PgResult<()>,
548 {
549 Self::validate_param_type_arity(params, param_types)?;
550
551 self.write_buf.clear();
552 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
553 .map_err(|e| PgError::Encode(e.to_string()))?;
554 PgEncoder::encode_bind_to_with_result_format(
555 &mut self.write_buf,
556 "",
557 params,
558 result_format,
559 )
560 .map_err(|e| PgError::Encode(e.to_string()))?;
561 PgEncoder::encode_execute_to(&mut self.write_buf);
562 PgEncoder::encode_sync_to(&mut self.write_buf);
563
564 self.flush_write_buf().await?;
565
566 let mut row_count = 0usize;
567 let mut first_column: Option<Bytes> = None;
568 let mut error: Option<PgError> = None;
569 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
570
571 loop {
572 match self
573 .recv_fill_first_column_zerocopy_fast(&mut first_column)
574 .await
575 {
576 Ok(msg_type) => {
577 if let Err(err) = flow.validate_msg_type(
578 msg_type,
579 "extended-query visit first-column execute",
580 error.is_some(),
581 ) {
582 return return_with_desync(self, err);
583 }
584 match msg_type {
585 b'1' | b'2' | b'T' | b'n' => {}
586 b'D' => {
587 if error.is_none() {
588 if let Err(err) = on_value(first_column.as_deref()) {
589 return return_callback_error_with_desync(self, err);
590 }
591 row_count += 1;
592 first_column = None;
593 }
594 }
595 b'C' => {}
596 b'Z' => {
597 if let Some(err) = error {
598 return Err(err);
599 }
600 return Ok(row_count);
601 }
602 msg_type if is_ignorable_session_msg_type(msg_type) => {}
603 other => {
604 return return_with_desync(
605 self,
606 unexpected_backend_msg_type(
607 "extended-query visit first-column execute",
608 other,
609 ),
610 );
611 }
612 }
613 }
614 Err(e) => {
615 if matches!(&e, PgError::QueryServer(_)) {
616 capture_query_server_error(self, &mut error, e);
617 continue;
618 }
619 return Err(e);
620 }
621 }
622 }
623 }
624
625 pub async fn probe_query_with_param_types(
628 &mut self,
629 sql: &str,
630 param_types: &[u32],
631 params: &[Option<Vec<u8>>],
632 ) -> PgResult<()> {
633 Self::validate_param_type_arity(params, param_types)?;
634
635 let parse = PgEncoder::try_encode_parse("", sql, param_types)
636 .map_err(|e| PgError::Encode(e.to_string()))?;
637 let bind =
638 PgEncoder::encode_bind("", "", params).map_err(|e| PgError::Encode(e.to_string()))?;
639 let describe =
640 PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
641 let sync = PgEncoder::encode_sync();
642 let mut bytes =
643 BytesMut::with_capacity(parse.len() + bind.len() + describe.len() + sync.len());
644 bytes.extend_from_slice(&parse);
645 bytes.extend_from_slice(&bind);
646 bytes.extend_from_slice(&describe);
647 bytes.extend_from_slice(&sync);
648 self.write_all_with_timeout(&bytes, "stream write").await?;
649
650 let mut saw_describe_response = false;
651 let mut error: Option<PgError> = None;
652 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal());
653
654 loop {
655 let msg = self.recv().await?;
656 if let Err(err) = flow.validate(&msg, "extended-query probe", error.is_some()) {
657 return return_with_desync(self, err);
658 }
659 match msg {
660 BackendMessage::ParseComplete => {}
661 BackendMessage::BindComplete => {}
662 BackendMessage::RowDescription(_) | BackendMessage::NoData => {
663 saw_describe_response = true;
664 }
665 BackendMessage::ReadyForQuery(_) => {
666 if let Some(err) = error {
667 return Err(err);
668 }
669 if !saw_describe_response {
670 return return_with_desync(
671 self,
672 PgError::Protocol(
673 "extended-query probe finished without RowDescription/NoData"
674 .to_string(),
675 ),
676 );
677 }
678 return Ok(());
679 }
680 BackendMessage::ErrorResponse(err) => {
681 if error.is_none() {
682 error = Some(PgError::QueryServer(err.into()));
683 }
684 }
685 msg if is_ignorable_session_message(&msg) => {}
686 other => {
687 return return_with_desync(
688 self,
689 unexpected_backend_message("extended-query probe", &other),
690 );
691 }
692 }
693 }
694 }
695
696 pub async fn query_cached(
701 &mut self,
702 sql: &str,
703 params: &[Option<Vec<u8>>],
704 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
705 self.query_cached_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
706 .await
707 }
708
709 pub async fn query_cached_with_result_format(
711 &mut self,
712 sql: &str,
713 params: &[Option<Vec<u8>>],
714 result_format: i16,
715 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
716 let mut retried = false;
717 loop {
718 match self
719 .query_cached_with_result_format_once(sql, params, result_format)
720 .await
721 {
722 Ok(rows) => return Ok(rows),
723 Err(err)
724 if !retried
725 && (err.is_prepared_statement_retryable()
726 || err.is_prepared_statement_already_exists()) =>
727 {
728 retried = true;
729 if err.is_prepared_statement_retryable() {
730 self.clear_prepared_statement_state();
731 }
732 }
733 Err(err) => return Err(err),
734 }
735 }
736 }
737
738 async fn query_cached_with_result_format_once(
739 &mut self,
740 sql: &str,
741 params: &[Option<Vec<u8>>],
742 result_format: i16,
743 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
744 let stmt_name = Self::sql_to_stmt_name(sql);
745 let is_new = !self.prepared_statements.contains_key(&stmt_name);
746
747 let needed = prepared_bind_execute_sync_wire_len(&stmt_name, params, result_format)?;
748 let mut buf = BytesMut::with_capacity(needed);
749
750 if is_new {
751 self.evict_prepared_if_full();
755 if let Err(e) = PgEncoder::try_encode_parse_to(&mut buf, &stmt_name, sql, &[]) {
756 return Err(PgError::Encode(e.to_string()));
757 }
758 self.prepared_statements
760 .insert(stmt_name.clone(), sql.to_string());
761 }
762
763 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
765 &mut buf,
766 &stmt_name,
767 params,
768 result_format,
769 ) {
770 if is_new {
771 self.prepared_statements.remove(&stmt_name);
772 }
773 return Err(PgError::Encode(e.to_string()));
774 }
775 PgEncoder::encode_execute_to(&mut buf);
776 PgEncoder::encode_sync_to(&mut buf);
777
778 if let Err(err) = self.write_all_with_timeout(&buf, "stream write").await {
779 if is_new {
780 self.prepared_statements.remove(&stmt_name);
781 }
782 return Err(err);
783 }
784
785 let mut rows = Vec::new();
786
787 let mut error: Option<PgError> = None;
788 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(is_new));
789
790 loop {
791 let msg = match self.recv().await {
792 Ok(msg) => msg,
793 Err(err) => {
794 if is_new && !flow.saw_parse_complete() {
795 self.prepared_statements.remove(&stmt_name);
796 }
797 return Err(err);
798 }
799 };
800 if let Err(err) = flow.validate(&msg, "extended-query cached execute", error.is_some())
801 {
802 if is_new && !flow.saw_parse_complete() {
803 self.prepared_statements.remove(&stmt_name);
804 }
805 return return_with_desync(self, err);
806 }
807 match msg {
808 BackendMessage::ParseComplete => {
809 }
811 BackendMessage::BindComplete => {}
812 BackendMessage::RowDescription(_) => {}
813 BackendMessage::DataRow(data) => {
814 if error.is_none() {
815 rows.push(data);
816 }
817 }
818 BackendMessage::CommandComplete(_) => {}
819 BackendMessage::NoData => {}
820 BackendMessage::ReadyForQuery(_) => {
821 if let Some(err) = error {
822 if is_new
823 && !flow.saw_parse_complete()
824 && !err.is_prepared_statement_already_exists()
825 {
826 self.prepared_statements.remove(&stmt_name);
827 }
828 return Err(err);
829 }
830 if is_new && !flow.saw_parse_complete() {
831 self.prepared_statements.remove(&stmt_name);
832 return return_with_desync(
833 self,
834 PgError::Protocol(
835 "Cache miss query reached ReadyForQuery without ParseComplete"
836 .to_string(),
837 ),
838 );
839 }
840 return Ok(rows);
841 }
842 BackendMessage::ErrorResponse(err) => {
843 if error.is_none() {
844 let query_err = PgError::QueryServer(err.into());
845 if query_err.is_prepared_statement_retryable()
846 || (is_new
847 && !flow.saw_parse_complete()
848 && !query_err.is_prepared_statement_already_exists())
849 {
850 self.prepared_statements.remove(&stmt_name);
855 }
856 error = Some(query_err);
857 }
858 }
859 msg if is_ignorable_session_message(&msg) => {}
860 other => {
861 if is_new && !flow.saw_parse_complete() {
862 self.prepared_statements.remove(&stmt_name);
863 }
864 return return_with_desync(
865 self,
866 unexpected_backend_message("extended-query cached execute", &other),
867 );
868 }
869 }
870 }
871 }
872
873 pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
876 super::prepared::sql_bytes_to_stmt_name(sql.as_bytes())
877 }
878
879 pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
881 let bytes = PgEncoder::try_encode_query_string(sql)?;
882 self.write_all_with_timeout(&bytes, "stream write").await?;
883
884 let mut error: Option<PgError> = None;
885 let mut flow = SimpleFlowTracker::new();
886
887 loop {
888 let msg = self.recv().await?;
889 match msg {
890 BackendMessage::RowDescription(_) => {
891 if let Err(err) = flow.on_row_description("simple-query execute") {
895 return return_with_desync(self, err);
896 }
897 }
898 BackendMessage::DataRow(_) => {
899 if let Err(err) = flow.on_data_row("simple-query execute") {
900 return return_with_desync(self, err);
901 }
902 }
903 BackendMessage::CommandComplete(_) => {
904 flow.on_command_complete();
905 }
906 BackendMessage::EmptyQueryResponse => {
907 if let Err(err) = flow.on_empty_query_response("simple-query execute") {
908 return return_with_desync(self, err);
909 }
910 }
911 BackendMessage::ReadyForQuery(_) => {
912 if let Some(err) = error {
913 return Err(err);
914 }
915 if let Err(err) =
916 flow.on_ready_for_query("simple-query execute", error.is_some())
917 {
918 return return_with_desync(self, err);
919 }
920 return Ok(());
921 }
922 BackendMessage::ErrorResponse(err) => {
923 if error.is_none() {
924 error = Some(PgError::QueryServer(err.into()));
925 }
926 }
927 msg if is_ignorable_session_message(&msg) => {}
928 other => {
929 return return_with_desync(
930 self,
931 unexpected_backend_message("simple-query execute", &other),
932 );
933 }
934 }
935 }
936 }
937
938 pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
945 use std::sync::Arc;
946
947 const MAX_SIMPLE_QUERY_ROWS: usize = 10_000;
950
951 let bytes = PgEncoder::try_encode_query_string(sql)?;
952 self.write_all_with_timeout(&bytes, "stream write").await?;
953
954 let mut rows: Vec<super::PgRow> = Vec::new();
955 let mut column_info: Option<Arc<super::ColumnInfo>> = None;
956 let mut error: Option<PgError> = None;
957 let mut flow = SimpleFlowTracker::new();
958
959 loop {
960 let msg = self.recv().await?;
961 match msg {
962 BackendMessage::RowDescription(fields) => {
963 if let Err(err) = flow.on_row_description("simple-query read") {
964 return return_with_desync(self, err);
965 }
966 column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
967 }
968 BackendMessage::DataRow(data) => {
969 if let Err(err) = flow.on_data_row("simple-query read") {
970 return return_with_desync(self, err);
971 }
972 if error.is_none() {
973 if rows.len() >= MAX_SIMPLE_QUERY_ROWS {
974 if error.is_none() {
975 error = Some(PgError::Query(format!(
976 "simple_query exceeded {} row safety cap",
977 MAX_SIMPLE_QUERY_ROWS,
978 )));
979 }
980 } else {
982 rows.push(super::PgRow {
983 columns: data,
984 column_info: column_info.clone(),
985 });
986 }
987 }
988 }
989 BackendMessage::CommandComplete(_) => {
990 flow.on_command_complete();
991 column_info = None;
992 }
993 BackendMessage::EmptyQueryResponse => {
994 if let Err(err) = flow.on_empty_query_response("simple-query read") {
995 return return_with_desync(self, err);
996 }
997 column_info = None;
998 }
999 BackendMessage::ReadyForQuery(_) => {
1000 if let Some(err) = error {
1001 return Err(err);
1002 }
1003 if let Err(err) = flow.on_ready_for_query("simple-query read", error.is_some())
1004 {
1005 return return_with_desync(self, err);
1006 }
1007 return Ok(rows);
1008 }
1009 BackendMessage::ErrorResponse(err) => {
1010 if error.is_none() {
1011 error = Some(PgError::QueryServer(err.into()));
1012 }
1013 }
1014 msg if is_ignorable_session_message(&msg) => {}
1015 other => {
1016 return return_with_desync(
1017 self,
1018 unexpected_backend_message("simple-query read", &other),
1019 );
1020 }
1021 }
1022 }
1023 }
1024
1025 #[inline]
1038 pub async fn query_prepared_single(
1039 &mut self,
1040 stmt: &super::PreparedStatement,
1041 params: &[Option<Vec<u8>>],
1042 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1043 self.query_prepared_single_with_result_format(stmt, params, PgEncoder::FORMAT_TEXT)
1044 .await
1045 }
1046
1047 #[inline]
1053 pub async fn query_prepared_single_count(
1054 &mut self,
1055 stmt: &super::PreparedStatement,
1056 params: &[Option<Vec<u8>>],
1057 ) -> PgResult<()> {
1058 self.write_buf.clear();
1059 PgEncoder::encode_bind_to(&mut self.write_buf, &stmt.name, params)
1060 .map_err(|e| PgError::Encode(e.to_string()))?;
1061 PgEncoder::encode_execute_to(&mut self.write_buf);
1062 PgEncoder::encode_sync_to(&mut self.write_buf);
1063
1064 self.flush_write_buf().await?;
1065
1066 let mut error: Option<PgError> = None;
1067 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1068
1069 loop {
1070 match self.recv_msg_type_fast().await {
1071 Ok(msg_type) => {
1072 if let Err(err) = flow.validate_msg_type(
1073 msg_type,
1074 "prepared single count execute",
1075 error.is_some(),
1076 ) {
1077 return return_with_desync(self, err);
1078 }
1079 match msg_type {
1080 b'2' | b'T' | b'D' | b'C' | b'n' => {}
1081 b'Z' => {
1082 if let Some(err) = error {
1083 return Err(err);
1084 }
1085 return Ok(());
1086 }
1087 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1088 other => {
1089 return return_with_desync(
1090 self,
1091 unexpected_backend_msg_type("prepared single count execute", other),
1092 );
1093 }
1094 }
1095 }
1096 Err(e) => {
1097 if matches!(&e, PgError::QueryServer(_)) {
1098 capture_query_server_error(self, &mut error, e);
1099 continue;
1100 }
1101 return Err(e);
1102 }
1103 }
1104 }
1105 }
1106
1107 #[inline]
1109 pub async fn query_prepared_single_with_result_format(
1110 &mut self,
1111 stmt: &super::PreparedStatement,
1112 params: &[Option<Vec<u8>>],
1113 result_format: i16,
1114 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1115 let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
1116 let mut buf = BytesMut::with_capacity(needed);
1117
1118 PgEncoder::encode_bind_to_with_result_format(&mut buf, &stmt.name, params, result_format)
1120 .map_err(|e| PgError::Encode(e.to_string()))?;
1121 PgEncoder::encode_execute_to(&mut buf);
1122 PgEncoder::encode_sync_to(&mut buf);
1123
1124 self.write_all_with_timeout(&buf, "stream write").await?;
1125
1126 let mut rows = Vec::new();
1127
1128 let mut error: Option<PgError> = None;
1129 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1130
1131 loop {
1132 let msg = self.recv().await?;
1133 if let Err(err) = flow.validate(&msg, "prepared single execute", error.is_some()) {
1134 return return_with_desync(self, err);
1135 }
1136 match msg {
1137 BackendMessage::BindComplete => {}
1138 BackendMessage::RowDescription(_) => {}
1139 BackendMessage::DataRow(data) => {
1140 if error.is_none() {
1141 rows.push(data);
1142 }
1143 }
1144 BackendMessage::CommandComplete(_) => {}
1145 BackendMessage::NoData => {}
1146 BackendMessage::ReadyForQuery(_) => {
1147 if let Some(err) = error {
1148 return Err(err);
1149 }
1150 return Ok(rows);
1151 }
1152 BackendMessage::ErrorResponse(err) => {
1153 capture_query_server_error(self, &mut error, PgError::QueryServer(err.into()));
1154 }
1155 msg if is_ignorable_session_message(&msg) => {}
1156 other => {
1157 return return_with_desync(
1158 self,
1159 unexpected_backend_message("prepared single execute", &other),
1160 );
1161 }
1162 }
1163 }
1164 }
1165
1166 #[inline]
1169 pub async fn query_prepared_single_reuse_with_result_format(
1170 &mut self,
1171 stmt: &super::PreparedStatement,
1172 params: &[Option<Vec<u8>>],
1173 result_format: i16,
1174 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1175 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1176
1177 PgEncoder::encode_bind_to_with_result_format(
1178 &mut self.write_buf,
1179 &stmt.name,
1180 params,
1181 result_format,
1182 )
1183 .map_err(|e| PgError::Encode(e.to_string()))?;
1184 PgEncoder::encode_execute_to(&mut self.write_buf);
1185 PgEncoder::encode_sync_to(&mut self.write_buf);
1186
1187 self.flush_write_buf().await?;
1188
1189 let mut rows = Vec::with_capacity(32);
1190 let mut error: Option<PgError> = None;
1191 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1192
1193 loop {
1194 let msg = self.recv().await?;
1195 if let Err(err) = flow.validate(&msg, "prepared single reuse execute", error.is_some())
1196 {
1197 return return_with_desync(self, err);
1198 }
1199 match msg {
1200 BackendMessage::BindComplete => {}
1201 BackendMessage::RowDescription(_) => {}
1202 BackendMessage::DataRow(data) => {
1203 if error.is_none() {
1204 rows.push(data);
1205 }
1206 }
1207 BackendMessage::CommandComplete(_) => {}
1208 BackendMessage::NoData => {}
1209 BackendMessage::ReadyForQuery(_) => {
1210 if let Some(err) = error {
1211 return Err(err);
1212 }
1213 return Ok(rows);
1214 }
1215 BackendMessage::ErrorResponse(err) => {
1216 capture_query_server_error(self, &mut error, PgError::QueryServer(err.into()));
1217 }
1218 msg if is_ignorable_session_message(&msg) => {}
1219 other => {
1220 return return_with_desync(
1221 self,
1222 unexpected_backend_message("prepared single reuse execute", &other),
1223 );
1224 }
1225 }
1226 }
1227 }
1228
1229 #[inline]
1234 pub async fn query_prepared_single_reuse_visit_rows_with_result_format<F>(
1235 &mut self,
1236 stmt: &super::PreparedStatement,
1237 params: &[Option<Vec<u8>>],
1238 result_format: i16,
1239 mut on_row: F,
1240 ) -> PgResult<usize>
1241 where
1242 F: FnMut(&[Option<Vec<u8>>]) -> PgResult<()>,
1243 {
1244 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1245
1246 PgEncoder::encode_bind_to_with_result_format(
1247 &mut self.write_buf,
1248 &stmt.name,
1249 params,
1250 result_format,
1251 )
1252 .map_err(|e| PgError::Encode(e.to_string()))?;
1253 PgEncoder::encode_execute_to(&mut self.write_buf);
1254 PgEncoder::encode_sync_to(&mut self.write_buf);
1255
1256 self.flush_write_buf().await?;
1257
1258 let mut row_count = 0usize;
1259 let mut row_buf: Vec<Option<Vec<u8>>> = Vec::new();
1260 let mut error: Option<PgError> = None;
1261 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1262
1263 loop {
1264 match self.recv_fill_data_row_fast(&mut row_buf).await {
1265 Ok(msg_type) => {
1266 if let Err(err) = flow.validate_msg_type(
1267 msg_type,
1268 "prepared single reuse visit execute",
1269 error.is_some(),
1270 ) {
1271 return return_with_desync(self, err);
1272 }
1273 match msg_type {
1274 b'2' | b'T' | b'n' => {}
1275 b'D' => {
1276 if error.is_none() {
1277 if let Err(err) = on_row(row_buf.as_slice()) {
1278 return return_callback_error_with_desync(self, err);
1279 }
1280 row_count += 1;
1281 }
1282 }
1283 b'C' => {}
1284 b'Z' => {
1285 if let Some(err) = error {
1286 return Err(err);
1287 }
1288 return Ok(row_count);
1289 }
1290 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1291 other => {
1292 return return_with_desync(
1293 self,
1294 unexpected_backend_msg_type(
1295 "prepared single reuse visit execute",
1296 other,
1297 ),
1298 );
1299 }
1300 }
1301 }
1302 Err(e) => {
1303 if matches!(&e, PgError::QueryServer(_)) {
1304 capture_query_server_error(self, &mut error, e);
1305 continue;
1306 }
1307 return Err(e);
1308 }
1309 }
1310 }
1311 }
1312
1313 #[inline]
1318 pub async fn query_prepared_single_reuse_visit_bytes_rows_with_result_format<F>(
1319 &mut self,
1320 stmt: &super::PreparedStatement,
1321 params: &[Option<Vec<u8>>],
1322 result_format: i16,
1323 mut on_row: F,
1324 ) -> PgResult<usize>
1325 where
1326 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1327 {
1328 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1329
1330 PgEncoder::encode_bind_to_with_result_format(
1331 &mut self.write_buf,
1332 &stmt.name,
1333 params,
1334 result_format,
1335 )
1336 .map_err(|e| PgError::Encode(e.to_string()))?;
1337 PgEncoder::encode_execute_to(&mut self.write_buf);
1338 PgEncoder::encode_sync_to(&mut self.write_buf);
1339
1340 self.flush_write_buf().await?;
1341
1342 let mut row_count = 0usize;
1343 let mut row = super::PgBytesRow::default();
1344 let mut error: Option<PgError> = None;
1345 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1346
1347 loop {
1348 match self.recv_fill_zerocopy_row_fast(&mut row).await {
1349 Ok(msg_type) => {
1350 if let Err(err) = flow.validate_msg_type(
1351 msg_type,
1352 "prepared single reuse visit bytes execute",
1353 error.is_some(),
1354 ) {
1355 return return_with_desync(self, err);
1356 }
1357 match msg_type {
1358 b'2' | b'T' | b'n' => {}
1359 b'D' => {
1360 if error.is_none() {
1361 if let Err(err) = on_row(&row) {
1362 return return_callback_error_with_desync(self, err);
1363 }
1364 row_count += 1;
1365 row.release_payload();
1366 }
1367 }
1368 b'C' => {}
1369 b'Z' => {
1370 if let Some(err) = error {
1371 return Err(err);
1372 }
1373 return Ok(row_count);
1374 }
1375 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1376 other => {
1377 return return_with_desync(
1378 self,
1379 unexpected_backend_msg_type(
1380 "prepared single reuse visit bytes execute",
1381 other,
1382 ),
1383 );
1384 }
1385 }
1386 }
1387 Err(e) => {
1388 if matches!(&e, PgError::QueryServer(_)) {
1389 capture_query_server_error(self, &mut error, e);
1390 continue;
1391 }
1392 return Err(e);
1393 }
1394 }
1395 }
1396 }
1397
1398 #[inline]
1400 pub async fn query_prepared_single_reuse_visit_first_column_bytes_with_result_format<F>(
1401 &mut self,
1402 stmt: &super::PreparedStatement,
1403 params: &[Option<Vec<u8>>],
1404 result_format: i16,
1405 mut on_value: F,
1406 ) -> PgResult<usize>
1407 where
1408 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1409 {
1410 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1411
1412 PgEncoder::encode_bind_to_with_result_format(
1413 &mut self.write_buf,
1414 &stmt.name,
1415 params,
1416 result_format,
1417 )
1418 .map_err(|e| PgError::Encode(e.to_string()))?;
1419 PgEncoder::encode_execute_to(&mut self.write_buf);
1420 PgEncoder::encode_sync_to(&mut self.write_buf);
1421
1422 self.flush_write_buf().await?;
1423
1424 let mut row_count = 0usize;
1425 let mut first_column: Option<Bytes> = None;
1426 let mut error: Option<PgError> = None;
1427 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1428
1429 loop {
1430 match self
1431 .recv_fill_first_column_zerocopy_fast(&mut first_column)
1432 .await
1433 {
1434 Ok(msg_type) => {
1435 if let Err(err) = flow.validate_msg_type(
1436 msg_type,
1437 "prepared single reuse visit first-column execute",
1438 error.is_some(),
1439 ) {
1440 return return_with_desync(self, err);
1441 }
1442 match msg_type {
1443 b'2' | b'T' | b'n' => {}
1444 b'D' => {
1445 if error.is_none() {
1446 if let Err(err) = on_value(first_column.as_deref()) {
1447 return return_callback_error_with_desync(self, err);
1448 }
1449 row_count += 1;
1450 first_column = None;
1451 }
1452 }
1453 b'C' => {}
1454 b'Z' => {
1455 if let Some(err) = error {
1456 return Err(err);
1457 }
1458 return Ok(row_count);
1459 }
1460 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1461 other => {
1462 return return_with_desync(
1463 self,
1464 unexpected_backend_msg_type(
1465 "prepared single reuse visit first-column execute",
1466 other,
1467 ),
1468 );
1469 }
1470 }
1471 }
1472 Err(e) => {
1473 if matches!(&e, PgError::QueryServer(_)) {
1474 capture_query_server_error(self, &mut error, e);
1475 continue;
1476 }
1477 return Err(e);
1478 }
1479 }
1480 }
1481 }
1482
1483 #[inline]
1485 pub async fn query_prepared_single_reuse_visit_first_four_columns_bytes_with_result_format<F>(
1486 &mut self,
1487 stmt: &super::PreparedStatement,
1488 params: &[Option<Vec<u8>>],
1489 result_format: i16,
1490 mut on_row: F,
1491 ) -> PgResult<usize>
1492 where
1493 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1494 {
1495 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1496
1497 PgEncoder::encode_bind_to_with_result_format(
1498 &mut self.write_buf,
1499 &stmt.name,
1500 params,
1501 result_format,
1502 )
1503 .map_err(|e| PgError::Encode(e.to_string()))?;
1504 PgEncoder::encode_execute_to(&mut self.write_buf);
1505 PgEncoder::encode_sync_to(&mut self.write_buf);
1506
1507 self.flush_write_buf().await?;
1508
1509 let mut row_count = 0usize;
1510 let mut columns = [None, None, None, None];
1511 let mut error: Option<PgError> = None;
1512 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1513
1514 loop {
1515 match self
1516 .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1517 .await
1518 {
1519 Ok(msg_type) => {
1520 if let Err(err) = flow.validate_msg_type(
1521 msg_type,
1522 "prepared single reuse visit first-four execute",
1523 error.is_some(),
1524 ) {
1525 return return_with_desync(self, err);
1526 }
1527 match msg_type {
1528 b'2' | b'T' | b'n' => {}
1529 b'D' => {
1530 if error.is_none() {
1531 if let Err(err) = on_row([
1532 columns[0].as_deref(),
1533 columns[1].as_deref(),
1534 columns[2].as_deref(),
1535 columns[3].as_deref(),
1536 ]) {
1537 return return_callback_error_with_desync(self, err);
1538 }
1539 columns.fill(None);
1540 row_count += 1;
1541 }
1542 }
1543 b'C' => {}
1544 b'Z' => {
1545 if let Some(err) = error {
1546 return Err(err);
1547 }
1548 return Ok(row_count);
1549 }
1550 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1551 other => {
1552 return return_with_desync(
1553 self,
1554 unexpected_backend_msg_type(
1555 "prepared single reuse visit first-four execute",
1556 other,
1557 ),
1558 );
1559 }
1560 }
1561 }
1562 Err(e) => {
1563 if matches!(&e, PgError::QueryServer(_)) {
1564 capture_query_server_error(self, &mut error, e);
1565 continue;
1566 }
1567 return Err(e);
1568 }
1569 }
1570 }
1571 }
1572
1573 #[inline]
1577 pub async fn query_prepared_single_encoded_visit_bytes_rows<F>(
1578 &mut self,
1579 wire: &[u8],
1580 on_row: F,
1581 ) -> PgResult<usize>
1582 where
1583 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1584 {
1585 let (row_count, _, _) = self
1586 .query_prepared_single_encoded_visit_bytes_rows_profiled(wire, on_row)
1587 .await?;
1588 Ok(row_count)
1589 }
1590
1591 #[inline]
1595 pub async fn query_prepared_single_encoded_visit_bytes_rows_profiled<F>(
1596 &mut self,
1597 wire: &[u8],
1598 mut on_row: F,
1599 ) -> PgResult<(usize, Duration, Duration)>
1600 where
1601 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1602 {
1603 let send_start = Instant::now();
1604 self.send_bytes(wire).await?;
1605 let send_elapsed = send_start.elapsed();
1606 let consume_start = Instant::now();
1607
1608 let mut row_count = 0usize;
1609 let mut row = super::PgBytesRow::default();
1610 let mut error: Option<PgError> = None;
1611 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1612
1613 loop {
1614 match self.recv_fill_zerocopy_row_fast(&mut row).await {
1615 Ok(msg_type) => {
1616 if let Err(err) = flow.validate_msg_type(
1617 msg_type,
1618 "prepared single encoded visit bytes execute",
1619 error.is_some(),
1620 ) {
1621 return return_with_desync(self, err);
1622 }
1623 match msg_type {
1624 b'2' | b'T' | b'n' => {}
1625 b'D' => {
1626 if error.is_none() {
1627 if let Err(err) = on_row(&row) {
1628 return return_callback_error_with_desync(self, err);
1629 }
1630 row_count += 1;
1631 row.release_payload();
1632 }
1633 }
1634 b'C' => {}
1635 b'Z' => {
1636 if let Some(err) = error {
1637 return Err(err);
1638 }
1639 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1640 }
1641 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1642 other => {
1643 return return_with_desync(
1644 self,
1645 unexpected_backend_msg_type(
1646 "prepared single encoded visit bytes execute",
1647 other,
1648 ),
1649 );
1650 }
1651 }
1652 }
1653 Err(e) => {
1654 if matches!(&e, PgError::QueryServer(_)) {
1655 capture_query_server_error(self, &mut error, e);
1656 continue;
1657 }
1658 return Err(e);
1659 }
1660 }
1661 }
1662 }
1663
1664 #[inline]
1666 pub async fn query_prepared_single_encoded_visit_first_column_bytes<F>(
1667 &mut self,
1668 wire: &[u8],
1669 on_value: F,
1670 ) -> PgResult<usize>
1671 where
1672 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1673 {
1674 let (row_count, _, _) = self
1675 .query_prepared_single_encoded_visit_first_column_bytes_profiled(wire, on_value)
1676 .await?;
1677 Ok(row_count)
1678 }
1679
1680 #[inline]
1684 pub async fn query_prepared_single_encoded_visit_first_column_bytes_profiled<F>(
1685 &mut self,
1686 wire: &[u8],
1687 mut on_value: F,
1688 ) -> PgResult<(usize, Duration, Duration)>
1689 where
1690 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1691 {
1692 let send_start = Instant::now();
1693 self.send_bytes(wire).await?;
1694 let send_elapsed = send_start.elapsed();
1695 let consume_start = Instant::now();
1696
1697 let mut row_count = 0usize;
1698 let mut first_column: Option<Bytes> = None;
1699 let mut error: Option<PgError> = None;
1700 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1701
1702 loop {
1703 match self
1704 .recv_fill_first_column_zerocopy_fast(&mut first_column)
1705 .await
1706 {
1707 Ok(msg_type) => {
1708 if let Err(err) = flow.validate_msg_type(
1709 msg_type,
1710 "prepared single encoded visit first-column execute",
1711 error.is_some(),
1712 ) {
1713 return return_with_desync(self, err);
1714 }
1715 match msg_type {
1716 b'2' | b'T' | b'n' => {}
1717 b'D' => {
1718 if error.is_none() {
1719 if let Err(err) = on_value(first_column.as_deref()) {
1720 return return_callback_error_with_desync(self, err);
1721 }
1722 row_count += 1;
1723 first_column = None;
1724 }
1725 }
1726 b'C' => {}
1727 b'Z' => {
1728 if let Some(err) = error {
1729 return Err(err);
1730 }
1731 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1732 }
1733 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1734 other => {
1735 return return_with_desync(
1736 self,
1737 unexpected_backend_msg_type(
1738 "prepared single encoded visit first-column execute",
1739 other,
1740 ),
1741 );
1742 }
1743 }
1744 }
1745 Err(e) => {
1746 if matches!(&e, PgError::QueryServer(_)) {
1747 capture_query_server_error(self, &mut error, e);
1748 continue;
1749 }
1750 return Err(e);
1751 }
1752 }
1753 }
1754 }
1755
1756 #[inline]
1758 pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes<F>(
1759 &mut self,
1760 wire: &[u8],
1761 on_row: F,
1762 ) -> PgResult<usize>
1763 where
1764 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1765 {
1766 let (row_count, _, _) = self
1767 .query_prepared_single_encoded_visit_first_four_columns_bytes_profiled(wire, on_row)
1768 .await?;
1769 Ok(row_count)
1770 }
1771
1772 #[inline]
1776 pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes_profiled<F>(
1777 &mut self,
1778 wire: &[u8],
1779 mut on_row: F,
1780 ) -> PgResult<(usize, Duration, Duration)>
1781 where
1782 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1783 {
1784 let send_start = Instant::now();
1785 self.send_bytes(wire).await?;
1786 let send_elapsed = send_start.elapsed();
1787 let consume_start = Instant::now();
1788
1789 let mut row_count = 0usize;
1790 let mut columns = [None, None, None, None];
1791 let mut error: Option<PgError> = None;
1792 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1793
1794 loop {
1795 match self
1796 .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1797 .await
1798 {
1799 Ok(msg_type) => {
1800 if let Err(err) = flow.validate_msg_type(
1801 msg_type,
1802 "prepared single encoded visit first-four execute",
1803 error.is_some(),
1804 ) {
1805 return return_with_desync(self, err);
1806 }
1807 match msg_type {
1808 b'2' | b'T' | b'n' => {}
1809 b'D' => {
1810 if error.is_none() {
1811 if let Err(err) = on_row([
1812 columns[0].as_deref(),
1813 columns[1].as_deref(),
1814 columns[2].as_deref(),
1815 columns[3].as_deref(),
1816 ]) {
1817 return return_callback_error_with_desync(self, err);
1818 }
1819 columns.fill(None);
1820 row_count += 1;
1821 }
1822 }
1823 b'C' => {}
1824 b'Z' => {
1825 if let Some(err) = error {
1826 return Err(err);
1827 }
1828 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1829 }
1830 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1831 other => {
1832 return return_with_desync(
1833 self,
1834 unexpected_backend_msg_type(
1835 "prepared single encoded visit first-four execute",
1836 other,
1837 ),
1838 );
1839 }
1840 }
1841 }
1842 Err(e) => {
1843 if matches!(&e, PgError::QueryServer(_)) {
1844 capture_query_server_error(self, &mut error, e);
1845 continue;
1846 }
1847 return Err(e);
1848 }
1849 }
1850 }
1851 }
1852}
1853
1854#[cfg(test)]
1855mod tests {
1856 use super::*;
1857
1858 #[cfg(unix)]
1859 fn test_conn_with_peer() -> (PgConnection, tokio::net::UnixStream) {
1860 use crate::driver::connection::StatementCache;
1861 use crate::driver::stream::PgStream;
1862 use bytes::BytesMut;
1863 use std::collections::{HashMap, VecDeque};
1864 use std::num::NonZeroUsize;
1865 use tokio::net::UnixStream;
1866
1867 let (unix_stream, peer) = UnixStream::pair().expect("unix stream pair");
1868 (
1869 PgConnection {
1870 stream: PgStream::Unix(unix_stream),
1871 buffer: BytesMut::with_capacity(1024),
1872 write_buf: BytesMut::with_capacity(1024),
1873 sql_buf: BytesMut::with_capacity(256),
1874 params_buf: Vec::new(),
1875 prepared_statements: HashMap::new(),
1876 stmt_cache: StatementCache::new(NonZeroUsize::new(2).expect("non-zero")),
1877 column_info_cache: HashMap::new(),
1878 process_id: 0,
1879 cancel_key_bytes: Vec::new(),
1880 requested_protocol_minor: PgConnection::default_protocol_minor(),
1881 negotiated_protocol_minor: PgConnection::default_protocol_minor(),
1882 notifications: VecDeque::new(),
1883 replication_stream_active: false,
1884 replication_mode_enabled: false,
1885 last_replication_wal_end: None,
1886 io_desynced: false,
1887 pending_statement_closes: Vec::new(),
1888 draining_statement_closes: false,
1889 },
1890 peer,
1891 )
1892 }
1893
1894 #[cfg(unix)]
1895 fn test_conn() -> PgConnection {
1896 test_conn_with_peer().0
1897 }
1898
1899 #[cfg(unix)]
1900 fn push_backend_frame(conn: &mut PgConnection, msg_type: u8, payload: &[u8]) {
1901 conn.buffer.extend_from_slice(&[msg_type]);
1902 conn.buffer
1903 .extend_from_slice(&((payload.len() + 4) as u32).to_be_bytes());
1904 conn.buffer.extend_from_slice(payload);
1905 }
1906
1907 fn error_response_payload(code: &str, message: &str) -> Vec<u8> {
1908 let mut payload = Vec::new();
1909 payload.push(b'S');
1910 payload.extend_from_slice(b"ERROR\0");
1911 payload.push(b'C');
1912 payload.extend_from_slice(code.as_bytes());
1913 payload.push(0);
1914 payload.push(b'M');
1915 payload.extend_from_slice(message.as_bytes());
1916 payload.push(0);
1917 payload.push(0);
1918 payload
1919 }
1920
1921 #[test]
1922 fn prepared_buffer_sizing_rejects_too_many_params_before_allocation() {
1923 let params = vec![None; i16::MAX as usize + 1];
1924 let err = prepared_bind_execute_sync_wire_len("stmt", ¶ms, PgEncoder::FORMAT_TEXT)
1925 .expect_err("parameter overflow must be rejected");
1926
1927 assert!(matches!(err, PgError::Encode(msg) if msg.contains("Too many parameters")));
1928 }
1929
1930 #[test]
1931 fn sql_to_stmt_name_matches_prepared_statement_identity() {
1932 let sql = "SELECT id, name FROM users WHERE id = $1";
1933 let stmt = super::super::PreparedStatement::from_sql(sql);
1934 assert_eq!(PgConnection::sql_to_stmt_name(sql), stmt.name());
1935 }
1936
1937 #[cfg(unix)]
1938 #[tokio::test]
1939 async fn streaming_callback_error_marks_query_connection_desynced() {
1940 let mut conn = test_conn();
1941
1942 let err = return_callback_error_with_desync::<()>(
1943 &mut conn,
1944 PgError::Query("consumer stopped".to_string()),
1945 )
1946 .expect_err("callback error should be returned");
1947
1948 assert!(matches!(err, PgError::Query(msg) if msg == "consumer stopped"));
1949 assert!(conn.is_io_desynced());
1950 }
1951
1952 #[cfg(unix)]
1953 #[tokio::test]
1954 async fn protocol_order_error_marks_query_connection_desynced() {
1955 let (mut conn, _peer) = test_conn_with_peer();
1956 push_backend_frame(&mut conn, b'D', &0i16.to_be_bytes());
1957 let stmt = super::super::PreparedStatement::from_sql("SELECT 1");
1958
1959 let err = conn
1960 .query_prepared_single_count(&stmt, &[])
1961 .await
1962 .expect_err("out-of-order DataRow must fail");
1963
1964 assert!(err.to_string().contains("DataRow before BindComplete"));
1965 assert!(conn.is_io_desynced());
1966 }
1967
1968 #[cfg(unix)]
1969 #[tokio::test]
1970 async fn simple_flow_error_marks_query_connection_desynced() {
1971 let (mut conn, _peer) = test_conn_with_peer();
1972 push_backend_frame(&mut conn, b'Z', b"I");
1973
1974 let err = conn
1975 .execute_simple("SELECT 1")
1976 .await
1977 .expect_err("ReadyForQuery before completion must fail");
1978
1979 assert!(err.to_string().contains("ReadyForQuery before completion"));
1980 assert!(conn.is_io_desynced());
1981 }
1982
1983 #[cfg(unix)]
1984 #[tokio::test]
1985 async fn query_cached_keeps_statement_after_post_parse_error() {
1986 let (mut conn, _peer) = test_conn_with_peer();
1987 let sql = "SELECT $1";
1988 let stmt_name = PgConnection::sql_to_stmt_name(sql);
1989 let err_payload = error_response_payload("23514", "check constraint violation");
1990
1991 push_backend_frame(&mut conn, b'1', &[]);
1992 push_backend_frame(&mut conn, b'E', &err_payload);
1993 push_backend_frame(&mut conn, b'Z', b"I");
1994
1995 let err = conn
1996 .query_cached(sql, &[Some(b"bad".to_vec())])
1997 .await
1998 .expect_err("execution-stage error should be returned");
1999
2000 assert!(matches!(err, PgError::QueryServer(_)));
2001 assert!(conn.prepared_statements.contains_key(&stmt_name));
2002 assert!(!conn.is_io_desynced());
2003 }
2004
2005 #[cfg(unix)]
2006 #[tokio::test]
2007 async fn query_cached_removes_new_statement_when_parse_fails() {
2008 let (mut conn, _peer) = test_conn_with_peer();
2009 let sql = "SELECT broken";
2010 let stmt_name = PgConnection::sql_to_stmt_name(sql);
2011 let err_payload = error_response_payload("42601", "syntax error");
2012
2013 push_backend_frame(&mut conn, b'E', &err_payload);
2014 push_backend_frame(&mut conn, b'Z', b"I");
2015
2016 let err = conn
2017 .query_cached(sql, &[])
2018 .await
2019 .expect_err("parse-stage error should be returned");
2020
2021 assert!(matches!(err, PgError::QueryServer(_)));
2022 assert!(!conn.prepared_statements.contains_key(&stmt_name));
2023 assert!(!conn.is_io_desynced());
2024 }
2025
2026 #[cfg(unix)]
2027 #[tokio::test]
2028 async fn query_prepared_single_clears_state_on_missing_statement() {
2029 let (mut conn, _peer) = test_conn_with_peer();
2030 let stmt = super::super::PreparedStatement::from_sql("SELECT 1");
2031 let stmt_name = stmt.name().to_string();
2032 conn.prepared_statements
2033 .insert(stmt_name.clone(), "SELECT 1".to_string());
2034 let err_payload = error_response_payload(
2035 "26000",
2036 &format!("prepared statement \"{}\" does not exist", stmt_name),
2037 );
2038
2039 push_backend_frame(&mut conn, b'E', &err_payload);
2040 push_backend_frame(&mut conn, b'Z', b"I");
2041
2042 let err = conn
2043 .query_prepared_single(&stmt, &[])
2044 .await
2045 .expect_err("missing server statement should be returned");
2046
2047 assert!(matches!(err, PgError::QueryServer(_)));
2048 assert!(conn.prepared_statements.is_empty());
2049 assert!(!conn.is_io_desynced());
2050 }
2051
2052 #[cfg(unix)]
2053 #[tokio::test]
2054 async fn query_prepared_single_reuse_clears_state_on_missing_statement() {
2055 let (mut conn, _peer) = test_conn_with_peer();
2056 let stmt = super::super::PreparedStatement::from_sql("SELECT 1");
2057 let stmt_name = stmt.name().to_string();
2058 conn.prepared_statements
2059 .insert(stmt_name.clone(), "SELECT 1".to_string());
2060 let err_payload = error_response_payload(
2061 "26000",
2062 &format!("prepared statement \"{}\" does not exist", stmt_name),
2063 );
2064
2065 push_backend_frame(&mut conn, b'E', &err_payload);
2066 push_backend_frame(&mut conn, b'Z', b"I");
2067
2068 let err = conn
2069 .query_prepared_single_reuse_with_result_format(&stmt, &[], PgEncoder::FORMAT_TEXT)
2070 .await
2071 .expect_err("missing server statement should be returned");
2072
2073 assert!(matches!(err, PgError::QueryServer(_)));
2074 assert!(conn.prepared_statements.is_empty());
2075 assert!(!conn.is_io_desynced());
2076 }
2077}