Skip to main content

qail_pg/driver/
query.rs

1//! Query execution methods for PostgreSQL connection.
2//!
3//! This module provides query, query_cached, and execute_simple.
4
5use super::{
6    PgConnection, PgError, PgResult,
7    extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8    is_ignorable_session_message, is_ignorable_session_msg_type, unexpected_backend_message,
9    unexpected_backend_msg_type,
10};
11use crate::protocol::{BackendMessage, PgEncoder};
12use bytes::{Bytes, BytesMut};
13use std::time::{Duration, Instant};
14
15#[inline]
16fn capture_query_server_error(conn: &mut PgConnection, slot: &mut Option<PgError>, err: PgError) {
17    if slot.is_some() {
18        return;
19    }
20    if err.is_prepared_statement_retryable() {
21        conn.clear_prepared_statement_state();
22    }
23    *slot = Some(err);
24}
25
26#[inline]
27fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
28    if matches!(
29        err,
30        PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
31    ) {
32        conn.mark_io_desynced();
33    }
34    Err(err)
35}
36
37#[inline]
38fn return_callback_error_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
39    conn.mark_io_desynced();
40    Err(err)
41}
42
43#[inline]
44fn prepared_bind_execute_sync_wire_len(
45    statement: &str,
46    params: &[Option<Vec<u8>>],
47    result_format: i16,
48) -> PgResult<usize> {
49    let needed = PgEncoder::bind_execute_sync_wire_len_with_formats(
50        statement,
51        params,
52        PgEncoder::FORMAT_TEXT,
53        result_format,
54    )
55    .map_err(|e| PgError::Encode(e.to_string()))?;
56    Ok(needed)
57}
58
59#[inline]
60fn reserve_prepared_single_write_buf(
61    conn: &mut PgConnection,
62    stmt: &super::PreparedStatement,
63    params: &[Option<Vec<u8>>],
64    result_format: i16,
65) -> PgResult<()> {
66    conn.write_buf.clear();
67    let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
68    conn.write_buf.reserve(needed);
69    Ok(())
70}
71
/// Per-statement phase tracked by `SimpleFlowTracker` while a response is read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SimpleStatementState {
    /// No row stream is open: the initial state, and the state restored by
    /// `SimpleFlowTracker::on_command_complete`.
    AwaitingResult,
    /// Entered by `SimpleFlowTracker::on_row_description`; `DataRow` messages
    /// are only accepted in this state.
    InRowStream,
}
77
78#[derive(Debug, Clone, Copy)]
79struct SimpleFlowTracker {
80    state: SimpleStatementState,
81    saw_completion: bool,
82}
83
84impl SimpleFlowTracker {
85    fn new() -> Self {
86        Self {
87            state: SimpleStatementState::AwaitingResult,
88            saw_completion: false,
89        }
90    }
91
92    fn on_row_description(&mut self, context: &'static str) -> PgResult<()> {
93        if self.state == SimpleStatementState::InRowStream {
94            return Err(PgError::Protocol(format!(
95                "{}: duplicate RowDescription before statement completion",
96                context
97            )));
98        }
99        self.state = SimpleStatementState::InRowStream;
100        self.saw_completion = false;
101        Ok(())
102    }
103
104    fn on_data_row(&self, context: &'static str) -> PgResult<()> {
105        if self.state != SimpleStatementState::InRowStream {
106            return Err(PgError::Protocol(format!(
107                "{}: DataRow before RowDescription",
108                context
109            )));
110        }
111        Ok(())
112    }
113
114    fn on_command_complete(&mut self) {
115        self.state = SimpleStatementState::AwaitingResult;
116        self.saw_completion = true;
117    }
118
119    fn on_empty_query_response(&mut self, context: &'static str) -> PgResult<()> {
120        if self.state == SimpleStatementState::InRowStream {
121            return Err(PgError::Protocol(format!(
122                "{}: EmptyQueryResponse during active row stream",
123                context
124            )));
125        }
126        self.saw_completion = true;
127        Ok(())
128    }
129
130    fn on_ready_for_query(&self, context: &'static str, error_pending: bool) -> PgResult<()> {
131        if error_pending {
132            return Ok(());
133        }
134        if self.state == SimpleStatementState::InRowStream {
135            return Err(PgError::Protocol(format!(
136                "{}: ReadyForQuery before CommandComplete",
137                context
138            )));
139        }
140        if !self.saw_completion {
141            return Err(PgError::Protocol(format!(
142                "{}: ReadyForQuery before completion",
143                context
144            )));
145        }
146        Ok(())
147    }
148}
149
150impl PgConnection {
151    fn validate_param_type_arity(params: &[Option<Vec<u8>>], param_types: &[u32]) -> PgResult<()> {
152        if !param_types.is_empty() && param_types.len() != params.len() {
153            return Err(PgError::Encode(format!(
154                "parameter type count {} does not match parameter count {}",
155                param_types.len(),
156                params.len()
157            )));
158        }
159        Ok(())
160    }
161
162    /// Execute a query with binary parameters (crate-internal).
163    /// This uses the Extended Query Protocol (Parse/Bind/Execute/Sync):
164    /// - Parameters are sent as binary bytes, skipping the string layer
165    /// - No SQL injection possible - parameters are never interpolated
166    /// - Better performance via prepared statement reuse
167    pub(crate) async fn query(
168        &mut self,
169        sql: &str,
170        params: &[Option<Vec<u8>>],
171    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
172        self.query_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
173            .await
174    }
175
176    /// Execute a query with binary parameters and explicit result-column format.
177    pub(crate) async fn query_with_result_format(
178        &mut self,
179        sql: &str,
180        params: &[Option<Vec<u8>>],
181        result_format: i16,
182    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
183        let bytes = PgEncoder::encode_extended_query_with_result_format(sql, params, result_format)
184            .map_err(|e| PgError::Encode(e.to_string()))?;
185        self.write_all_with_timeout(&bytes, "stream write").await?;
186
187        let mut rows = Vec::new();
188
189        let mut error: Option<PgError> = None;
190        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
191
192        loop {
193            let msg = self.recv().await?;
194            if let Err(err) = flow.validate(&msg, "extended-query execute", error.is_some()) {
195                return return_with_desync(self, err);
196            }
197            match msg {
198                BackendMessage::ParseComplete => {}
199                BackendMessage::BindComplete => {}
200                BackendMessage::RowDescription(_) => {}
201                BackendMessage::DataRow(data) => {
202                    // Only collect rows if no error occurred
203                    if error.is_none() {
204                        rows.push(data);
205                    }
206                }
207                BackendMessage::CommandComplete(_) => {}
208                BackendMessage::NoData => {}
209                BackendMessage::ReadyForQuery(_) => {
210                    if let Some(err) = error {
211                        return Err(err);
212                    }
213                    return Ok(rows);
214                }
215                BackendMessage::ErrorResponse(err) => {
216                    if error.is_none() {
217                        error = Some(PgError::QueryServer(err.into()));
218                    }
219                }
220                msg if is_ignorable_session_message(&msg) => {}
221                other => {
222                    return return_with_desync(
223                        self,
224                        unexpected_backend_message("extended-query execute", &other),
225                    );
226                }
227            }
228        }
229    }
230
231    /// Execute an uncached query and drain completion without materializing rows.
232    pub async fn query_count(&mut self, sql: &str, params: &[Option<Vec<u8>>]) -> PgResult<()> {
233        self.query_count_with_param_types(sql, &[], params).await
234    }
235
236    /// Execute an uncached query with explicit PostgreSQL parameter type OIDs
237    /// and drain completion without materializing rows.
238    pub async fn query_count_with_param_types(
239        &mut self,
240        sql: &str,
241        param_types: &[u32],
242        params: &[Option<Vec<u8>>],
243    ) -> PgResult<()> {
244        Self::validate_param_type_arity(params, param_types)?;
245
246        self.write_buf.clear();
247        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
248            .map_err(|e| PgError::Encode(e.to_string()))?;
249        PgEncoder::encode_bind_to(&mut self.write_buf, "", params)
250            .map_err(|e| PgError::Encode(e.to_string()))?;
251        PgEncoder::encode_execute_to(&mut self.write_buf);
252        PgEncoder::encode_sync_to(&mut self.write_buf);
253
254        self.flush_write_buf().await?;
255
256        let mut error: Option<PgError> = None;
257        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
258
259        loop {
260            match self.recv_msg_type_fast().await {
261                Ok(msg_type) => {
262                    if let Err(err) = flow.validate_msg_type(
263                        msg_type,
264                        "extended-query count execute",
265                        error.is_some(),
266                    ) {
267                        return return_with_desync(self, err);
268                    }
269                    match msg_type {
270                        b'1' | b'2' | b'T' | b'D' | b'C' | b'n' => {}
271                        b'Z' => {
272                            if let Some(err) = error {
273                                return Err(err);
274                            }
275                            return Ok(());
276                        }
277                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
278                        other => {
279                            return return_with_desync(
280                                self,
281                                unexpected_backend_msg_type("extended-query count execute", other),
282                            );
283                        }
284                    }
285                }
286                Err(e) => {
287                    if matches!(&e, PgError::QueryServer(_)) {
288                        capture_query_server_error(self, &mut error, e);
289                        continue;
290                    }
291                    return Err(e);
292                }
293            }
294        }
295    }
296
297    /// Execute a query with bind parameters and return rows with column metadata.
298    ///
299    /// Uses the Extended Query Protocol without prepared statement caching.
300    /// This is intended for raw SQL compatibility paths that still need
301    /// `PgRow` + `ColumnInfo` for name-aware JSON conversion.
302    pub async fn query_rows(
303        &mut self,
304        sql: &str,
305        params: &[Option<Vec<u8>>],
306    ) -> PgResult<Vec<super::PgRow>> {
307        self.query_rows_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
308            .await
309    }
310
311    /// Execute a query with bind parameters and explicit result-column format,
312    /// returning rows with column metadata.
313    pub async fn query_rows_with_result_format(
314        &mut self,
315        sql: &str,
316        params: &[Option<Vec<u8>>],
317        result_format: i16,
318    ) -> PgResult<Vec<super::PgRow>> {
319        self.query_rows_with_param_types_and_result_format(sql, &[], params, result_format)
320            .await
321    }
322
323    /// Execute an uncached query and stream zero-copy rows to `on_row`.
324    ///
325    /// This is the lowest-allocation raw-SQL path for large result sets when
326    /// the caller does not need `PgRow` materialization or column-name metadata.
327    pub async fn query_visit_bytes_rows_with_result_format<F>(
328        &mut self,
329        sql: &str,
330        params: &[Option<Vec<u8>>],
331        result_format: i16,
332        on_row: F,
333    ) -> PgResult<usize>
334    where
335        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
336    {
337        self.query_visit_bytes_rows_with_param_types_and_result_format(
338            sql,
339            &[],
340            params,
341            result_format,
342            on_row,
343        )
344        .await
345    }
346
347    /// Execute an uncached query and stream only the first column of each row.
348    pub async fn query_visit_first_column_bytes_with_result_format<F>(
349        &mut self,
350        sql: &str,
351        params: &[Option<Vec<u8>>],
352        result_format: i16,
353        on_value: F,
354    ) -> PgResult<usize>
355    where
356        F: FnMut(Option<&[u8]>) -> PgResult<()>,
357    {
358        self.query_visit_first_column_bytes_with_param_types_and_result_format(
359            sql,
360            &[],
361            params,
362            result_format,
363            on_value,
364        )
365        .await
366    }
367
368    /// Execute a query with explicit PostgreSQL parameter type OIDs and return
369    /// rows with column metadata.
370    pub async fn query_rows_with_param_types_and_result_format(
371        &mut self,
372        sql: &str,
373        param_types: &[u32],
374        params: &[Option<Vec<u8>>],
375        result_format: i16,
376    ) -> PgResult<Vec<super::PgRow>> {
377        use std::sync::Arc;
378
379        Self::validate_param_type_arity(params, param_types)?;
380
381        self.write_buf.clear();
382        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
383            .map_err(|e| PgError::Encode(e.to_string()))?;
384        PgEncoder::encode_bind_to_with_result_format(
385            &mut self.write_buf,
386            "",
387            params,
388            result_format,
389        )
390        .map_err(|e| PgError::Encode(e.to_string()))?;
391        let describe_msg =
392            PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
393        self.write_buf.extend_from_slice(&describe_msg);
394        PgEncoder::encode_execute_to(&mut self.write_buf);
395        PgEncoder::encode_sync_to(&mut self.write_buf);
396        self.flush_write_buf().await?;
397
398        let mut rows: Vec<super::PgRow> = Vec::new();
399        let mut column_info: Option<Arc<super::ColumnInfo>> = None;
400        let mut error: Option<PgError> = None;
401        let mut flow =
402            ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
403
404        loop {
405            let msg = self.recv().await?;
406            if let Err(err) = flow.validate(&msg, "extended-query rows execute", error.is_some()) {
407                return return_with_desync(self, err);
408            }
409            match msg {
410                BackendMessage::ParseComplete => {}
411                BackendMessage::BindComplete => {}
412                BackendMessage::RowDescription(fields) => {
413                    column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
414                }
415                BackendMessage::DataRow(data) => {
416                    if error.is_none() {
417                        rows.push(super::PgRow {
418                            columns: data,
419                            column_info: column_info.clone(),
420                        });
421                    }
422                }
423                BackendMessage::CommandComplete(_) => {}
424                BackendMessage::NoData => {}
425                BackendMessage::ReadyForQuery(_) => {
426                    if let Some(err) = error {
427                        return Err(err);
428                    }
429                    return Ok(rows);
430                }
431                BackendMessage::ErrorResponse(err) => {
432                    if error.is_none() {
433                        error = Some(PgError::QueryServer(err.into()));
434                    }
435                }
436                msg if is_ignorable_session_message(&msg) => {}
437                other => {
438                    return return_with_desync(
439                        self,
440                        unexpected_backend_message("extended-query rows execute", &other),
441                    );
442                }
443            }
444        }
445    }
446
447    /// Execute an uncached query with explicit PostgreSQL parameter types and
448    /// stream zero-copy rows to `on_row`.
449    ///
450    /// Rows are backed by one shared payload buffer plus column offsets, so the
451    /// callback must not hold references past the current invocation.
452    pub async fn query_visit_bytes_rows_with_param_types_and_result_format<F>(
453        &mut self,
454        sql: &str,
455        param_types: &[u32],
456        params: &[Option<Vec<u8>>],
457        result_format: i16,
458        mut on_row: F,
459    ) -> PgResult<usize>
460    where
461        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
462    {
463        Self::validate_param_type_arity(params, param_types)?;
464
465        self.write_buf.clear();
466        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
467            .map_err(|e| PgError::Encode(e.to_string()))?;
468        PgEncoder::encode_bind_to_with_result_format(
469            &mut self.write_buf,
470            "",
471            params,
472            result_format,
473        )
474        .map_err(|e| PgError::Encode(e.to_string()))?;
475        PgEncoder::encode_execute_to(&mut self.write_buf);
476        PgEncoder::encode_sync_to(&mut self.write_buf);
477
478        self.flush_write_buf().await?;
479
480        let mut row_count = 0usize;
481        let mut row = super::PgBytesRow::default();
482        let mut error: Option<PgError> = None;
483        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
484
485        loop {
486            match self.recv_fill_zerocopy_row_fast(&mut row).await {
487                Ok(msg_type) => {
488                    if let Err(err) = flow.validate_msg_type(
489                        msg_type,
490                        "extended-query visit bytes execute",
491                        error.is_some(),
492                    ) {
493                        return return_with_desync(self, err);
494                    }
495                    match msg_type {
496                        b'1' | b'2' | b'T' | b'n' => {}
497                        b'D' => {
498                            if error.is_none() {
499                                if let Err(err) = on_row(&row) {
500                                    return return_callback_error_with_desync(self, err);
501                                }
502                                row_count += 1;
503                                row.release_payload();
504                            }
505                        }
506                        b'C' => {}
507                        b'Z' => {
508                            if let Some(err) = error {
509                                return Err(err);
510                            }
511                            return Ok(row_count);
512                        }
513                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
514                        other => {
515                            return return_with_desync(
516                                self,
517                                unexpected_backend_msg_type(
518                                    "extended-query visit bytes execute",
519                                    other,
520                                ),
521                            );
522                        }
523                    }
524                }
525                Err(e) => {
526                    if matches!(&e, PgError::QueryServer(_)) {
527                        capture_query_server_error(self, &mut error, e);
528                        continue;
529                    }
530                    return Err(e);
531                }
532            }
533        }
534    }
535
536    /// Execute an uncached query with explicit PostgreSQL parameter types and
537    /// stream only the first column of each row.
538    pub async fn query_visit_first_column_bytes_with_param_types_and_result_format<F>(
539        &mut self,
540        sql: &str,
541        param_types: &[u32],
542        params: &[Option<Vec<u8>>],
543        result_format: i16,
544        mut on_value: F,
545    ) -> PgResult<usize>
546    where
547        F: FnMut(Option<&[u8]>) -> PgResult<()>,
548    {
549        Self::validate_param_type_arity(params, param_types)?;
550
551        self.write_buf.clear();
552        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
553            .map_err(|e| PgError::Encode(e.to_string()))?;
554        PgEncoder::encode_bind_to_with_result_format(
555            &mut self.write_buf,
556            "",
557            params,
558            result_format,
559        )
560        .map_err(|e| PgError::Encode(e.to_string()))?;
561        PgEncoder::encode_execute_to(&mut self.write_buf);
562        PgEncoder::encode_sync_to(&mut self.write_buf);
563
564        self.flush_write_buf().await?;
565
566        let mut row_count = 0usize;
567        let mut first_column: Option<Bytes> = None;
568        let mut error: Option<PgError> = None;
569        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
570
571        loop {
572            match self
573                .recv_fill_first_column_zerocopy_fast(&mut first_column)
574                .await
575            {
576                Ok(msg_type) => {
577                    if let Err(err) = flow.validate_msg_type(
578                        msg_type,
579                        "extended-query visit first-column execute",
580                        error.is_some(),
581                    ) {
582                        return return_with_desync(self, err);
583                    }
584                    match msg_type {
585                        b'1' | b'2' | b'T' | b'n' => {}
586                        b'D' => {
587                            if error.is_none() {
588                                if let Err(err) = on_value(first_column.as_deref()) {
589                                    return return_callback_error_with_desync(self, err);
590                                }
591                                row_count += 1;
592                                first_column = None;
593                            }
594                        }
595                        b'C' => {}
596                        b'Z' => {
597                            if let Some(err) = error {
598                                return Err(err);
599                            }
600                            return Ok(row_count);
601                        }
602                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
603                        other => {
604                            return return_with_desync(
605                                self,
606                                unexpected_backend_msg_type(
607                                    "extended-query visit first-column execute",
608                                    other,
609                                ),
610                            );
611                        }
612                    }
613                }
614                Err(e) => {
615                    if matches!(&e, PgError::QueryServer(_)) {
616                        capture_query_server_error(self, &mut error, e);
617                        continue;
618                    }
619                    return Err(e);
620                }
621            }
622        }
623    }
624
625    /// Validate a query with explicit PostgreSQL parameter type OIDs without
626    /// executing it. Uses Parse + Bind + Describe(Portal) + Sync.
627    pub async fn probe_query_with_param_types(
628        &mut self,
629        sql: &str,
630        param_types: &[u32],
631        params: &[Option<Vec<u8>>],
632    ) -> PgResult<()> {
633        Self::validate_param_type_arity(params, param_types)?;
634
635        let parse = PgEncoder::try_encode_parse("", sql, param_types)
636            .map_err(|e| PgError::Encode(e.to_string()))?;
637        let bind =
638            PgEncoder::encode_bind("", "", params).map_err(|e| PgError::Encode(e.to_string()))?;
639        let describe =
640            PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
641        let sync = PgEncoder::encode_sync();
642        let mut bytes =
643            BytesMut::with_capacity(parse.len() + bind.len() + describe.len() + sync.len());
644        bytes.extend_from_slice(&parse);
645        bytes.extend_from_slice(&bind);
646        bytes.extend_from_slice(&describe);
647        bytes.extend_from_slice(&sync);
648        self.write_all_with_timeout(&bytes, "stream write").await?;
649
650        let mut saw_describe_response = false;
651        let mut error: Option<PgError> = None;
652        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal());
653
654        loop {
655            let msg = self.recv().await?;
656            if let Err(err) = flow.validate(&msg, "extended-query probe", error.is_some()) {
657                return return_with_desync(self, err);
658            }
659            match msg {
660                BackendMessage::ParseComplete => {}
661                BackendMessage::BindComplete => {}
662                BackendMessage::RowDescription(_) | BackendMessage::NoData => {
663                    saw_describe_response = true;
664                }
665                BackendMessage::ReadyForQuery(_) => {
666                    if let Some(err) = error {
667                        return Err(err);
668                    }
669                    if !saw_describe_response {
670                        return return_with_desync(
671                            self,
672                            PgError::Protocol(
673                                "extended-query probe finished without RowDescription/NoData"
674                                    .to_string(),
675                            ),
676                        );
677                    }
678                    return Ok(());
679                }
680                BackendMessage::ErrorResponse(err) => {
681                    if error.is_none() {
682                        error = Some(PgError::QueryServer(err.into()));
683                    }
684                }
685                msg if is_ignorable_session_message(&msg) => {}
686                other => {
687                    return return_with_desync(
688                        self,
689                        unexpected_backend_message("extended-query probe", &other),
690                    );
691                }
692            }
693        }
694    }
695
696    /// Execute a query with cached prepared statement.
697    /// Like `query()`, but reuses prepared statements across calls.
698    /// The statement name is derived from a hash of the SQL text.
699    /// OPTIMIZED: Pre-allocated buffer + ultra-fast encoders.
700    pub async fn query_cached(
701        &mut self,
702        sql: &str,
703        params: &[Option<Vec<u8>>],
704    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
705        self.query_cached_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
706            .await
707    }
708
709    /// Execute a query with cached prepared statement and explicit result-column format.
710    pub async fn query_cached_with_result_format(
711        &mut self,
712        sql: &str,
713        params: &[Option<Vec<u8>>],
714        result_format: i16,
715    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
716        let mut retried = false;
717        loop {
718            match self
719                .query_cached_with_result_format_once(sql, params, result_format)
720                .await
721            {
722                Ok(rows) => return Ok(rows),
723                Err(err)
724                    if !retried
725                        && (err.is_prepared_statement_retryable()
726                            || err.is_prepared_statement_already_exists()) =>
727                {
728                    retried = true;
729                    if err.is_prepared_statement_retryable() {
730                        self.clear_prepared_statement_state();
731                    }
732                }
733                Err(err) => return Err(err),
734            }
735        }
736    }
737
738    async fn query_cached_with_result_format_once(
739        &mut self,
740        sql: &str,
741        params: &[Option<Vec<u8>>],
742        result_format: i16,
743    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
744        let stmt_name = Self::sql_to_stmt_name(sql);
745        let is_new = !self.prepared_statements.contains_key(&stmt_name);
746
747        let needed = prepared_bind_execute_sync_wire_len(&stmt_name, params, result_format)?;
748        let mut buf = BytesMut::with_capacity(needed);
749
750        if is_new {
751            // Evict LRU prepared statement if at capacity. This prevents
752            // unbounded memory growth from dynamic batch filters while
753            // preserving hot statements (unlike the old nuclear `.clear()`).
754            self.evict_prepared_if_full();
755            if let Err(e) = PgEncoder::try_encode_parse_to(&mut buf, &stmt_name, sql, &[]) {
756                return Err(PgError::Encode(e.to_string()));
757            }
758            // Cache the SQL for debugging
759            self.prepared_statements
760                .insert(stmt_name.clone(), sql.to_string());
761        }
762
763        // Use ULTRA-OPTIMIZED encoders - write directly to buffer
764        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
765            &mut buf,
766            &stmt_name,
767            params,
768            result_format,
769        ) {
770            if is_new {
771                self.prepared_statements.remove(&stmt_name);
772            }
773            return Err(PgError::Encode(e.to_string()));
774        }
775        PgEncoder::encode_execute_to(&mut buf);
776        PgEncoder::encode_sync_to(&mut buf);
777
778        if let Err(err) = self.write_all_with_timeout(&buf, "stream write").await {
779            if is_new {
780                self.prepared_statements.remove(&stmt_name);
781            }
782            return Err(err);
783        }
784
785        let mut rows = Vec::new();
786
787        let mut error: Option<PgError> = None;
788        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(is_new));
789
790        loop {
791            let msg = match self.recv().await {
792                Ok(msg) => msg,
793                Err(err) => {
794                    if is_new && !flow.saw_parse_complete() {
795                        self.prepared_statements.remove(&stmt_name);
796                    }
797                    return Err(err);
798                }
799            };
800            if let Err(err) = flow.validate(&msg, "extended-query cached execute", error.is_some())
801            {
802                if is_new && !flow.saw_parse_complete() {
803                    self.prepared_statements.remove(&stmt_name);
804                }
805                return return_with_desync(self, err);
806            }
807            match msg {
808                BackendMessage::ParseComplete => {
809                    // Already cached in is_new block above.
810                }
811                BackendMessage::BindComplete => {}
812                BackendMessage::RowDescription(_) => {}
813                BackendMessage::DataRow(data) => {
814                    if error.is_none() {
815                        rows.push(data);
816                    }
817                }
818                BackendMessage::CommandComplete(_) => {}
819                BackendMessage::NoData => {}
820                BackendMessage::ReadyForQuery(_) => {
821                    if let Some(err) = error {
822                        if is_new
823                            && !flow.saw_parse_complete()
824                            && !err.is_prepared_statement_already_exists()
825                        {
826                            self.prepared_statements.remove(&stmt_name);
827                        }
828                        return Err(err);
829                    }
830                    if is_new && !flow.saw_parse_complete() {
831                        self.prepared_statements.remove(&stmt_name);
832                        return return_with_desync(
833                            self,
834                            PgError::Protocol(
835                                "Cache miss query reached ReadyForQuery without ParseComplete"
836                                    .to_string(),
837                            ),
838                        );
839                    }
840                    return Ok(rows);
841                }
842                BackendMessage::ErrorResponse(err) => {
843                    if error.is_none() {
844                        let query_err = PgError::QueryServer(err.into());
845                        if !query_err.is_prepared_statement_already_exists() {
846                            // Invalidate cache to prevent stale local mapping after parse failure.
847                            self.prepared_statements.remove(&stmt_name);
848                        }
849                        error = Some(query_err);
850                    }
851                }
852                msg if is_ignorable_session_message(&msg) => {}
853                other => {
854                    if is_new && !flow.saw_parse_complete() {
855                        self.prepared_statements.remove(&stmt_name);
856                    }
857                    return return_with_desync(
858                        self,
859                        unexpected_backend_message("extended-query cached execute", &other),
860                    );
861                }
862            }
863        }
864    }
865
866    /// Generate a statement name from SQL hash.
867    /// Uses a simple hash to create a unique name like "stmt_12345abc".
868    pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
869        use std::collections::hash_map::DefaultHasher;
870        use std::hash::{Hash, Hasher};
871
872        let mut hasher = DefaultHasher::new();
873        sql.hash(&mut hasher);
874        format!("s{:016x}", hasher.finish())
875    }
876
877    /// Execute a simple SQL statement (no parameters).
878    pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
879        let bytes = PgEncoder::try_encode_query_string(sql)?;
880        self.write_all_with_timeout(&bytes, "stream write").await?;
881
882        let mut error: Option<PgError> = None;
883        let mut flow = SimpleFlowTracker::new();
884
885        loop {
886            let msg = self.recv().await?;
887            match msg {
888                BackendMessage::RowDescription(_) => {
889                    // Some callers use execute_simple() with session-shaping SQL that
890                    // can legally return rows (e.g., SELECT set_config(...)).
891                    // Drain and ignore row data while preserving protocol ordering checks.
892                    if let Err(err) = flow.on_row_description("simple-query execute") {
893                        return return_with_desync(self, err);
894                    }
895                }
896                BackendMessage::DataRow(_) => {
897                    if let Err(err) = flow.on_data_row("simple-query execute") {
898                        return return_with_desync(self, err);
899                    }
900                }
901                BackendMessage::CommandComplete(_) => {
902                    flow.on_command_complete();
903                }
904                BackendMessage::EmptyQueryResponse => {
905                    if let Err(err) = flow.on_empty_query_response("simple-query execute") {
906                        return return_with_desync(self, err);
907                    }
908                }
909                BackendMessage::ReadyForQuery(_) => {
910                    if let Some(err) = error {
911                        return Err(err);
912                    }
913                    if let Err(err) =
914                        flow.on_ready_for_query("simple-query execute", error.is_some())
915                    {
916                        return return_with_desync(self, err);
917                    }
918                    return Ok(());
919                }
920                BackendMessage::ErrorResponse(err) => {
921                    if error.is_none() {
922                        error = Some(PgError::QueryServer(err.into()));
923                    }
924                }
925                msg if is_ignorable_session_message(&msg) => {}
926                other => {
927                    return return_with_desync(
928                        self,
929                        unexpected_backend_message("simple-query execute", &other),
930                    );
931                }
932            }
933        }
934    }
935
936    /// Execute a simple SQL query and return rows (Simple Query Protocol).
937    ///
938    /// Unlike `execute_simple`, this collects and returns data rows.
939    /// Used for branch management and other administrative queries.
940    ///
941    /// SECURITY: Capped at 10,000 rows to prevent OOM from unbounded results.
942    pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
943        use std::sync::Arc;
944
945        /// Safety cap to prevent OOM from unbounded result accumulation.
946        /// Simple Query Protocol has no streaming; all rows are buffered in memory.
947        const MAX_SIMPLE_QUERY_ROWS: usize = 10_000;
948
949        let bytes = PgEncoder::try_encode_query_string(sql)?;
950        self.write_all_with_timeout(&bytes, "stream write").await?;
951
952        let mut rows: Vec<super::PgRow> = Vec::new();
953        let mut column_info: Option<Arc<super::ColumnInfo>> = None;
954        let mut error: Option<PgError> = None;
955        let mut flow = SimpleFlowTracker::new();
956
957        loop {
958            let msg = self.recv().await?;
959            match msg {
960                BackendMessage::RowDescription(fields) => {
961                    if let Err(err) = flow.on_row_description("simple-query read") {
962                        return return_with_desync(self, err);
963                    }
964                    column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
965                }
966                BackendMessage::DataRow(data) => {
967                    if let Err(err) = flow.on_data_row("simple-query read") {
968                        return return_with_desync(self, err);
969                    }
970                    if error.is_none() {
971                        if rows.len() >= MAX_SIMPLE_QUERY_ROWS {
972                            if error.is_none() {
973                                error = Some(PgError::Query(format!(
974                                    "simple_query exceeded {} row safety cap",
975                                    MAX_SIMPLE_QUERY_ROWS,
976                                )));
977                            }
978                            // Continue draining to reach ReadyForQuery
979                        } else {
980                            rows.push(super::PgRow {
981                                columns: data,
982                                column_info: column_info.clone(),
983                            });
984                        }
985                    }
986                }
987                BackendMessage::CommandComplete(_) => {
988                    flow.on_command_complete();
989                    column_info = None;
990                }
991                BackendMessage::EmptyQueryResponse => {
992                    if let Err(err) = flow.on_empty_query_response("simple-query read") {
993                        return return_with_desync(self, err);
994                    }
995                    column_info = None;
996                }
997                BackendMessage::ReadyForQuery(_) => {
998                    if let Some(err) = error {
999                        return Err(err);
1000                    }
1001                    if let Err(err) = flow.on_ready_for_query("simple-query read", error.is_some())
1002                    {
1003                        return return_with_desync(self, err);
1004                    }
1005                    return Ok(rows);
1006                }
1007                BackendMessage::ErrorResponse(err) => {
1008                    if error.is_none() {
1009                        error = Some(PgError::QueryServer(err.into()));
1010                    }
1011                }
1012                msg if is_ignorable_session_message(&msg) => {}
1013                other => {
1014                    return return_with_desync(
1015                        self,
1016                        unexpected_backend_message("simple-query read", &other),
1017                    );
1018                }
1019            }
1020        }
1021    }
1022
1023    /// ZERO-HASH sequential query using pre-computed PreparedStatement.
1024    /// This is the FASTEST sequential path because it skips:
1025    /// - SQL generation from AST (done once outside loop)
1026    /// - Hash computation for statement name (pre-computed in PreparedStatement)
1027    /// - HashMap lookup for is_new check (statement already prepared)
1028    /// # Example
1029    /// ```ignore
1030    /// let stmt = conn.prepare("SELECT * FROM users WHERE id = $1").await?;
1031    /// for id in 1..10000 {
1032    ///     let rows = conn.query_prepared_single(&stmt, &[Some(id.to_string().into_bytes())]).await?;
1033    /// }
1034    /// ```
1035    #[inline]
1036    pub async fn query_prepared_single(
1037        &mut self,
1038        stmt: &super::PreparedStatement,
1039        params: &[Option<Vec<u8>>],
1040    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1041        self.query_prepared_single_with_result_format(stmt, params, PgEncoder::FORMAT_TEXT)
1042            .await
1043    }
1044
1045    /// ZERO-HASH sequential prepared execution that drains rows without
1046    /// materializing them.
1047    ///
1048    /// Useful for throughput-focused paths where only protocol completion
1049    /// matters and result payload is intentionally ignored.
1050    #[inline]
1051    pub async fn query_prepared_single_count(
1052        &mut self,
1053        stmt: &super::PreparedStatement,
1054        params: &[Option<Vec<u8>>],
1055    ) -> PgResult<()> {
1056        self.write_buf.clear();
1057        PgEncoder::encode_bind_to(&mut self.write_buf, &stmt.name, params)
1058            .map_err(|e| PgError::Encode(e.to_string()))?;
1059        PgEncoder::encode_execute_to(&mut self.write_buf);
1060        PgEncoder::encode_sync_to(&mut self.write_buf);
1061
1062        self.flush_write_buf().await?;
1063
1064        let mut error: Option<PgError> = None;
1065        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1066
1067        loop {
1068            match self.recv_msg_type_fast().await {
1069                Ok(msg_type) => {
1070                    if let Err(err) = flow.validate_msg_type(
1071                        msg_type,
1072                        "prepared single count execute",
1073                        error.is_some(),
1074                    ) {
1075                        return return_with_desync(self, err);
1076                    }
1077                    match msg_type {
1078                        b'2' | b'T' | b'D' | b'C' | b'n' => {}
1079                        b'Z' => {
1080                            if let Some(err) = error {
1081                                return Err(err);
1082                            }
1083                            return Ok(());
1084                        }
1085                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1086                        other => {
1087                            return return_with_desync(
1088                                self,
1089                                unexpected_backend_msg_type("prepared single count execute", other),
1090                            );
1091                        }
1092                    }
1093                }
1094                Err(e) => {
1095                    if matches!(&e, PgError::QueryServer(_)) {
1096                        capture_query_server_error(self, &mut error, e);
1097                        continue;
1098                    }
1099                    return Err(e);
1100                }
1101            }
1102        }
1103    }
1104
1105    /// ZERO-HASH sequential query with explicit result-column format.
1106    #[inline]
1107    pub async fn query_prepared_single_with_result_format(
1108        &mut self,
1109        stmt: &super::PreparedStatement,
1110        params: &[Option<Vec<u8>>],
1111        result_format: i16,
1112    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1113        let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
1114        let mut buf = BytesMut::with_capacity(needed);
1115
1116        // ZERO HASH, ZERO LOOKUP - just encode and send!
1117        PgEncoder::encode_bind_to_with_result_format(&mut buf, &stmt.name, params, result_format)
1118            .map_err(|e| PgError::Encode(e.to_string()))?;
1119        PgEncoder::encode_execute_to(&mut buf);
1120        PgEncoder::encode_sync_to(&mut buf);
1121
1122        self.write_all_with_timeout(&buf, "stream write").await?;
1123
1124        let mut rows = Vec::new();
1125
1126        let mut error: Option<PgError> = None;
1127        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1128
1129        loop {
1130            let msg = self.recv().await?;
1131            if let Err(err) = flow.validate(&msg, "prepared single execute", error.is_some()) {
1132                return return_with_desync(self, err);
1133            }
1134            match msg {
1135                BackendMessage::BindComplete => {}
1136                BackendMessage::RowDescription(_) => {}
1137                BackendMessage::DataRow(data) => {
1138                    if error.is_none() {
1139                        rows.push(data);
1140                    }
1141                }
1142                BackendMessage::CommandComplete(_) => {}
1143                BackendMessage::NoData => {}
1144                BackendMessage::ReadyForQuery(_) => {
1145                    if let Some(err) = error {
1146                        return Err(err);
1147                    }
1148                    return Ok(rows);
1149                }
1150                BackendMessage::ErrorResponse(err) => {
1151                    if error.is_none() {
1152                        error = Some(PgError::QueryServer(err.into()));
1153                    }
1154                }
1155                msg if is_ignorable_session_message(&msg) => {}
1156                other => {
1157                    return return_with_desync(
1158                        self,
1159                        unexpected_backend_message("prepared single execute", &other),
1160                    );
1161                }
1162            }
1163        }
1164    }
1165
1166    /// ZERO-HASH sequential query with explicit result-column format using
1167    /// reusable connection buffers (avoids per-call `BytesMut` allocation).
1168    #[inline]
1169    pub async fn query_prepared_single_reuse_with_result_format(
1170        &mut self,
1171        stmt: &super::PreparedStatement,
1172        params: &[Option<Vec<u8>>],
1173        result_format: i16,
1174    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1175        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1176
1177        PgEncoder::encode_bind_to_with_result_format(
1178            &mut self.write_buf,
1179            &stmt.name,
1180            params,
1181            result_format,
1182        )
1183        .map_err(|e| PgError::Encode(e.to_string()))?;
1184        PgEncoder::encode_execute_to(&mut self.write_buf);
1185        PgEncoder::encode_sync_to(&mut self.write_buf);
1186
1187        self.flush_write_buf().await?;
1188
1189        let mut rows = Vec::with_capacity(32);
1190        let mut error: Option<PgError> = None;
1191        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1192
1193        loop {
1194            let msg = self.recv().await?;
1195            if let Err(err) = flow.validate(&msg, "prepared single reuse execute", error.is_some())
1196            {
1197                return return_with_desync(self, err);
1198            }
1199            match msg {
1200                BackendMessage::BindComplete => {}
1201                BackendMessage::RowDescription(_) => {}
1202                BackendMessage::DataRow(data) => {
1203                    if error.is_none() {
1204                        rows.push(data);
1205                    }
1206                }
1207                BackendMessage::CommandComplete(_) => {}
1208                BackendMessage::NoData => {}
1209                BackendMessage::ReadyForQuery(_) => {
1210                    if let Some(err) = error {
1211                        return Err(err);
1212                    }
1213                    return Ok(rows);
1214                }
1215                BackendMessage::ErrorResponse(err) => {
1216                    if error.is_none() {
1217                        error = Some(PgError::QueryServer(err.into()));
1218                    }
1219                }
1220                msg if is_ignorable_session_message(&msg) => {}
1221                other => {
1222                    return return_with_desync(
1223                        self,
1224                        unexpected_backend_message("prepared single reuse execute", &other),
1225                    );
1226                }
1227            }
1228        }
1229    }
1230
1231    /// Sequential prepared query using reusable connection buffers and row visitor.
1232    ///
1233    /// Rows are streamed to `on_row` as owned column buffers, avoiding
1234    /// materializing the full result set.
1235    #[inline]
1236    pub async fn query_prepared_single_reuse_visit_rows_with_result_format<F>(
1237        &mut self,
1238        stmt: &super::PreparedStatement,
1239        params: &[Option<Vec<u8>>],
1240        result_format: i16,
1241        mut on_row: F,
1242    ) -> PgResult<usize>
1243    where
1244        F: FnMut(&[Option<Vec<u8>>]) -> PgResult<()>,
1245    {
1246        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1247
1248        PgEncoder::encode_bind_to_with_result_format(
1249            &mut self.write_buf,
1250            &stmt.name,
1251            params,
1252            result_format,
1253        )
1254        .map_err(|e| PgError::Encode(e.to_string()))?;
1255        PgEncoder::encode_execute_to(&mut self.write_buf);
1256        PgEncoder::encode_sync_to(&mut self.write_buf);
1257
1258        self.flush_write_buf().await?;
1259
1260        let mut row_count = 0usize;
1261        let mut row_buf: Vec<Option<Vec<u8>>> = Vec::new();
1262        let mut error: Option<PgError> = None;
1263        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1264
1265        loop {
1266            match self.recv_fill_data_row_fast(&mut row_buf).await {
1267                Ok(msg_type) => {
1268                    if let Err(err) = flow.validate_msg_type(
1269                        msg_type,
1270                        "prepared single reuse visit execute",
1271                        error.is_some(),
1272                    ) {
1273                        return return_with_desync(self, err);
1274                    }
1275                    match msg_type {
1276                        b'2' | b'T' | b'n' => {}
1277                        b'D' => {
1278                            if error.is_none() {
1279                                if let Err(err) = on_row(row_buf.as_slice()) {
1280                                    return return_callback_error_with_desync(self, err);
1281                                }
1282                                row_count += 1;
1283                            }
1284                        }
1285                        b'C' => {}
1286                        b'Z' => {
1287                            if let Some(err) = error {
1288                                return Err(err);
1289                            }
1290                            return Ok(row_count);
1291                        }
1292                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1293                        other => {
1294                            return return_with_desync(
1295                                self,
1296                                unexpected_backend_msg_type(
1297                                    "prepared single reuse visit execute",
1298                                    other,
1299                                ),
1300                            );
1301                        }
1302                    }
1303                }
1304                Err(e) => {
1305                    if matches!(&e, PgError::QueryServer(_)) {
1306                        capture_query_server_error(self, &mut error, e);
1307                        continue;
1308                    }
1309                    return Err(e);
1310                }
1311            }
1312        }
1313    }
1314
1315    /// Sequential prepared query using reusable connection buffers and zero-copy row visitor.
1316    ///
1317    /// Rows are backed by a shared payload buffer plus column offsets, avoiding
1318    /// per-cell byte copies during receive.
1319    #[inline]
1320    pub async fn query_prepared_single_reuse_visit_bytes_rows_with_result_format<F>(
1321        &mut self,
1322        stmt: &super::PreparedStatement,
1323        params: &[Option<Vec<u8>>],
1324        result_format: i16,
1325        mut on_row: F,
1326    ) -> PgResult<usize>
1327    where
1328        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1329    {
1330        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1331
1332        PgEncoder::encode_bind_to_with_result_format(
1333            &mut self.write_buf,
1334            &stmt.name,
1335            params,
1336            result_format,
1337        )
1338        .map_err(|e| PgError::Encode(e.to_string()))?;
1339        PgEncoder::encode_execute_to(&mut self.write_buf);
1340        PgEncoder::encode_sync_to(&mut self.write_buf);
1341
1342        self.flush_write_buf().await?;
1343
1344        let mut row_count = 0usize;
1345        let mut row = super::PgBytesRow::default();
1346        let mut error: Option<PgError> = None;
1347        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1348
1349        loop {
1350            match self.recv_fill_zerocopy_row_fast(&mut row).await {
1351                Ok(msg_type) => {
1352                    if let Err(err) = flow.validate_msg_type(
1353                        msg_type,
1354                        "prepared single reuse visit bytes execute",
1355                        error.is_some(),
1356                    ) {
1357                        return return_with_desync(self, err);
1358                    }
1359                    match msg_type {
1360                        b'2' | b'T' | b'n' => {}
1361                        b'D' => {
1362                            if error.is_none() {
1363                                if let Err(err) = on_row(&row) {
1364                                    return return_callback_error_with_desync(self, err);
1365                                }
1366                                row_count += 1;
1367                                row.release_payload();
1368                            }
1369                        }
1370                        b'C' => {}
1371                        b'Z' => {
1372                            if let Some(err) = error {
1373                                return Err(err);
1374                            }
1375                            return Ok(row_count);
1376                        }
1377                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1378                        other => {
1379                            return return_with_desync(
1380                                self,
1381                                unexpected_backend_msg_type(
1382                                    "prepared single reuse visit bytes execute",
1383                                    other,
1384                                ),
1385                            );
1386                        }
1387                    }
1388                }
1389                Err(e) => {
1390                    if matches!(&e, PgError::QueryServer(_)) {
1391                        capture_query_server_error(self, &mut error, e);
1392                        continue;
1393                    }
1394                    return Err(e);
1395                }
1396            }
1397        }
1398    }
1399
1400    /// Sequential prepared query using reusable buffers and first-column visitor.
1401    #[inline]
1402    pub async fn query_prepared_single_reuse_visit_first_column_bytes_with_result_format<F>(
1403        &mut self,
1404        stmt: &super::PreparedStatement,
1405        params: &[Option<Vec<u8>>],
1406        result_format: i16,
1407        mut on_value: F,
1408    ) -> PgResult<usize>
1409    where
1410        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1411    {
1412        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1413
1414        PgEncoder::encode_bind_to_with_result_format(
1415            &mut self.write_buf,
1416            &stmt.name,
1417            params,
1418            result_format,
1419        )
1420        .map_err(|e| PgError::Encode(e.to_string()))?;
1421        PgEncoder::encode_execute_to(&mut self.write_buf);
1422        PgEncoder::encode_sync_to(&mut self.write_buf);
1423
1424        self.flush_write_buf().await?;
1425
1426        let mut row_count = 0usize;
1427        let mut first_column: Option<Bytes> = None;
1428        let mut error: Option<PgError> = None;
1429        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1430
1431        loop {
1432            match self
1433                .recv_fill_first_column_zerocopy_fast(&mut first_column)
1434                .await
1435            {
1436                Ok(msg_type) => {
1437                    if let Err(err) = flow.validate_msg_type(
1438                        msg_type,
1439                        "prepared single reuse visit first-column execute",
1440                        error.is_some(),
1441                    ) {
1442                        return return_with_desync(self, err);
1443                    }
1444                    match msg_type {
1445                        b'2' | b'T' | b'n' => {}
1446                        b'D' => {
1447                            if error.is_none() {
1448                                if let Err(err) = on_value(first_column.as_deref()) {
1449                                    return return_callback_error_with_desync(self, err);
1450                                }
1451                                row_count += 1;
1452                                first_column = None;
1453                            }
1454                        }
1455                        b'C' => {}
1456                        b'Z' => {
1457                            if let Some(err) = error {
1458                                return Err(err);
1459                            }
1460                            return Ok(row_count);
1461                        }
1462                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1463                        other => {
1464                            return return_with_desync(
1465                                self,
1466                                unexpected_backend_msg_type(
1467                                    "prepared single reuse visit first-column execute",
1468                                    other,
1469                                ),
1470                            );
1471                        }
1472                    }
1473                }
1474                Err(e) => {
1475                    if matches!(&e, PgError::QueryServer(_)) {
1476                        capture_query_server_error(self, &mut error, e);
1477                        continue;
1478                    }
1479                    return Err(e);
1480                }
1481            }
1482        }
1483    }
1484
1485    /// Sequential prepared query using reusable buffers and fixed 4-column visitor.
1486    #[inline]
1487    pub async fn query_prepared_single_reuse_visit_first_four_columns_bytes_with_result_format<F>(
1488        &mut self,
1489        stmt: &super::PreparedStatement,
1490        params: &[Option<Vec<u8>>],
1491        result_format: i16,
1492        mut on_row: F,
1493    ) -> PgResult<usize>
1494    where
1495        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1496    {
1497        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1498
1499        PgEncoder::encode_bind_to_with_result_format(
1500            &mut self.write_buf,
1501            &stmt.name,
1502            params,
1503            result_format,
1504        )
1505        .map_err(|e| PgError::Encode(e.to_string()))?;
1506        PgEncoder::encode_execute_to(&mut self.write_buf);
1507        PgEncoder::encode_sync_to(&mut self.write_buf);
1508
1509        self.flush_write_buf().await?;
1510
1511        let mut row_count = 0usize;
1512        let mut columns = [None, None, None, None];
1513        let mut error: Option<PgError> = None;
1514        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1515
1516        loop {
1517            match self
1518                .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1519                .await
1520            {
1521                Ok(msg_type) => {
1522                    if let Err(err) = flow.validate_msg_type(
1523                        msg_type,
1524                        "prepared single reuse visit first-four execute",
1525                        error.is_some(),
1526                    ) {
1527                        return return_with_desync(self, err);
1528                    }
1529                    match msg_type {
1530                        b'2' | b'T' | b'n' => {}
1531                        b'D' => {
1532                            if error.is_none() {
1533                                if let Err(err) = on_row([
1534                                    columns[0].as_deref(),
1535                                    columns[1].as_deref(),
1536                                    columns[2].as_deref(),
1537                                    columns[3].as_deref(),
1538                                ]) {
1539                                    return return_callback_error_with_desync(self, err);
1540                                }
1541                                columns.fill(None);
1542                                row_count += 1;
1543                            }
1544                        }
1545                        b'C' => {}
1546                        b'Z' => {
1547                            if let Some(err) = error {
1548                                return Err(err);
1549                            }
1550                            return Ok(row_count);
1551                        }
1552                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1553                        other => {
1554                            return return_with_desync(
1555                                self,
1556                                unexpected_backend_msg_type(
1557                                    "prepared single reuse visit first-four execute",
1558                                    other,
1559                                ),
1560                            );
1561                        }
1562                    }
1563                }
1564                Err(e) => {
1565                    if matches!(&e, PgError::QueryServer(_)) {
1566                        capture_query_server_error(self, &mut error, e);
1567                        continue;
1568                    }
1569                    return Err(e);
1570                }
1571            }
1572        }
1573    }
1574
1575    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1576    ///
1577    /// `wire` must contain exactly one prepared-statement execution.
1578    #[inline]
1579    pub async fn query_prepared_single_encoded_visit_bytes_rows<F>(
1580        &mut self,
1581        wire: &[u8],
1582        on_row: F,
1583    ) -> PgResult<usize>
1584    where
1585        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1586    {
1587        let (row_count, _, _) = self
1588            .query_prepared_single_encoded_visit_bytes_rows_profiled(wire, on_row)
1589            .await?;
1590        Ok(row_count)
1591    }
1592
1593    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1594    ///
1595    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1596    #[inline]
1597    pub async fn query_prepared_single_encoded_visit_bytes_rows_profiled<F>(
1598        &mut self,
1599        wire: &[u8],
1600        mut on_row: F,
1601    ) -> PgResult<(usize, Duration, Duration)>
1602    where
1603        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1604    {
1605        let send_start = Instant::now();
1606        self.send_bytes(wire).await?;
1607        let send_elapsed = send_start.elapsed();
1608        let consume_start = Instant::now();
1609
1610        let mut row_count = 0usize;
1611        let mut row = super::PgBytesRow::default();
1612        let mut error: Option<PgError> = None;
1613        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1614
1615        loop {
1616            match self.recv_fill_zerocopy_row_fast(&mut row).await {
1617                Ok(msg_type) => {
1618                    if let Err(err) = flow.validate_msg_type(
1619                        msg_type,
1620                        "prepared single encoded visit bytes execute",
1621                        error.is_some(),
1622                    ) {
1623                        return return_with_desync(self, err);
1624                    }
1625                    match msg_type {
1626                        b'2' | b'T' | b'n' => {}
1627                        b'D' => {
1628                            if error.is_none() {
1629                                if let Err(err) = on_row(&row) {
1630                                    return return_callback_error_with_desync(self, err);
1631                                }
1632                                row_count += 1;
1633                                row.release_payload();
1634                            }
1635                        }
1636                        b'C' => {}
1637                        b'Z' => {
1638                            if let Some(err) = error {
1639                                return Err(err);
1640                            }
1641                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1642                        }
1643                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1644                        other => {
1645                            return return_with_desync(
1646                                self,
1647                                unexpected_backend_msg_type(
1648                                    "prepared single encoded visit bytes execute",
1649                                    other,
1650                                ),
1651                            );
1652                        }
1653                    }
1654                }
1655                Err(e) => {
1656                    if matches!(&e, PgError::QueryServer(_)) {
1657                        capture_query_server_error(self, &mut error, e);
1658                        continue;
1659                    }
1660                    return Err(e);
1661                }
1662            }
1663        }
1664    }
1665
1666    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1667    #[inline]
1668    pub async fn query_prepared_single_encoded_visit_first_column_bytes<F>(
1669        &mut self,
1670        wire: &[u8],
1671        on_value: F,
1672    ) -> PgResult<usize>
1673    where
1674        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1675    {
1676        let (row_count, _, _) = self
1677            .query_prepared_single_encoded_visit_first_column_bytes_profiled(wire, on_value)
1678            .await?;
1679        Ok(row_count)
1680    }
1681
1682    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1683    ///
1684    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1685    #[inline]
1686    pub async fn query_prepared_single_encoded_visit_first_column_bytes_profiled<F>(
1687        &mut self,
1688        wire: &[u8],
1689        mut on_value: F,
1690    ) -> PgResult<(usize, Duration, Duration)>
1691    where
1692        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1693    {
1694        let send_start = Instant::now();
1695        self.send_bytes(wire).await?;
1696        let send_elapsed = send_start.elapsed();
1697        let consume_start = Instant::now();
1698
1699        let mut row_count = 0usize;
1700        let mut first_column: Option<Bytes> = None;
1701        let mut error: Option<PgError> = None;
1702        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1703
1704        loop {
1705            match self
1706                .recv_fill_first_column_zerocopy_fast(&mut first_column)
1707                .await
1708            {
1709                Ok(msg_type) => {
1710                    if let Err(err) = flow.validate_msg_type(
1711                        msg_type,
1712                        "prepared single encoded visit first-column execute",
1713                        error.is_some(),
1714                    ) {
1715                        return return_with_desync(self, err);
1716                    }
1717                    match msg_type {
1718                        b'2' | b'T' | b'n' => {}
1719                        b'D' => {
1720                            if error.is_none() {
1721                                if let Err(err) = on_value(first_column.as_deref()) {
1722                                    return return_callback_error_with_desync(self, err);
1723                                }
1724                                row_count += 1;
1725                                first_column = None;
1726                            }
1727                        }
1728                        b'C' => {}
1729                        b'Z' => {
1730                            if let Some(err) = error {
1731                                return Err(err);
1732                            }
1733                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1734                        }
1735                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1736                        other => {
1737                            return return_with_desync(
1738                                self,
1739                                unexpected_backend_msg_type(
1740                                    "prepared single encoded visit first-column execute",
1741                                    other,
1742                                ),
1743                            );
1744                        }
1745                    }
1746                }
1747                Err(e) => {
1748                    if matches!(&e, PgError::QueryServer(_)) {
1749                        capture_query_server_error(self, &mut error, e);
1750                        continue;
1751                    }
1752                    return Err(e);
1753                }
1754            }
1755        }
1756    }
1757
1758    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1759    #[inline]
1760    pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes<F>(
1761        &mut self,
1762        wire: &[u8],
1763        on_row: F,
1764    ) -> PgResult<usize>
1765    where
1766        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1767    {
1768        let (row_count, _, _) = self
1769            .query_prepared_single_encoded_visit_first_four_columns_bytes_profiled(wire, on_row)
1770            .await?;
1771        Ok(row_count)
1772    }
1773
1774    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1775    ///
1776    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1777    #[inline]
1778    pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes_profiled<F>(
1779        &mut self,
1780        wire: &[u8],
1781        mut on_row: F,
1782    ) -> PgResult<(usize, Duration, Duration)>
1783    where
1784        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1785    {
1786        let send_start = Instant::now();
1787        self.send_bytes(wire).await?;
1788        let send_elapsed = send_start.elapsed();
1789        let consume_start = Instant::now();
1790
1791        let mut row_count = 0usize;
1792        let mut columns = [None, None, None, None];
1793        let mut error: Option<PgError> = None;
1794        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1795
1796        loop {
1797            match self
1798                .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1799                .await
1800            {
1801                Ok(msg_type) => {
1802                    if let Err(err) = flow.validate_msg_type(
1803                        msg_type,
1804                        "prepared single encoded visit first-four execute",
1805                        error.is_some(),
1806                    ) {
1807                        return return_with_desync(self, err);
1808                    }
1809                    match msg_type {
1810                        b'2' | b'T' | b'n' => {}
1811                        b'D' => {
1812                            if error.is_none() {
1813                                if let Err(err) = on_row([
1814                                    columns[0].as_deref(),
1815                                    columns[1].as_deref(),
1816                                    columns[2].as_deref(),
1817                                    columns[3].as_deref(),
1818                                ]) {
1819                                    return return_callback_error_with_desync(self, err);
1820                                }
1821                                columns.fill(None);
1822                                row_count += 1;
1823                            }
1824                        }
1825                        b'C' => {}
1826                        b'Z' => {
1827                            if let Some(err) = error {
1828                                return Err(err);
1829                            }
1830                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1831                        }
1832                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1833                        other => {
1834                            return return_with_desync(
1835                                self,
1836                                unexpected_backend_msg_type(
1837                                    "prepared single encoded visit first-four execute",
1838                                    other,
1839                                ),
1840                            );
1841                        }
1842                    }
1843                }
1844                Err(e) => {
1845                    if matches!(&e, PgError::QueryServer(_)) {
1846                        capture_query_server_error(self, &mut error, e);
1847                        continue;
1848                    }
1849                    return Err(e);
1850                }
1851            }
1852        }
1853    }
1854}
1855
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PgConnection` over one end of a Unix socket pair and returns
    /// it together with the other end, so the caller decides how long the
    /// peer stays open.
    #[cfg(unix)]
    fn test_conn_with_peer() -> (PgConnection, tokio::net::UnixStream) {
        use crate::driver::connection::StatementCache;
        use crate::driver::stream::PgStream;
        use bytes::BytesMut;
        use std::collections::{HashMap, VecDeque};
        use std::num::NonZeroUsize;
        use tokio::net::UnixStream;

        let (local_end, peer_end) = UnixStream::pair().expect("unix stream pair");
        let cache_capacity = NonZeroUsize::new(2).expect("non-zero");
        let protocol_minor = PgConnection::default_protocol_minor();

        let conn = PgConnection {
            stream: PgStream::Unix(local_end),
            buffer: BytesMut::with_capacity(1024),
            write_buf: BytesMut::with_capacity(1024),
            sql_buf: BytesMut::with_capacity(256),
            params_buf: Vec::new(),
            prepared_statements: HashMap::new(),
            stmt_cache: StatementCache::new(cache_capacity),
            column_info_cache: HashMap::new(),
            process_id: 0,
            cancel_key_bytes: Vec::new(),
            requested_protocol_minor: protocol_minor,
            negotiated_protocol_minor: PgConnection::default_protocol_minor(),
            notifications: VecDeque::new(),
            replication_stream_active: false,
            replication_mode_enabled: false,
            last_replication_wal_end: None,
            io_desynced: false,
            pending_statement_closes: Vec::new(),
            draining_statement_closes: false,
        };

        (conn, peer_end)
    }

    /// Same as `test_conn_with_peer`, for tests that never touch the socket;
    /// the peer end is dropped right away.
    #[cfg(unix)]
    fn test_conn() -> PgConnection {
        let (conn, _peer) = test_conn_with_peer();
        conn
    }

    /// Appends one backend frame to the connection's read buffer: the type
    /// byte, a big-endian `u32` length (payload plus the four length bytes),
    /// then the payload itself.
    #[cfg(unix)]
    fn push_backend_frame(conn: &mut PgConnection, msg_type: u8, payload: &[u8]) {
        let frame_len = (payload.len() + 4) as u32;
        conn.buffer.extend_from_slice(&[msg_type]);
        conn.buffer.extend_from_slice(&frame_len.to_be_bytes());
        conn.buffer.extend_from_slice(payload);
    }

    /// One parameter more than `i16::MAX` must be rejected while computing
    /// the wire length.
    #[test]
    fn prepared_buffer_sizing_rejects_too_many_params_before_allocation() {
        let oversized: Vec<Option<Vec<u8>>> = vec![None; i16::MAX as usize + 1];
        let outcome =
            prepared_bind_execute_sync_wire_len("stmt", &oversized, PgEncoder::FORMAT_TEXT);
        let err = outcome.expect_err("parameter overflow must be rejected");

        assert!(matches!(err, PgError::Encode(msg) if msg.contains("Too many parameters")));
    }

    /// A callback error is handed back unchanged and flags the connection as
    /// desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn streaming_callback_error_marks_query_connection_desynced() {
        let mut conn = test_conn();
        let callback_error = PgError::Query("consumer stopped".to_string());

        let err = return_callback_error_with_desync::<()>(&mut conn, callback_error)
            .expect_err("callback error should be returned");

        assert!(matches!(err, PgError::Query(msg) if msg == "consumer stopped"));
        assert!(conn.is_io_desynced());
    }

    /// A `D` frame queued as the first backend message must fail the
    /// prepared-query flow check and flag the connection as desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn protocol_order_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        let zero_columns = 0i16.to_be_bytes();
        push_backend_frame(&mut conn, b'D', &zero_columns);
        let stmt = super::super::PreparedStatement::from_sql("SELECT 1");

        let outcome = conn.query_prepared_single_count(&stmt, &[]).await;
        let err = outcome.expect_err("out-of-order DataRow must fail");

        assert!(err.to_string().contains("DataRow before BindComplete"));
        assert!(conn.is_io_desynced());
    }

    /// A `Z` frame queued as the first backend message must fail the
    /// simple-query flow check and flag the connection as desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn simple_flow_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        push_backend_frame(&mut conn, b'Z', b"I");

        let outcome = conn.execute_simple("SELECT 1").await;
        let err = outcome.expect_err("ReadyForQuery before completion must fail");

        assert!(err.to_string().contains("ReadyForQuery before completion"));
        assert!(conn.is_io_desynced());
    }
}