1use super::{
6 PgConnection, PgError, PgResult,
7 extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8 is_ignorable_session_message, is_ignorable_session_msg_type, unexpected_backend_message,
9 unexpected_backend_msg_type,
10};
11use crate::protocol::{BackendMessage, PgEncoder};
12use bytes::{Bytes, BytesMut};
13use std::time::{Duration, Instant};
14
15#[inline]
16fn capture_query_server_error(conn: &mut PgConnection, slot: &mut Option<PgError>, err: PgError) {
17 if slot.is_some() {
18 return;
19 }
20 if err.is_prepared_statement_retryable() {
21 conn.clear_prepared_statement_state();
22 }
23 *slot = Some(err);
24}
25
26#[inline]
27fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
28 if matches!(
29 err,
30 PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
31 ) {
32 conn.mark_io_desynced();
33 }
34 Err(err)
35}
36
37#[inline]
38fn return_callback_error_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
39 conn.mark_io_desynced();
40 Err(err)
41}
42
43#[inline]
44fn prepared_bind_execute_sync_wire_len(
45 statement: &str,
46 params: &[Option<Vec<u8>>],
47 result_format: i16,
48) -> PgResult<usize> {
49 let needed = PgEncoder::bind_execute_sync_wire_len_with_formats(
50 statement,
51 params,
52 PgEncoder::FORMAT_TEXT,
53 result_format,
54 )
55 .map_err(|e| PgError::Encode(e.to_string()))?;
56 Ok(needed)
57}
58
59#[inline]
60fn reserve_prepared_single_write_buf(
61 conn: &mut PgConnection,
62 stmt: &super::PreparedStatement,
63 params: &[Option<Vec<u8>>],
64 result_format: i16,
65) -> PgResult<()> {
66 conn.write_buf.clear();
67 let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
68 conn.write_buf.reserve(needed);
69 Ok(())
70}
71
/// Position of the current statement within a simple-query response.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SimpleStatementState {
    /// No row stream is open; the next statement result may begin.
    AwaitingResult,
    /// A RowDescription was seen and the statement has not completed yet.
    InRowStream,
}
77
78#[derive(Debug, Clone, Copy)]
79struct SimpleFlowTracker {
80 state: SimpleStatementState,
81 saw_completion: bool,
82}
83
84impl SimpleFlowTracker {
85 fn new() -> Self {
86 Self {
87 state: SimpleStatementState::AwaitingResult,
88 saw_completion: false,
89 }
90 }
91
92 fn on_row_description(&mut self, context: &'static str) -> PgResult<()> {
93 if self.state == SimpleStatementState::InRowStream {
94 return Err(PgError::Protocol(format!(
95 "{}: duplicate RowDescription before statement completion",
96 context
97 )));
98 }
99 self.state = SimpleStatementState::InRowStream;
100 self.saw_completion = false;
101 Ok(())
102 }
103
104 fn on_data_row(&self, context: &'static str) -> PgResult<()> {
105 if self.state != SimpleStatementState::InRowStream {
106 return Err(PgError::Protocol(format!(
107 "{}: DataRow before RowDescription",
108 context
109 )));
110 }
111 Ok(())
112 }
113
114 fn on_command_complete(&mut self) {
115 self.state = SimpleStatementState::AwaitingResult;
116 self.saw_completion = true;
117 }
118
119 fn on_empty_query_response(&mut self, context: &'static str) -> PgResult<()> {
120 if self.state == SimpleStatementState::InRowStream {
121 return Err(PgError::Protocol(format!(
122 "{}: EmptyQueryResponse during active row stream",
123 context
124 )));
125 }
126 self.saw_completion = true;
127 Ok(())
128 }
129
130 fn on_ready_for_query(&self, context: &'static str, error_pending: bool) -> PgResult<()> {
131 if error_pending {
132 return Ok(());
133 }
134 if self.state == SimpleStatementState::InRowStream {
135 return Err(PgError::Protocol(format!(
136 "{}: ReadyForQuery before CommandComplete",
137 context
138 )));
139 }
140 if !self.saw_completion {
141 return Err(PgError::Protocol(format!(
142 "{}: ReadyForQuery before completion",
143 context
144 )));
145 }
146 Ok(())
147 }
148}
149
150impl PgConnection {
151 fn validate_param_type_arity(params: &[Option<Vec<u8>>], param_types: &[u32]) -> PgResult<()> {
152 if !param_types.is_empty() && param_types.len() != params.len() {
153 return Err(PgError::Encode(format!(
154 "parameter type count {} does not match parameter count {}",
155 param_types.len(),
156 params.len()
157 )));
158 }
159 Ok(())
160 }
161
162 pub(crate) async fn query(
168 &mut self,
169 sql: &str,
170 params: &[Option<Vec<u8>>],
171 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
172 self.query_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
173 .await
174 }
175
176 pub(crate) async fn query_with_result_format(
178 &mut self,
179 sql: &str,
180 params: &[Option<Vec<u8>>],
181 result_format: i16,
182 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
183 let bytes = PgEncoder::encode_extended_query_with_result_format(sql, params, result_format)
184 .map_err(|e| PgError::Encode(e.to_string()))?;
185 self.write_all_with_timeout(&bytes, "stream write").await?;
186
187 let mut rows = Vec::new();
188
189 let mut error: Option<PgError> = None;
190 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
191
192 loop {
193 let msg = self.recv().await?;
194 if let Err(err) = flow.validate(&msg, "extended-query execute", error.is_some()) {
195 return return_with_desync(self, err);
196 }
197 match msg {
198 BackendMessage::ParseComplete => {}
199 BackendMessage::BindComplete => {}
200 BackendMessage::RowDescription(_) => {}
201 BackendMessage::DataRow(data) => {
202 if error.is_none() {
204 rows.push(data);
205 }
206 }
207 BackendMessage::CommandComplete(_) => {}
208 BackendMessage::NoData => {}
209 BackendMessage::ReadyForQuery(_) => {
210 if let Some(err) = error {
211 return Err(err);
212 }
213 return Ok(rows);
214 }
215 BackendMessage::ErrorResponse(err) => {
216 if error.is_none() {
217 error = Some(PgError::QueryServer(err.into()));
218 }
219 }
220 msg if is_ignorable_session_message(&msg) => {}
221 other => {
222 return return_with_desync(
223 self,
224 unexpected_backend_message("extended-query execute", &other),
225 );
226 }
227 }
228 }
229 }
230
231 pub async fn query_count(&mut self, sql: &str, params: &[Option<Vec<u8>>]) -> PgResult<()> {
233 self.query_count_with_param_types(sql, &[], params).await
234 }
235
236 pub async fn query_count_with_param_types(
239 &mut self,
240 sql: &str,
241 param_types: &[u32],
242 params: &[Option<Vec<u8>>],
243 ) -> PgResult<()> {
244 Self::validate_param_type_arity(params, param_types)?;
245
246 self.write_buf.clear();
247 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
248 .map_err(|e| PgError::Encode(e.to_string()))?;
249 PgEncoder::encode_bind_to(&mut self.write_buf, "", params)
250 .map_err(|e| PgError::Encode(e.to_string()))?;
251 PgEncoder::encode_execute_to(&mut self.write_buf);
252 PgEncoder::encode_sync_to(&mut self.write_buf);
253
254 self.flush_write_buf().await?;
255
256 let mut error: Option<PgError> = None;
257 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
258
259 loop {
260 match self.recv_msg_type_fast().await {
261 Ok(msg_type) => {
262 if let Err(err) = flow.validate_msg_type(
263 msg_type,
264 "extended-query count execute",
265 error.is_some(),
266 ) {
267 return return_with_desync(self, err);
268 }
269 match msg_type {
270 b'1' | b'2' | b'T' | b'D' | b'C' | b'n' => {}
271 b'Z' => {
272 if let Some(err) = error {
273 return Err(err);
274 }
275 return Ok(());
276 }
277 msg_type if is_ignorable_session_msg_type(msg_type) => {}
278 other => {
279 return return_with_desync(
280 self,
281 unexpected_backend_msg_type("extended-query count execute", other),
282 );
283 }
284 }
285 }
286 Err(e) => {
287 if matches!(&e, PgError::QueryServer(_)) {
288 capture_query_server_error(self, &mut error, e);
289 continue;
290 }
291 return Err(e);
292 }
293 }
294 }
295 }
296
297 pub async fn query_rows(
303 &mut self,
304 sql: &str,
305 params: &[Option<Vec<u8>>],
306 ) -> PgResult<Vec<super::PgRow>> {
307 self.query_rows_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
308 .await
309 }
310
311 pub async fn query_rows_with_result_format(
314 &mut self,
315 sql: &str,
316 params: &[Option<Vec<u8>>],
317 result_format: i16,
318 ) -> PgResult<Vec<super::PgRow>> {
319 self.query_rows_with_param_types_and_result_format(sql, &[], params, result_format)
320 .await
321 }
322
323 pub async fn query_visit_bytes_rows_with_result_format<F>(
328 &mut self,
329 sql: &str,
330 params: &[Option<Vec<u8>>],
331 result_format: i16,
332 on_row: F,
333 ) -> PgResult<usize>
334 where
335 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
336 {
337 self.query_visit_bytes_rows_with_param_types_and_result_format(
338 sql,
339 &[],
340 params,
341 result_format,
342 on_row,
343 )
344 .await
345 }
346
347 pub async fn query_visit_first_column_bytes_with_result_format<F>(
349 &mut self,
350 sql: &str,
351 params: &[Option<Vec<u8>>],
352 result_format: i16,
353 on_value: F,
354 ) -> PgResult<usize>
355 where
356 F: FnMut(Option<&[u8]>) -> PgResult<()>,
357 {
358 self.query_visit_first_column_bytes_with_param_types_and_result_format(
359 sql,
360 &[],
361 params,
362 result_format,
363 on_value,
364 )
365 .await
366 }
367
368 pub async fn query_rows_with_param_types_and_result_format(
371 &mut self,
372 sql: &str,
373 param_types: &[u32],
374 params: &[Option<Vec<u8>>],
375 result_format: i16,
376 ) -> PgResult<Vec<super::PgRow>> {
377 use std::sync::Arc;
378
379 Self::validate_param_type_arity(params, param_types)?;
380
381 self.write_buf.clear();
382 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
383 .map_err(|e| PgError::Encode(e.to_string()))?;
384 PgEncoder::encode_bind_to_with_result_format(
385 &mut self.write_buf,
386 "",
387 params,
388 result_format,
389 )
390 .map_err(|e| PgError::Encode(e.to_string()))?;
391 let describe_msg =
392 PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
393 self.write_buf.extend_from_slice(&describe_msg);
394 PgEncoder::encode_execute_to(&mut self.write_buf);
395 PgEncoder::encode_sync_to(&mut self.write_buf);
396 self.flush_write_buf().await?;
397
398 let mut rows: Vec<super::PgRow> = Vec::new();
399 let mut column_info: Option<Arc<super::ColumnInfo>> = None;
400 let mut error: Option<PgError> = None;
401 let mut flow =
402 ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
403
404 loop {
405 let msg = self.recv().await?;
406 if let Err(err) = flow.validate(&msg, "extended-query rows execute", error.is_some()) {
407 return return_with_desync(self, err);
408 }
409 match msg {
410 BackendMessage::ParseComplete => {}
411 BackendMessage::BindComplete => {}
412 BackendMessage::RowDescription(fields) => {
413 column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
414 }
415 BackendMessage::DataRow(data) => {
416 if error.is_none() {
417 rows.push(super::PgRow {
418 columns: data,
419 column_info: column_info.clone(),
420 });
421 }
422 }
423 BackendMessage::CommandComplete(_) => {}
424 BackendMessage::NoData => {}
425 BackendMessage::ReadyForQuery(_) => {
426 if let Some(err) = error {
427 return Err(err);
428 }
429 return Ok(rows);
430 }
431 BackendMessage::ErrorResponse(err) => {
432 if error.is_none() {
433 error = Some(PgError::QueryServer(err.into()));
434 }
435 }
436 msg if is_ignorable_session_message(&msg) => {}
437 other => {
438 return return_with_desync(
439 self,
440 unexpected_backend_message("extended-query rows execute", &other),
441 );
442 }
443 }
444 }
445 }
446
447 pub async fn query_visit_bytes_rows_with_param_types_and_result_format<F>(
453 &mut self,
454 sql: &str,
455 param_types: &[u32],
456 params: &[Option<Vec<u8>>],
457 result_format: i16,
458 mut on_row: F,
459 ) -> PgResult<usize>
460 where
461 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
462 {
463 Self::validate_param_type_arity(params, param_types)?;
464
465 self.write_buf.clear();
466 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
467 .map_err(|e| PgError::Encode(e.to_string()))?;
468 PgEncoder::encode_bind_to_with_result_format(
469 &mut self.write_buf,
470 "",
471 params,
472 result_format,
473 )
474 .map_err(|e| PgError::Encode(e.to_string()))?;
475 PgEncoder::encode_execute_to(&mut self.write_buf);
476 PgEncoder::encode_sync_to(&mut self.write_buf);
477
478 self.flush_write_buf().await?;
479
480 let mut row_count = 0usize;
481 let mut row = super::PgBytesRow::default();
482 let mut error: Option<PgError> = None;
483 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
484
485 loop {
486 match self.recv_fill_zerocopy_row_fast(&mut row).await {
487 Ok(msg_type) => {
488 if let Err(err) = flow.validate_msg_type(
489 msg_type,
490 "extended-query visit bytes execute",
491 error.is_some(),
492 ) {
493 return return_with_desync(self, err);
494 }
495 match msg_type {
496 b'1' | b'2' | b'T' | b'n' => {}
497 b'D' => {
498 if error.is_none() {
499 if let Err(err) = on_row(&row) {
500 return return_callback_error_with_desync(self, err);
501 }
502 row_count += 1;
503 row.release_payload();
504 }
505 }
506 b'C' => {}
507 b'Z' => {
508 if let Some(err) = error {
509 return Err(err);
510 }
511 return Ok(row_count);
512 }
513 msg_type if is_ignorable_session_msg_type(msg_type) => {}
514 other => {
515 return return_with_desync(
516 self,
517 unexpected_backend_msg_type(
518 "extended-query visit bytes execute",
519 other,
520 ),
521 );
522 }
523 }
524 }
525 Err(e) => {
526 if matches!(&e, PgError::QueryServer(_)) {
527 capture_query_server_error(self, &mut error, e);
528 continue;
529 }
530 return Err(e);
531 }
532 }
533 }
534 }
535
536 pub async fn query_visit_first_column_bytes_with_param_types_and_result_format<F>(
539 &mut self,
540 sql: &str,
541 param_types: &[u32],
542 params: &[Option<Vec<u8>>],
543 result_format: i16,
544 mut on_value: F,
545 ) -> PgResult<usize>
546 where
547 F: FnMut(Option<&[u8]>) -> PgResult<()>,
548 {
549 Self::validate_param_type_arity(params, param_types)?;
550
551 self.write_buf.clear();
552 PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
553 .map_err(|e| PgError::Encode(e.to_string()))?;
554 PgEncoder::encode_bind_to_with_result_format(
555 &mut self.write_buf,
556 "",
557 params,
558 result_format,
559 )
560 .map_err(|e| PgError::Encode(e.to_string()))?;
561 PgEncoder::encode_execute_to(&mut self.write_buf);
562 PgEncoder::encode_sync_to(&mut self.write_buf);
563
564 self.flush_write_buf().await?;
565
566 let mut row_count = 0usize;
567 let mut first_column: Option<Bytes> = None;
568 let mut error: Option<PgError> = None;
569 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
570
571 loop {
572 match self
573 .recv_fill_first_column_zerocopy_fast(&mut first_column)
574 .await
575 {
576 Ok(msg_type) => {
577 if let Err(err) = flow.validate_msg_type(
578 msg_type,
579 "extended-query visit first-column execute",
580 error.is_some(),
581 ) {
582 return return_with_desync(self, err);
583 }
584 match msg_type {
585 b'1' | b'2' | b'T' | b'n' => {}
586 b'D' => {
587 if error.is_none() {
588 if let Err(err) = on_value(first_column.as_deref()) {
589 return return_callback_error_with_desync(self, err);
590 }
591 row_count += 1;
592 first_column = None;
593 }
594 }
595 b'C' => {}
596 b'Z' => {
597 if let Some(err) = error {
598 return Err(err);
599 }
600 return Ok(row_count);
601 }
602 msg_type if is_ignorable_session_msg_type(msg_type) => {}
603 other => {
604 return return_with_desync(
605 self,
606 unexpected_backend_msg_type(
607 "extended-query visit first-column execute",
608 other,
609 ),
610 );
611 }
612 }
613 }
614 Err(e) => {
615 if matches!(&e, PgError::QueryServer(_)) {
616 capture_query_server_error(self, &mut error, e);
617 continue;
618 }
619 return Err(e);
620 }
621 }
622 }
623 }
624
625 pub async fn probe_query_with_param_types(
628 &mut self,
629 sql: &str,
630 param_types: &[u32],
631 params: &[Option<Vec<u8>>],
632 ) -> PgResult<()> {
633 Self::validate_param_type_arity(params, param_types)?;
634
635 let parse = PgEncoder::try_encode_parse("", sql, param_types)
636 .map_err(|e| PgError::Encode(e.to_string()))?;
637 let bind =
638 PgEncoder::encode_bind("", "", params).map_err(|e| PgError::Encode(e.to_string()))?;
639 let describe =
640 PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
641 let sync = PgEncoder::encode_sync();
642 let mut bytes =
643 BytesMut::with_capacity(parse.len() + bind.len() + describe.len() + sync.len());
644 bytes.extend_from_slice(&parse);
645 bytes.extend_from_slice(&bind);
646 bytes.extend_from_slice(&describe);
647 bytes.extend_from_slice(&sync);
648 self.write_all_with_timeout(&bytes, "stream write").await?;
649
650 let mut saw_describe_response = false;
651 let mut error: Option<PgError> = None;
652 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal());
653
654 loop {
655 let msg = self.recv().await?;
656 if let Err(err) = flow.validate(&msg, "extended-query probe", error.is_some()) {
657 return return_with_desync(self, err);
658 }
659 match msg {
660 BackendMessage::ParseComplete => {}
661 BackendMessage::BindComplete => {}
662 BackendMessage::RowDescription(_) | BackendMessage::NoData => {
663 saw_describe_response = true;
664 }
665 BackendMessage::ReadyForQuery(_) => {
666 if let Some(err) = error {
667 return Err(err);
668 }
669 if !saw_describe_response {
670 return return_with_desync(
671 self,
672 PgError::Protocol(
673 "extended-query probe finished without RowDescription/NoData"
674 .to_string(),
675 ),
676 );
677 }
678 return Ok(());
679 }
680 BackendMessage::ErrorResponse(err) => {
681 if error.is_none() {
682 error = Some(PgError::QueryServer(err.into()));
683 }
684 }
685 msg if is_ignorable_session_message(&msg) => {}
686 other => {
687 return return_with_desync(
688 self,
689 unexpected_backend_message("extended-query probe", &other),
690 );
691 }
692 }
693 }
694 }
695
696 pub async fn query_cached(
701 &mut self,
702 sql: &str,
703 params: &[Option<Vec<u8>>],
704 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
705 self.query_cached_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
706 .await
707 }
708
709 pub async fn query_cached_with_result_format(
711 &mut self,
712 sql: &str,
713 params: &[Option<Vec<u8>>],
714 result_format: i16,
715 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
716 let mut retried = false;
717 loop {
718 match self
719 .query_cached_with_result_format_once(sql, params, result_format)
720 .await
721 {
722 Ok(rows) => return Ok(rows),
723 Err(err)
724 if !retried
725 && (err.is_prepared_statement_retryable()
726 || err.is_prepared_statement_already_exists()) =>
727 {
728 retried = true;
729 if err.is_prepared_statement_retryable() {
730 self.clear_prepared_statement_state();
731 }
732 }
733 Err(err) => return Err(err),
734 }
735 }
736 }
737
738 async fn query_cached_with_result_format_once(
739 &mut self,
740 sql: &str,
741 params: &[Option<Vec<u8>>],
742 result_format: i16,
743 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
744 let stmt_name = Self::sql_to_stmt_name(sql);
745 let is_new = !self.prepared_statements.contains_key(&stmt_name);
746
747 let needed = prepared_bind_execute_sync_wire_len(&stmt_name, params, result_format)?;
748 let mut buf = BytesMut::with_capacity(needed);
749
750 if is_new {
751 self.evict_prepared_if_full();
755 if let Err(e) = PgEncoder::try_encode_parse_to(&mut buf, &stmt_name, sql, &[]) {
756 return Err(PgError::Encode(e.to_string()));
757 }
758 self.prepared_statements
760 .insert(stmt_name.clone(), sql.to_string());
761 }
762
763 if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
765 &mut buf,
766 &stmt_name,
767 params,
768 result_format,
769 ) {
770 if is_new {
771 self.prepared_statements.remove(&stmt_name);
772 }
773 return Err(PgError::Encode(e.to_string()));
774 }
775 PgEncoder::encode_execute_to(&mut buf);
776 PgEncoder::encode_sync_to(&mut buf);
777
778 if let Err(err) = self.write_all_with_timeout(&buf, "stream write").await {
779 if is_new {
780 self.prepared_statements.remove(&stmt_name);
781 }
782 return Err(err);
783 }
784
785 let mut rows = Vec::new();
786
787 let mut error: Option<PgError> = None;
788 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(is_new));
789
790 loop {
791 let msg = match self.recv().await {
792 Ok(msg) => msg,
793 Err(err) => {
794 if is_new && !flow.saw_parse_complete() {
795 self.prepared_statements.remove(&stmt_name);
796 }
797 return Err(err);
798 }
799 };
800 if let Err(err) = flow.validate(&msg, "extended-query cached execute", error.is_some())
801 {
802 if is_new && !flow.saw_parse_complete() {
803 self.prepared_statements.remove(&stmt_name);
804 }
805 return return_with_desync(self, err);
806 }
807 match msg {
808 BackendMessage::ParseComplete => {
809 }
811 BackendMessage::BindComplete => {}
812 BackendMessage::RowDescription(_) => {}
813 BackendMessage::DataRow(data) => {
814 if error.is_none() {
815 rows.push(data);
816 }
817 }
818 BackendMessage::CommandComplete(_) => {}
819 BackendMessage::NoData => {}
820 BackendMessage::ReadyForQuery(_) => {
821 if let Some(err) = error {
822 if is_new
823 && !flow.saw_parse_complete()
824 && !err.is_prepared_statement_already_exists()
825 {
826 self.prepared_statements.remove(&stmt_name);
827 }
828 return Err(err);
829 }
830 if is_new && !flow.saw_parse_complete() {
831 self.prepared_statements.remove(&stmt_name);
832 return return_with_desync(
833 self,
834 PgError::Protocol(
835 "Cache miss query reached ReadyForQuery without ParseComplete"
836 .to_string(),
837 ),
838 );
839 }
840 return Ok(rows);
841 }
842 BackendMessage::ErrorResponse(err) => {
843 if error.is_none() {
844 let query_err = PgError::QueryServer(err.into());
845 if !query_err.is_prepared_statement_already_exists() {
846 self.prepared_statements.remove(&stmt_name);
848 }
849 error = Some(query_err);
850 }
851 }
852 msg if is_ignorable_session_message(&msg) => {}
853 other => {
854 if is_new && !flow.saw_parse_complete() {
855 self.prepared_statements.remove(&stmt_name);
856 }
857 return return_with_desync(
858 self,
859 unexpected_backend_message("extended-query cached execute", &other),
860 );
861 }
862 }
863 }
864 }
865
866 pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
869 use std::collections::hash_map::DefaultHasher;
870 use std::hash::{Hash, Hasher};
871
872 let mut hasher = DefaultHasher::new();
873 sql.hash(&mut hasher);
874 format!("s{:016x}", hasher.finish())
875 }
876
877 pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
879 let bytes = PgEncoder::try_encode_query_string(sql)?;
880 self.write_all_with_timeout(&bytes, "stream write").await?;
881
882 let mut error: Option<PgError> = None;
883 let mut flow = SimpleFlowTracker::new();
884
885 loop {
886 let msg = self.recv().await?;
887 match msg {
888 BackendMessage::RowDescription(_) => {
889 if let Err(err) = flow.on_row_description("simple-query execute") {
893 return return_with_desync(self, err);
894 }
895 }
896 BackendMessage::DataRow(_) => {
897 if let Err(err) = flow.on_data_row("simple-query execute") {
898 return return_with_desync(self, err);
899 }
900 }
901 BackendMessage::CommandComplete(_) => {
902 flow.on_command_complete();
903 }
904 BackendMessage::EmptyQueryResponse => {
905 if let Err(err) = flow.on_empty_query_response("simple-query execute") {
906 return return_with_desync(self, err);
907 }
908 }
909 BackendMessage::ReadyForQuery(_) => {
910 if let Some(err) = error {
911 return Err(err);
912 }
913 if let Err(err) =
914 flow.on_ready_for_query("simple-query execute", error.is_some())
915 {
916 return return_with_desync(self, err);
917 }
918 return Ok(());
919 }
920 BackendMessage::ErrorResponse(err) => {
921 if error.is_none() {
922 error = Some(PgError::QueryServer(err.into()));
923 }
924 }
925 msg if is_ignorable_session_message(&msg) => {}
926 other => {
927 return return_with_desync(
928 self,
929 unexpected_backend_message("simple-query execute", &other),
930 );
931 }
932 }
933 }
934 }
935
936 pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
943 use std::sync::Arc;
944
945 const MAX_SIMPLE_QUERY_ROWS: usize = 10_000;
948
949 let bytes = PgEncoder::try_encode_query_string(sql)?;
950 self.write_all_with_timeout(&bytes, "stream write").await?;
951
952 let mut rows: Vec<super::PgRow> = Vec::new();
953 let mut column_info: Option<Arc<super::ColumnInfo>> = None;
954 let mut error: Option<PgError> = None;
955 let mut flow = SimpleFlowTracker::new();
956
957 loop {
958 let msg = self.recv().await?;
959 match msg {
960 BackendMessage::RowDescription(fields) => {
961 if let Err(err) = flow.on_row_description("simple-query read") {
962 return return_with_desync(self, err);
963 }
964 column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
965 }
966 BackendMessage::DataRow(data) => {
967 if let Err(err) = flow.on_data_row("simple-query read") {
968 return return_with_desync(self, err);
969 }
970 if error.is_none() {
971 if rows.len() >= MAX_SIMPLE_QUERY_ROWS {
972 if error.is_none() {
973 error = Some(PgError::Query(format!(
974 "simple_query exceeded {} row safety cap",
975 MAX_SIMPLE_QUERY_ROWS,
976 )));
977 }
978 } else {
980 rows.push(super::PgRow {
981 columns: data,
982 column_info: column_info.clone(),
983 });
984 }
985 }
986 }
987 BackendMessage::CommandComplete(_) => {
988 flow.on_command_complete();
989 column_info = None;
990 }
991 BackendMessage::EmptyQueryResponse => {
992 if let Err(err) = flow.on_empty_query_response("simple-query read") {
993 return return_with_desync(self, err);
994 }
995 column_info = None;
996 }
997 BackendMessage::ReadyForQuery(_) => {
998 if let Some(err) = error {
999 return Err(err);
1000 }
1001 if let Err(err) = flow.on_ready_for_query("simple-query read", error.is_some())
1002 {
1003 return return_with_desync(self, err);
1004 }
1005 return Ok(rows);
1006 }
1007 BackendMessage::ErrorResponse(err) => {
1008 if error.is_none() {
1009 error = Some(PgError::QueryServer(err.into()));
1010 }
1011 }
1012 msg if is_ignorable_session_message(&msg) => {}
1013 other => {
1014 return return_with_desync(
1015 self,
1016 unexpected_backend_message("simple-query read", &other),
1017 );
1018 }
1019 }
1020 }
1021 }
1022
1023 #[inline]
1036 pub async fn query_prepared_single(
1037 &mut self,
1038 stmt: &super::PreparedStatement,
1039 params: &[Option<Vec<u8>>],
1040 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1041 self.query_prepared_single_with_result_format(stmt, params, PgEncoder::FORMAT_TEXT)
1042 .await
1043 }
1044
1045 #[inline]
1051 pub async fn query_prepared_single_count(
1052 &mut self,
1053 stmt: &super::PreparedStatement,
1054 params: &[Option<Vec<u8>>],
1055 ) -> PgResult<()> {
1056 self.write_buf.clear();
1057 PgEncoder::encode_bind_to(&mut self.write_buf, &stmt.name, params)
1058 .map_err(|e| PgError::Encode(e.to_string()))?;
1059 PgEncoder::encode_execute_to(&mut self.write_buf);
1060 PgEncoder::encode_sync_to(&mut self.write_buf);
1061
1062 self.flush_write_buf().await?;
1063
1064 let mut error: Option<PgError> = None;
1065 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1066
1067 loop {
1068 match self.recv_msg_type_fast().await {
1069 Ok(msg_type) => {
1070 if let Err(err) = flow.validate_msg_type(
1071 msg_type,
1072 "prepared single count execute",
1073 error.is_some(),
1074 ) {
1075 return return_with_desync(self, err);
1076 }
1077 match msg_type {
1078 b'2' | b'T' | b'D' | b'C' | b'n' => {}
1079 b'Z' => {
1080 if let Some(err) = error {
1081 return Err(err);
1082 }
1083 return Ok(());
1084 }
1085 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1086 other => {
1087 return return_with_desync(
1088 self,
1089 unexpected_backend_msg_type("prepared single count execute", other),
1090 );
1091 }
1092 }
1093 }
1094 Err(e) => {
1095 if matches!(&e, PgError::QueryServer(_)) {
1096 capture_query_server_error(self, &mut error, e);
1097 continue;
1098 }
1099 return Err(e);
1100 }
1101 }
1102 }
1103 }
1104
1105 #[inline]
1107 pub async fn query_prepared_single_with_result_format(
1108 &mut self,
1109 stmt: &super::PreparedStatement,
1110 params: &[Option<Vec<u8>>],
1111 result_format: i16,
1112 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1113 let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
1114 let mut buf = BytesMut::with_capacity(needed);
1115
1116 PgEncoder::encode_bind_to_with_result_format(&mut buf, &stmt.name, params, result_format)
1118 .map_err(|e| PgError::Encode(e.to_string()))?;
1119 PgEncoder::encode_execute_to(&mut buf);
1120 PgEncoder::encode_sync_to(&mut buf);
1121
1122 self.write_all_with_timeout(&buf, "stream write").await?;
1123
1124 let mut rows = Vec::new();
1125
1126 let mut error: Option<PgError> = None;
1127 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1128
1129 loop {
1130 let msg = self.recv().await?;
1131 if let Err(err) = flow.validate(&msg, "prepared single execute", error.is_some()) {
1132 return return_with_desync(self, err);
1133 }
1134 match msg {
1135 BackendMessage::BindComplete => {}
1136 BackendMessage::RowDescription(_) => {}
1137 BackendMessage::DataRow(data) => {
1138 if error.is_none() {
1139 rows.push(data);
1140 }
1141 }
1142 BackendMessage::CommandComplete(_) => {}
1143 BackendMessage::NoData => {}
1144 BackendMessage::ReadyForQuery(_) => {
1145 if let Some(err) = error {
1146 return Err(err);
1147 }
1148 return Ok(rows);
1149 }
1150 BackendMessage::ErrorResponse(err) => {
1151 if error.is_none() {
1152 error = Some(PgError::QueryServer(err.into()));
1153 }
1154 }
1155 msg if is_ignorable_session_message(&msg) => {}
1156 other => {
1157 return return_with_desync(
1158 self,
1159 unexpected_backend_message("prepared single execute", &other),
1160 );
1161 }
1162 }
1163 }
1164 }
1165
1166 #[inline]
1169 pub async fn query_prepared_single_reuse_with_result_format(
1170 &mut self,
1171 stmt: &super::PreparedStatement,
1172 params: &[Option<Vec<u8>>],
1173 result_format: i16,
1174 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1175 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1176
1177 PgEncoder::encode_bind_to_with_result_format(
1178 &mut self.write_buf,
1179 &stmt.name,
1180 params,
1181 result_format,
1182 )
1183 .map_err(|e| PgError::Encode(e.to_string()))?;
1184 PgEncoder::encode_execute_to(&mut self.write_buf);
1185 PgEncoder::encode_sync_to(&mut self.write_buf);
1186
1187 self.flush_write_buf().await?;
1188
1189 let mut rows = Vec::with_capacity(32);
1190 let mut error: Option<PgError> = None;
1191 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1192
1193 loop {
1194 let msg = self.recv().await?;
1195 if let Err(err) = flow.validate(&msg, "prepared single reuse execute", error.is_some())
1196 {
1197 return return_with_desync(self, err);
1198 }
1199 match msg {
1200 BackendMessage::BindComplete => {}
1201 BackendMessage::RowDescription(_) => {}
1202 BackendMessage::DataRow(data) => {
1203 if error.is_none() {
1204 rows.push(data);
1205 }
1206 }
1207 BackendMessage::CommandComplete(_) => {}
1208 BackendMessage::NoData => {}
1209 BackendMessage::ReadyForQuery(_) => {
1210 if let Some(err) = error {
1211 return Err(err);
1212 }
1213 return Ok(rows);
1214 }
1215 BackendMessage::ErrorResponse(err) => {
1216 if error.is_none() {
1217 error = Some(PgError::QueryServer(err.into()));
1218 }
1219 }
1220 msg if is_ignorable_session_message(&msg) => {}
1221 other => {
1222 return return_with_desync(
1223 self,
1224 unexpected_backend_message("prepared single reuse execute", &other),
1225 );
1226 }
1227 }
1228 }
1229 }
1230
1231 #[inline]
1236 pub async fn query_prepared_single_reuse_visit_rows_with_result_format<F>(
1237 &mut self,
1238 stmt: &super::PreparedStatement,
1239 params: &[Option<Vec<u8>>],
1240 result_format: i16,
1241 mut on_row: F,
1242 ) -> PgResult<usize>
1243 where
1244 F: FnMut(&[Option<Vec<u8>>]) -> PgResult<()>,
1245 {
1246 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1247
1248 PgEncoder::encode_bind_to_with_result_format(
1249 &mut self.write_buf,
1250 &stmt.name,
1251 params,
1252 result_format,
1253 )
1254 .map_err(|e| PgError::Encode(e.to_string()))?;
1255 PgEncoder::encode_execute_to(&mut self.write_buf);
1256 PgEncoder::encode_sync_to(&mut self.write_buf);
1257
1258 self.flush_write_buf().await?;
1259
1260 let mut row_count = 0usize;
1261 let mut row_buf: Vec<Option<Vec<u8>>> = Vec::new();
1262 let mut error: Option<PgError> = None;
1263 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1264
1265 loop {
1266 match self.recv_fill_data_row_fast(&mut row_buf).await {
1267 Ok(msg_type) => {
1268 if let Err(err) = flow.validate_msg_type(
1269 msg_type,
1270 "prepared single reuse visit execute",
1271 error.is_some(),
1272 ) {
1273 return return_with_desync(self, err);
1274 }
1275 match msg_type {
1276 b'2' | b'T' | b'n' => {}
1277 b'D' => {
1278 if error.is_none() {
1279 if let Err(err) = on_row(row_buf.as_slice()) {
1280 return return_callback_error_with_desync(self, err);
1281 }
1282 row_count += 1;
1283 }
1284 }
1285 b'C' => {}
1286 b'Z' => {
1287 if let Some(err) = error {
1288 return Err(err);
1289 }
1290 return Ok(row_count);
1291 }
1292 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1293 other => {
1294 return return_with_desync(
1295 self,
1296 unexpected_backend_msg_type(
1297 "prepared single reuse visit execute",
1298 other,
1299 ),
1300 );
1301 }
1302 }
1303 }
1304 Err(e) => {
1305 if matches!(&e, PgError::QueryServer(_)) {
1306 capture_query_server_error(self, &mut error, e);
1307 continue;
1308 }
1309 return Err(e);
1310 }
1311 }
1312 }
1313 }
1314
1315 #[inline]
1320 pub async fn query_prepared_single_reuse_visit_bytes_rows_with_result_format<F>(
1321 &mut self,
1322 stmt: &super::PreparedStatement,
1323 params: &[Option<Vec<u8>>],
1324 result_format: i16,
1325 mut on_row: F,
1326 ) -> PgResult<usize>
1327 where
1328 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1329 {
1330 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1331
1332 PgEncoder::encode_bind_to_with_result_format(
1333 &mut self.write_buf,
1334 &stmt.name,
1335 params,
1336 result_format,
1337 )
1338 .map_err(|e| PgError::Encode(e.to_string()))?;
1339 PgEncoder::encode_execute_to(&mut self.write_buf);
1340 PgEncoder::encode_sync_to(&mut self.write_buf);
1341
1342 self.flush_write_buf().await?;
1343
1344 let mut row_count = 0usize;
1345 let mut row = super::PgBytesRow::default();
1346 let mut error: Option<PgError> = None;
1347 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1348
1349 loop {
1350 match self.recv_fill_zerocopy_row_fast(&mut row).await {
1351 Ok(msg_type) => {
1352 if let Err(err) = flow.validate_msg_type(
1353 msg_type,
1354 "prepared single reuse visit bytes execute",
1355 error.is_some(),
1356 ) {
1357 return return_with_desync(self, err);
1358 }
1359 match msg_type {
1360 b'2' | b'T' | b'n' => {}
1361 b'D' => {
1362 if error.is_none() {
1363 if let Err(err) = on_row(&row) {
1364 return return_callback_error_with_desync(self, err);
1365 }
1366 row_count += 1;
1367 row.release_payload();
1368 }
1369 }
1370 b'C' => {}
1371 b'Z' => {
1372 if let Some(err) = error {
1373 return Err(err);
1374 }
1375 return Ok(row_count);
1376 }
1377 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1378 other => {
1379 return return_with_desync(
1380 self,
1381 unexpected_backend_msg_type(
1382 "prepared single reuse visit bytes execute",
1383 other,
1384 ),
1385 );
1386 }
1387 }
1388 }
1389 Err(e) => {
1390 if matches!(&e, PgError::QueryServer(_)) {
1391 capture_query_server_error(self, &mut error, e);
1392 continue;
1393 }
1394 return Err(e);
1395 }
1396 }
1397 }
1398 }
1399
1400 #[inline]
1402 pub async fn query_prepared_single_reuse_visit_first_column_bytes_with_result_format<F>(
1403 &mut self,
1404 stmt: &super::PreparedStatement,
1405 params: &[Option<Vec<u8>>],
1406 result_format: i16,
1407 mut on_value: F,
1408 ) -> PgResult<usize>
1409 where
1410 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1411 {
1412 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1413
1414 PgEncoder::encode_bind_to_with_result_format(
1415 &mut self.write_buf,
1416 &stmt.name,
1417 params,
1418 result_format,
1419 )
1420 .map_err(|e| PgError::Encode(e.to_string()))?;
1421 PgEncoder::encode_execute_to(&mut self.write_buf);
1422 PgEncoder::encode_sync_to(&mut self.write_buf);
1423
1424 self.flush_write_buf().await?;
1425
1426 let mut row_count = 0usize;
1427 let mut first_column: Option<Bytes> = None;
1428 let mut error: Option<PgError> = None;
1429 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1430
1431 loop {
1432 match self
1433 .recv_fill_first_column_zerocopy_fast(&mut first_column)
1434 .await
1435 {
1436 Ok(msg_type) => {
1437 if let Err(err) = flow.validate_msg_type(
1438 msg_type,
1439 "prepared single reuse visit first-column execute",
1440 error.is_some(),
1441 ) {
1442 return return_with_desync(self, err);
1443 }
1444 match msg_type {
1445 b'2' | b'T' | b'n' => {}
1446 b'D' => {
1447 if error.is_none() {
1448 if let Err(err) = on_value(first_column.as_deref()) {
1449 return return_callback_error_with_desync(self, err);
1450 }
1451 row_count += 1;
1452 first_column = None;
1453 }
1454 }
1455 b'C' => {}
1456 b'Z' => {
1457 if let Some(err) = error {
1458 return Err(err);
1459 }
1460 return Ok(row_count);
1461 }
1462 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1463 other => {
1464 return return_with_desync(
1465 self,
1466 unexpected_backend_msg_type(
1467 "prepared single reuse visit first-column execute",
1468 other,
1469 ),
1470 );
1471 }
1472 }
1473 }
1474 Err(e) => {
1475 if matches!(&e, PgError::QueryServer(_)) {
1476 capture_query_server_error(self, &mut error, e);
1477 continue;
1478 }
1479 return Err(e);
1480 }
1481 }
1482 }
1483 }
1484
1485 #[inline]
1487 pub async fn query_prepared_single_reuse_visit_first_four_columns_bytes_with_result_format<F>(
1488 &mut self,
1489 stmt: &super::PreparedStatement,
1490 params: &[Option<Vec<u8>>],
1491 result_format: i16,
1492 mut on_row: F,
1493 ) -> PgResult<usize>
1494 where
1495 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1496 {
1497 reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1498
1499 PgEncoder::encode_bind_to_with_result_format(
1500 &mut self.write_buf,
1501 &stmt.name,
1502 params,
1503 result_format,
1504 )
1505 .map_err(|e| PgError::Encode(e.to_string()))?;
1506 PgEncoder::encode_execute_to(&mut self.write_buf);
1507 PgEncoder::encode_sync_to(&mut self.write_buf);
1508
1509 self.flush_write_buf().await?;
1510
1511 let mut row_count = 0usize;
1512 let mut columns = [None, None, None, None];
1513 let mut error: Option<PgError> = None;
1514 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1515
1516 loop {
1517 match self
1518 .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1519 .await
1520 {
1521 Ok(msg_type) => {
1522 if let Err(err) = flow.validate_msg_type(
1523 msg_type,
1524 "prepared single reuse visit first-four execute",
1525 error.is_some(),
1526 ) {
1527 return return_with_desync(self, err);
1528 }
1529 match msg_type {
1530 b'2' | b'T' | b'n' => {}
1531 b'D' => {
1532 if error.is_none() {
1533 if let Err(err) = on_row([
1534 columns[0].as_deref(),
1535 columns[1].as_deref(),
1536 columns[2].as_deref(),
1537 columns[3].as_deref(),
1538 ]) {
1539 return return_callback_error_with_desync(self, err);
1540 }
1541 columns.fill(None);
1542 row_count += 1;
1543 }
1544 }
1545 b'C' => {}
1546 b'Z' => {
1547 if let Some(err) = error {
1548 return Err(err);
1549 }
1550 return Ok(row_count);
1551 }
1552 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1553 other => {
1554 return return_with_desync(
1555 self,
1556 unexpected_backend_msg_type(
1557 "prepared single reuse visit first-four execute",
1558 other,
1559 ),
1560 );
1561 }
1562 }
1563 }
1564 Err(e) => {
1565 if matches!(&e, PgError::QueryServer(_)) {
1566 capture_query_server_error(self, &mut error, e);
1567 continue;
1568 }
1569 return Err(e);
1570 }
1571 }
1572 }
1573 }
1574
1575 #[inline]
1579 pub async fn query_prepared_single_encoded_visit_bytes_rows<F>(
1580 &mut self,
1581 wire: &[u8],
1582 on_row: F,
1583 ) -> PgResult<usize>
1584 where
1585 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1586 {
1587 let (row_count, _, _) = self
1588 .query_prepared_single_encoded_visit_bytes_rows_profiled(wire, on_row)
1589 .await?;
1590 Ok(row_count)
1591 }
1592
1593 #[inline]
1597 pub async fn query_prepared_single_encoded_visit_bytes_rows_profiled<F>(
1598 &mut self,
1599 wire: &[u8],
1600 mut on_row: F,
1601 ) -> PgResult<(usize, Duration, Duration)>
1602 where
1603 F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1604 {
1605 let send_start = Instant::now();
1606 self.send_bytes(wire).await?;
1607 let send_elapsed = send_start.elapsed();
1608 let consume_start = Instant::now();
1609
1610 let mut row_count = 0usize;
1611 let mut row = super::PgBytesRow::default();
1612 let mut error: Option<PgError> = None;
1613 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1614
1615 loop {
1616 match self.recv_fill_zerocopy_row_fast(&mut row).await {
1617 Ok(msg_type) => {
1618 if let Err(err) = flow.validate_msg_type(
1619 msg_type,
1620 "prepared single encoded visit bytes execute",
1621 error.is_some(),
1622 ) {
1623 return return_with_desync(self, err);
1624 }
1625 match msg_type {
1626 b'2' | b'T' | b'n' => {}
1627 b'D' => {
1628 if error.is_none() {
1629 if let Err(err) = on_row(&row) {
1630 return return_callback_error_with_desync(self, err);
1631 }
1632 row_count += 1;
1633 row.release_payload();
1634 }
1635 }
1636 b'C' => {}
1637 b'Z' => {
1638 if let Some(err) = error {
1639 return Err(err);
1640 }
1641 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1642 }
1643 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1644 other => {
1645 return return_with_desync(
1646 self,
1647 unexpected_backend_msg_type(
1648 "prepared single encoded visit bytes execute",
1649 other,
1650 ),
1651 );
1652 }
1653 }
1654 }
1655 Err(e) => {
1656 if matches!(&e, PgError::QueryServer(_)) {
1657 capture_query_server_error(self, &mut error, e);
1658 continue;
1659 }
1660 return Err(e);
1661 }
1662 }
1663 }
1664 }
1665
1666 #[inline]
1668 pub async fn query_prepared_single_encoded_visit_first_column_bytes<F>(
1669 &mut self,
1670 wire: &[u8],
1671 on_value: F,
1672 ) -> PgResult<usize>
1673 where
1674 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1675 {
1676 let (row_count, _, _) = self
1677 .query_prepared_single_encoded_visit_first_column_bytes_profiled(wire, on_value)
1678 .await?;
1679 Ok(row_count)
1680 }
1681
1682 #[inline]
1686 pub async fn query_prepared_single_encoded_visit_first_column_bytes_profiled<F>(
1687 &mut self,
1688 wire: &[u8],
1689 mut on_value: F,
1690 ) -> PgResult<(usize, Duration, Duration)>
1691 where
1692 F: FnMut(Option<&[u8]>) -> PgResult<()>,
1693 {
1694 let send_start = Instant::now();
1695 self.send_bytes(wire).await?;
1696 let send_elapsed = send_start.elapsed();
1697 let consume_start = Instant::now();
1698
1699 let mut row_count = 0usize;
1700 let mut first_column: Option<Bytes> = None;
1701 let mut error: Option<PgError> = None;
1702 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1703
1704 loop {
1705 match self
1706 .recv_fill_first_column_zerocopy_fast(&mut first_column)
1707 .await
1708 {
1709 Ok(msg_type) => {
1710 if let Err(err) = flow.validate_msg_type(
1711 msg_type,
1712 "prepared single encoded visit first-column execute",
1713 error.is_some(),
1714 ) {
1715 return return_with_desync(self, err);
1716 }
1717 match msg_type {
1718 b'2' | b'T' | b'n' => {}
1719 b'D' => {
1720 if error.is_none() {
1721 if let Err(err) = on_value(first_column.as_deref()) {
1722 return return_callback_error_with_desync(self, err);
1723 }
1724 row_count += 1;
1725 first_column = None;
1726 }
1727 }
1728 b'C' => {}
1729 b'Z' => {
1730 if let Some(err) = error {
1731 return Err(err);
1732 }
1733 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1734 }
1735 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1736 other => {
1737 return return_with_desync(
1738 self,
1739 unexpected_backend_msg_type(
1740 "prepared single encoded visit first-column execute",
1741 other,
1742 ),
1743 );
1744 }
1745 }
1746 }
1747 Err(e) => {
1748 if matches!(&e, PgError::QueryServer(_)) {
1749 capture_query_server_error(self, &mut error, e);
1750 continue;
1751 }
1752 return Err(e);
1753 }
1754 }
1755 }
1756 }
1757
1758 #[inline]
1760 pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes<F>(
1761 &mut self,
1762 wire: &[u8],
1763 on_row: F,
1764 ) -> PgResult<usize>
1765 where
1766 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1767 {
1768 let (row_count, _, _) = self
1769 .query_prepared_single_encoded_visit_first_four_columns_bytes_profiled(wire, on_row)
1770 .await?;
1771 Ok(row_count)
1772 }
1773
1774 #[inline]
1778 pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes_profiled<F>(
1779 &mut self,
1780 wire: &[u8],
1781 mut on_row: F,
1782 ) -> PgResult<(usize, Duration, Duration)>
1783 where
1784 F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1785 {
1786 let send_start = Instant::now();
1787 self.send_bytes(wire).await?;
1788 let send_elapsed = send_start.elapsed();
1789 let consume_start = Instant::now();
1790
1791 let mut row_count = 0usize;
1792 let mut columns = [None, None, None, None];
1793 let mut error: Option<PgError> = None;
1794 let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1795
1796 loop {
1797 match self
1798 .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1799 .await
1800 {
1801 Ok(msg_type) => {
1802 if let Err(err) = flow.validate_msg_type(
1803 msg_type,
1804 "prepared single encoded visit first-four execute",
1805 error.is_some(),
1806 ) {
1807 return return_with_desync(self, err);
1808 }
1809 match msg_type {
1810 b'2' | b'T' | b'n' => {}
1811 b'D' => {
1812 if error.is_none() {
1813 if let Err(err) = on_row([
1814 columns[0].as_deref(),
1815 columns[1].as_deref(),
1816 columns[2].as_deref(),
1817 columns[3].as_deref(),
1818 ]) {
1819 return return_callback_error_with_desync(self, err);
1820 }
1821 columns.fill(None);
1822 row_count += 1;
1823 }
1824 }
1825 b'C' => {}
1826 b'Z' => {
1827 if let Some(err) = error {
1828 return Err(err);
1829 }
1830 return Ok((row_count, send_elapsed, consume_start.elapsed()));
1831 }
1832 msg_type if is_ignorable_session_msg_type(msg_type) => {}
1833 other => {
1834 return return_with_desync(
1835 self,
1836 unexpected_backend_msg_type(
1837 "prepared single encoded visit first-four execute",
1838 other,
1839 ),
1840 );
1841 }
1842 }
1843 }
1844 Err(e) => {
1845 if matches!(&e, PgError::QueryServer(_)) {
1846 capture_query_server_error(self, &mut error, e);
1847 continue;
1848 }
1849 return Err(e);
1850 }
1851 }
1852 }
1853 }
1854}
1855
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PgConnection` backed by one end of a Unix socket pair and returns it
    /// together with the peer end, so the stream stays open while a test runs.
    #[cfg(unix)]
    fn test_conn_with_peer() -> (PgConnection, tokio::net::UnixStream) {
        use crate::driver::connection::StatementCache;
        use crate::driver::stream::PgStream;
        use bytes::BytesMut;
        use std::collections::{HashMap, VecDeque};
        use std::num::NonZeroUsize;
        use tokio::net::UnixStream;

        let (unix_stream, peer) = UnixStream::pair().expect("unix stream pair");
        (
            PgConnection {
                stream: PgStream::Unix(unix_stream),
                buffer: BytesMut::with_capacity(1024),
                write_buf: BytesMut::with_capacity(1024),
                sql_buf: BytesMut::with_capacity(256),
                params_buf: Vec::new(),
                prepared_statements: HashMap::new(),
                stmt_cache: StatementCache::new(NonZeroUsize::new(2).expect("non-zero")),
                column_info_cache: HashMap::new(),
                process_id: 0,
                cancel_key_bytes: Vec::new(),
                requested_protocol_minor: PgConnection::default_protocol_minor(),
                negotiated_protocol_minor: PgConnection::default_protocol_minor(),
                notifications: VecDeque::new(),
                replication_stream_active: false,
                replication_mode_enabled: false,
                last_replication_wal_end: None,
                io_desynced: false,
                pending_statement_closes: Vec::new(),
                draining_statement_closes: false,
            },
            peer,
        )
    }

    /// Builds a test connection and drops the peer end of the socket pair.
    #[cfg(unix)]
    fn test_conn() -> PgConnection {
        test_conn_with_peer().0
    }

    /// Appends one backend frame to the connection's read buffer: the type byte, a
    /// big-endian `u32` length (payload length plus the 4 length bytes), then the payload.
    #[cfg(unix)]
    fn push_backend_frame(conn: &mut PgConnection, msg_type: u8, payload: &[u8]) {
        conn.buffer.extend_from_slice(&[msg_type]);
        conn.buffer
            .extend_from_slice(&((payload.len() + 4) as u32).to_be_bytes());
        conn.buffer.extend_from_slice(payload);
    }

    /// One parameter more than `i16::MAX` must be rejected with an encode error when the
    /// wire length is computed.
    #[test]
    fn prepared_buffer_sizing_rejects_too_many_params_before_allocation() {
        let params = vec![None; i16::MAX as usize + 1];
        let err = prepared_bind_execute_sync_wire_len("stmt", &params, PgEncoder::FORMAT_TEXT)
            .expect_err("parameter overflow must be rejected");

        assert!(matches!(err, PgError::Encode(msg) if msg.contains("Too many parameters")));
    }

    /// A callback error is passed back unchanged and marks the connection desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn streaming_callback_error_marks_query_connection_desynced() {
        let mut conn = test_conn();

        let err = return_callback_error_with_desync::<()>(
            &mut conn,
            PgError::Query("consumer stopped".to_string()),
        )
        .expect_err("callback error should be returned");

        assert!(matches!(err, PgError::Query(msg) if msg == "consumer stopped"));
        assert!(conn.is_io_desynced());
    }

    /// A DataRow buffered ahead of BindComplete is a protocol-order violation and must
    /// mark the connection desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn protocol_order_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        push_backend_frame(&mut conn, b'D', &0i16.to_be_bytes());
        let stmt = super::super::PreparedStatement::from_sql("SELECT 1");

        let err = conn
            .query_prepared_single_count(&stmt, &[])
            .await
            .expect_err("out-of-order DataRow must fail");

        assert!(err.to_string().contains("DataRow before BindComplete"));
        assert!(conn.is_io_desynced());
    }

    /// A ReadyForQuery buffered before any completion message must fail the simple-query
    /// flow and mark the connection desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn simple_flow_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .execute_simple("SELECT 1")
            .await
            .expect_err("ReadyForQuery before completion must fail");

        assert!(err.to_string().contains("ReadyForQuery before completion"));
        assert!(conn.is_io_desynced());
    }
}