Skip to main content

qail_pg/driver/
query.rs

1//! Query execution methods for PostgreSQL connection.
2//!
3//! This module provides query, query_cached, and execute_simple.
4
5use super::{
6    PgConnection, PgError, PgResult,
7    extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8    is_ignorable_session_message, is_ignorable_session_msg_type, unexpected_backend_message,
9    unexpected_backend_msg_type,
10};
11use crate::protocol::{BackendMessage, PgEncoder};
12use bytes::{Bytes, BytesMut};
13use std::time::{Duration, Instant};
14
15#[inline]
16fn capture_query_server_error(conn: &mut PgConnection, slot: &mut Option<PgError>, err: PgError) {
17    if slot.is_some() {
18        return;
19    }
20    if err.is_prepared_statement_retryable() {
21        conn.clear_prepared_statement_state();
22    }
23    *slot = Some(err);
24}
25
26#[inline]
27fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
28    if matches!(
29        err,
30        PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
31    ) {
32        conn.mark_io_desynced();
33    }
34    Err(err)
35}
36
37#[inline]
38fn return_callback_error_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
39    conn.mark_io_desynced();
40    Err(err)
41}
42
43#[inline]
44fn prepared_bind_execute_sync_wire_len(
45    statement: &str,
46    params: &[Option<Vec<u8>>],
47    result_format: i16,
48) -> PgResult<usize> {
49    let needed = PgEncoder::bind_execute_sync_wire_len_with_formats(
50        statement,
51        params,
52        PgEncoder::FORMAT_TEXT,
53        result_format,
54    )
55    .map_err(|e| PgError::Encode(e.to_string()))?;
56    Ok(needed)
57}
58
59#[inline]
60fn reserve_prepared_single_write_buf(
61    conn: &mut PgConnection,
62    stmt: &super::PreparedStatement,
63    params: &[Option<Vec<u8>>],
64    result_format: i16,
65) -> PgResult<()> {
66    conn.write_buf.clear();
67    let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
68    conn.write_buf.reserve(needed);
69    Ok(())
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73enum SimpleStatementState {
74    AwaitingResult,
75    InRowStream,
76}
77
78#[derive(Debug, Clone, Copy)]
79struct SimpleFlowTracker {
80    state: SimpleStatementState,
81    saw_completion: bool,
82}
83
84impl SimpleFlowTracker {
85    fn new() -> Self {
86        Self {
87            state: SimpleStatementState::AwaitingResult,
88            saw_completion: false,
89        }
90    }
91
92    fn on_row_description(&mut self, context: &'static str) -> PgResult<()> {
93        if self.state == SimpleStatementState::InRowStream {
94            return Err(PgError::Protocol(format!(
95                "{}: duplicate RowDescription before statement completion",
96                context
97            )));
98        }
99        self.state = SimpleStatementState::InRowStream;
100        self.saw_completion = false;
101        Ok(())
102    }
103
104    fn on_data_row(&self, context: &'static str) -> PgResult<()> {
105        if self.state != SimpleStatementState::InRowStream {
106            return Err(PgError::Protocol(format!(
107                "{}: DataRow before RowDescription",
108                context
109            )));
110        }
111        Ok(())
112    }
113
114    fn on_command_complete(&mut self) {
115        self.state = SimpleStatementState::AwaitingResult;
116        self.saw_completion = true;
117    }
118
119    fn on_empty_query_response(&mut self, context: &'static str) -> PgResult<()> {
120        if self.state == SimpleStatementState::InRowStream {
121            return Err(PgError::Protocol(format!(
122                "{}: EmptyQueryResponse during active row stream",
123                context
124            )));
125        }
126        self.saw_completion = true;
127        Ok(())
128    }
129
130    fn on_ready_for_query(&self, context: &'static str, error_pending: bool) -> PgResult<()> {
131        if error_pending {
132            return Ok(());
133        }
134        if self.state == SimpleStatementState::InRowStream {
135            return Err(PgError::Protocol(format!(
136                "{}: ReadyForQuery before CommandComplete",
137                context
138            )));
139        }
140        if !self.saw_completion {
141            return Err(PgError::Protocol(format!(
142                "{}: ReadyForQuery before completion",
143                context
144            )));
145        }
146        Ok(())
147    }
148}
149
150impl PgConnection {
151    fn validate_param_type_arity(params: &[Option<Vec<u8>>], param_types: &[u32]) -> PgResult<()> {
152        if !param_types.is_empty() && param_types.len() != params.len() {
153            return Err(PgError::Encode(format!(
154                "parameter type count {} does not match parameter count {}",
155                param_types.len(),
156                params.len()
157            )));
158        }
159        Ok(())
160    }
161
162    /// Execute a query with binary parameters (crate-internal).
163    /// This uses the Extended Query Protocol (Parse/Bind/Execute/Sync):
164    /// - Parameters are sent as binary bytes, skipping the string layer
165    /// - No SQL injection possible - parameters are never interpolated
166    /// - Better performance via prepared statement reuse
167    pub(crate) async fn query(
168        &mut self,
169        sql: &str,
170        params: &[Option<Vec<u8>>],
171    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
172        self.query_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
173            .await
174    }
175
176    /// Execute a query with binary parameters and explicit result-column format.
177    pub(crate) async fn query_with_result_format(
178        &mut self,
179        sql: &str,
180        params: &[Option<Vec<u8>>],
181        result_format: i16,
182    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
183        let bytes = PgEncoder::encode_extended_query_with_result_format(sql, params, result_format)
184            .map_err(|e| PgError::Encode(e.to_string()))?;
185        self.write_all_with_timeout(&bytes, "stream write").await?;
186
187        let mut rows = Vec::new();
188
189        let mut error: Option<PgError> = None;
190        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
191
192        loop {
193            let msg = self.recv().await?;
194            if let Err(err) = flow.validate(&msg, "extended-query execute", error.is_some()) {
195                return return_with_desync(self, err);
196            }
197            match msg {
198                BackendMessage::ParseComplete => {}
199                BackendMessage::BindComplete => {}
200                BackendMessage::RowDescription(_) => {}
201                BackendMessage::DataRow(data) => {
202                    // Only collect rows if no error occurred
203                    if error.is_none() {
204                        rows.push(data);
205                    }
206                }
207                BackendMessage::CommandComplete(_) => {}
208                BackendMessage::NoData => {}
209                BackendMessage::ReadyForQuery(_) => {
210                    if let Some(err) = error {
211                        return Err(err);
212                    }
213                    return Ok(rows);
214                }
215                BackendMessage::ErrorResponse(err) => {
216                    if error.is_none() {
217                        error = Some(PgError::QueryServer(err.into()));
218                    }
219                }
220                msg if is_ignorable_session_message(&msg) => {}
221                other => {
222                    return return_with_desync(
223                        self,
224                        unexpected_backend_message("extended-query execute", &other),
225                    );
226                }
227            }
228        }
229    }
230
231    /// Execute an uncached query and drain completion without materializing rows.
232    pub async fn query_count(&mut self, sql: &str, params: &[Option<Vec<u8>>]) -> PgResult<()> {
233        self.query_count_with_param_types(sql, &[], params).await
234    }
235
236    /// Execute an uncached query with explicit PostgreSQL parameter type OIDs
237    /// and drain completion without materializing rows.
238    pub async fn query_count_with_param_types(
239        &mut self,
240        sql: &str,
241        param_types: &[u32],
242        params: &[Option<Vec<u8>>],
243    ) -> PgResult<()> {
244        Self::validate_param_type_arity(params, param_types)?;
245
246        self.write_buf.clear();
247        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
248            .map_err(|e| PgError::Encode(e.to_string()))?;
249        PgEncoder::encode_bind_to(&mut self.write_buf, "", params)
250            .map_err(|e| PgError::Encode(e.to_string()))?;
251        PgEncoder::encode_execute_to(&mut self.write_buf);
252        PgEncoder::encode_sync_to(&mut self.write_buf);
253
254        self.flush_write_buf().await?;
255
256        let mut error: Option<PgError> = None;
257        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
258
259        loop {
260            match self.recv_msg_type_fast().await {
261                Ok(msg_type) => {
262                    if let Err(err) = flow.validate_msg_type(
263                        msg_type,
264                        "extended-query count execute",
265                        error.is_some(),
266                    ) {
267                        return return_with_desync(self, err);
268                    }
269                    match msg_type {
270                        b'1' | b'2' | b'T' | b'D' | b'C' | b'n' => {}
271                        b'Z' => {
272                            if let Some(err) = error {
273                                return Err(err);
274                            }
275                            return Ok(());
276                        }
277                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
278                        other => {
279                            return return_with_desync(
280                                self,
281                                unexpected_backend_msg_type("extended-query count execute", other),
282                            );
283                        }
284                    }
285                }
286                Err(e) => {
287                    if matches!(&e, PgError::QueryServer(_)) {
288                        capture_query_server_error(self, &mut error, e);
289                        continue;
290                    }
291                    return Err(e);
292                }
293            }
294        }
295    }
296
297    /// Execute a query with bind parameters and return rows with column metadata.
298    ///
299    /// Uses the Extended Query Protocol without prepared statement caching.
300    /// This is intended for raw SQL compatibility paths that still need
301    /// `PgRow` + `ColumnInfo` for name-aware JSON conversion.
302    pub async fn query_rows(
303        &mut self,
304        sql: &str,
305        params: &[Option<Vec<u8>>],
306    ) -> PgResult<Vec<super::PgRow>> {
307        self.query_rows_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
308            .await
309    }
310
311    /// Execute a query with bind parameters and explicit result-column format,
312    /// returning rows with column metadata.
313    pub async fn query_rows_with_result_format(
314        &mut self,
315        sql: &str,
316        params: &[Option<Vec<u8>>],
317        result_format: i16,
318    ) -> PgResult<Vec<super::PgRow>> {
319        self.query_rows_with_param_types_and_result_format(sql, &[], params, result_format)
320            .await
321    }
322
323    /// Execute an uncached query and stream zero-copy rows to `on_row`.
324    ///
325    /// This is the lowest-allocation raw-SQL path for large result sets when
326    /// the caller does not need `PgRow` materialization or column-name metadata.
327    pub async fn query_visit_bytes_rows_with_result_format<F>(
328        &mut self,
329        sql: &str,
330        params: &[Option<Vec<u8>>],
331        result_format: i16,
332        on_row: F,
333    ) -> PgResult<usize>
334    where
335        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
336    {
337        self.query_visit_bytes_rows_with_param_types_and_result_format(
338            sql,
339            &[],
340            params,
341            result_format,
342            on_row,
343        )
344        .await
345    }
346
347    /// Execute an uncached query and stream only the first column of each row.
348    pub async fn query_visit_first_column_bytes_with_result_format<F>(
349        &mut self,
350        sql: &str,
351        params: &[Option<Vec<u8>>],
352        result_format: i16,
353        on_value: F,
354    ) -> PgResult<usize>
355    where
356        F: FnMut(Option<&[u8]>) -> PgResult<()>,
357    {
358        self.query_visit_first_column_bytes_with_param_types_and_result_format(
359            sql,
360            &[],
361            params,
362            result_format,
363            on_value,
364        )
365        .await
366    }
367
368    /// Execute a query with explicit PostgreSQL parameter type OIDs and return
369    /// rows with column metadata.
370    pub async fn query_rows_with_param_types_and_result_format(
371        &mut self,
372        sql: &str,
373        param_types: &[u32],
374        params: &[Option<Vec<u8>>],
375        result_format: i16,
376    ) -> PgResult<Vec<super::PgRow>> {
377        use std::sync::Arc;
378
379        Self::validate_param_type_arity(params, param_types)?;
380
381        self.write_buf.clear();
382        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
383            .map_err(|e| PgError::Encode(e.to_string()))?;
384        PgEncoder::encode_bind_to_with_result_format(
385            &mut self.write_buf,
386            "",
387            params,
388            result_format,
389        )
390        .map_err(|e| PgError::Encode(e.to_string()))?;
391        let describe_msg =
392            PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
393        self.write_buf.extend_from_slice(&describe_msg);
394        PgEncoder::encode_execute_to(&mut self.write_buf);
395        PgEncoder::encode_sync_to(&mut self.write_buf);
396        self.flush_write_buf().await?;
397
398        let mut rows: Vec<super::PgRow> = Vec::new();
399        let mut column_info: Option<Arc<super::ColumnInfo>> = None;
400        let mut error: Option<PgError> = None;
401        let mut flow =
402            ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
403
404        loop {
405            let msg = self.recv().await?;
406            if let Err(err) = flow.validate(&msg, "extended-query rows execute", error.is_some()) {
407                return return_with_desync(self, err);
408            }
409            match msg {
410                BackendMessage::ParseComplete => {}
411                BackendMessage::BindComplete => {}
412                BackendMessage::RowDescription(fields) => {
413                    column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
414                }
415                BackendMessage::DataRow(data) => {
416                    if error.is_none() {
417                        rows.push(super::PgRow {
418                            columns: data,
419                            column_info: column_info.clone(),
420                        });
421                    }
422                }
423                BackendMessage::CommandComplete(_) => {}
424                BackendMessage::NoData => {}
425                BackendMessage::ReadyForQuery(_) => {
426                    if let Some(err) = error {
427                        return Err(err);
428                    }
429                    return Ok(rows);
430                }
431                BackendMessage::ErrorResponse(err) => {
432                    if error.is_none() {
433                        error = Some(PgError::QueryServer(err.into()));
434                    }
435                }
436                msg if is_ignorable_session_message(&msg) => {}
437                other => {
438                    return return_with_desync(
439                        self,
440                        unexpected_backend_message("extended-query rows execute", &other),
441                    );
442                }
443            }
444        }
445    }
446
447    /// Execute an uncached query with explicit PostgreSQL parameter types and
448    /// stream zero-copy rows to `on_row`.
449    ///
450    /// Rows are backed by one shared payload buffer plus column offsets, so the
451    /// callback must not hold references past the current invocation.
452    pub async fn query_visit_bytes_rows_with_param_types_and_result_format<F>(
453        &mut self,
454        sql: &str,
455        param_types: &[u32],
456        params: &[Option<Vec<u8>>],
457        result_format: i16,
458        mut on_row: F,
459    ) -> PgResult<usize>
460    where
461        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
462    {
463        Self::validate_param_type_arity(params, param_types)?;
464
465        self.write_buf.clear();
466        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
467            .map_err(|e| PgError::Encode(e.to_string()))?;
468        PgEncoder::encode_bind_to_with_result_format(
469            &mut self.write_buf,
470            "",
471            params,
472            result_format,
473        )
474        .map_err(|e| PgError::Encode(e.to_string()))?;
475        PgEncoder::encode_execute_to(&mut self.write_buf);
476        PgEncoder::encode_sync_to(&mut self.write_buf);
477
478        self.flush_write_buf().await?;
479
480        let mut row_count = 0usize;
481        let mut row = super::PgBytesRow::default();
482        let mut error: Option<PgError> = None;
483        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
484
485        loop {
486            match self.recv_fill_zerocopy_row_fast(&mut row).await {
487                Ok(msg_type) => {
488                    if let Err(err) = flow.validate_msg_type(
489                        msg_type,
490                        "extended-query visit bytes execute",
491                        error.is_some(),
492                    ) {
493                        return return_with_desync(self, err);
494                    }
495                    match msg_type {
496                        b'1' | b'2' | b'T' | b'n' => {}
497                        b'D' => {
498                            if error.is_none() {
499                                if let Err(err) = on_row(&row) {
500                                    return return_callback_error_with_desync(self, err);
501                                }
502                                row_count += 1;
503                                row.release_payload();
504                            }
505                        }
506                        b'C' => {}
507                        b'Z' => {
508                            if let Some(err) = error {
509                                return Err(err);
510                            }
511                            return Ok(row_count);
512                        }
513                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
514                        other => {
515                            return return_with_desync(
516                                self,
517                                unexpected_backend_msg_type(
518                                    "extended-query visit bytes execute",
519                                    other,
520                                ),
521                            );
522                        }
523                    }
524                }
525                Err(e) => {
526                    if matches!(&e, PgError::QueryServer(_)) {
527                        capture_query_server_error(self, &mut error, e);
528                        continue;
529                    }
530                    return Err(e);
531                }
532            }
533        }
534    }
535
536    /// Execute an uncached query with explicit PostgreSQL parameter types and
537    /// stream only the first column of each row.
538    pub async fn query_visit_first_column_bytes_with_param_types_and_result_format<F>(
539        &mut self,
540        sql: &str,
541        param_types: &[u32],
542        params: &[Option<Vec<u8>>],
543        result_format: i16,
544        mut on_value: F,
545    ) -> PgResult<usize>
546    where
547        F: FnMut(Option<&[u8]>) -> PgResult<()>,
548    {
549        Self::validate_param_type_arity(params, param_types)?;
550
551        self.write_buf.clear();
552        PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
553            .map_err(|e| PgError::Encode(e.to_string()))?;
554        PgEncoder::encode_bind_to_with_result_format(
555            &mut self.write_buf,
556            "",
557            params,
558            result_format,
559        )
560        .map_err(|e| PgError::Encode(e.to_string()))?;
561        PgEncoder::encode_execute_to(&mut self.write_buf);
562        PgEncoder::encode_sync_to(&mut self.write_buf);
563
564        self.flush_write_buf().await?;
565
566        let mut row_count = 0usize;
567        let mut first_column: Option<Bytes> = None;
568        let mut error: Option<PgError> = None;
569        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
570
571        loop {
572            match self
573                .recv_fill_first_column_zerocopy_fast(&mut first_column)
574                .await
575            {
576                Ok(msg_type) => {
577                    if let Err(err) = flow.validate_msg_type(
578                        msg_type,
579                        "extended-query visit first-column execute",
580                        error.is_some(),
581                    ) {
582                        return return_with_desync(self, err);
583                    }
584                    match msg_type {
585                        b'1' | b'2' | b'T' | b'n' => {}
586                        b'D' => {
587                            if error.is_none() {
588                                if let Err(err) = on_value(first_column.as_deref()) {
589                                    return return_callback_error_with_desync(self, err);
590                                }
591                                row_count += 1;
592                                first_column = None;
593                            }
594                        }
595                        b'C' => {}
596                        b'Z' => {
597                            if let Some(err) = error {
598                                return Err(err);
599                            }
600                            return Ok(row_count);
601                        }
602                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
603                        other => {
604                            return return_with_desync(
605                                self,
606                                unexpected_backend_msg_type(
607                                    "extended-query visit first-column execute",
608                                    other,
609                                ),
610                            );
611                        }
612                    }
613                }
614                Err(e) => {
615                    if matches!(&e, PgError::QueryServer(_)) {
616                        capture_query_server_error(self, &mut error, e);
617                        continue;
618                    }
619                    return Err(e);
620                }
621            }
622        }
623    }
624
625    /// Validate a query with explicit PostgreSQL parameter type OIDs without
626    /// executing it. Uses Parse + Bind + Describe(Portal) + Sync.
627    pub async fn probe_query_with_param_types(
628        &mut self,
629        sql: &str,
630        param_types: &[u32],
631        params: &[Option<Vec<u8>>],
632    ) -> PgResult<()> {
633        Self::validate_param_type_arity(params, param_types)?;
634
635        let parse = PgEncoder::try_encode_parse("", sql, param_types)
636            .map_err(|e| PgError::Encode(e.to_string()))?;
637        let bind =
638            PgEncoder::encode_bind("", "", params).map_err(|e| PgError::Encode(e.to_string()))?;
639        let describe =
640            PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
641        let sync = PgEncoder::encode_sync();
642        let mut bytes =
643            BytesMut::with_capacity(parse.len() + bind.len() + describe.len() + sync.len());
644        bytes.extend_from_slice(&parse);
645        bytes.extend_from_slice(&bind);
646        bytes.extend_from_slice(&describe);
647        bytes.extend_from_slice(&sync);
648        self.write_all_with_timeout(&bytes, "stream write").await?;
649
650        let mut saw_describe_response = false;
651        let mut error: Option<PgError> = None;
652        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal());
653
654        loop {
655            let msg = self.recv().await?;
656            if let Err(err) = flow.validate(&msg, "extended-query probe", error.is_some()) {
657                return return_with_desync(self, err);
658            }
659            match msg {
660                BackendMessage::ParseComplete => {}
661                BackendMessage::BindComplete => {}
662                BackendMessage::RowDescription(_) | BackendMessage::NoData => {
663                    saw_describe_response = true;
664                }
665                BackendMessage::ReadyForQuery(_) => {
666                    if let Some(err) = error {
667                        return Err(err);
668                    }
669                    if !saw_describe_response {
670                        return return_with_desync(
671                            self,
672                            PgError::Protocol(
673                                "extended-query probe finished without RowDescription/NoData"
674                                    .to_string(),
675                            ),
676                        );
677                    }
678                    return Ok(());
679                }
680                BackendMessage::ErrorResponse(err) => {
681                    if error.is_none() {
682                        error = Some(PgError::QueryServer(err.into()));
683                    }
684                }
685                msg if is_ignorable_session_message(&msg) => {}
686                other => {
687                    return return_with_desync(
688                        self,
689                        unexpected_backend_message("extended-query probe", &other),
690                    );
691                }
692            }
693        }
694    }
695
696    /// Execute a query with cached prepared statement.
697    /// Like `query()`, but reuses prepared statements across calls.
698    /// The statement name is derived from a hash of the SQL text.
699    /// OPTIMIZED: Pre-allocated buffer + ultra-fast encoders.
700    pub async fn query_cached(
701        &mut self,
702        sql: &str,
703        params: &[Option<Vec<u8>>],
704    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
705        self.query_cached_with_result_format(sql, params, PgEncoder::FORMAT_TEXT)
706            .await
707    }
708
709    /// Execute a query with cached prepared statement and explicit result-column format.
710    pub async fn query_cached_with_result_format(
711        &mut self,
712        sql: &str,
713        params: &[Option<Vec<u8>>],
714        result_format: i16,
715    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
716        let mut retried = false;
717        loop {
718            match self
719                .query_cached_with_result_format_once(sql, params, result_format)
720                .await
721            {
722                Ok(rows) => return Ok(rows),
723                Err(err)
724                    if !retried
725                        && (err.is_prepared_statement_retryable()
726                            || err.is_prepared_statement_already_exists()) =>
727                {
728                    retried = true;
729                    if err.is_prepared_statement_retryable() {
730                        self.clear_prepared_statement_state();
731                    }
732                }
733                Err(err) => return Err(err),
734            }
735        }
736    }
737
738    async fn query_cached_with_result_format_once(
739        &mut self,
740        sql: &str,
741        params: &[Option<Vec<u8>>],
742        result_format: i16,
743    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
744        let stmt_name = Self::sql_to_stmt_name(sql);
745        let is_new = !self.prepared_statements.contains_key(&stmt_name);
746
747        let needed = prepared_bind_execute_sync_wire_len(&stmt_name, params, result_format)?;
748        let mut buf = BytesMut::with_capacity(needed);
749
750        if is_new {
751            // Evict LRU prepared statement if at capacity. This prevents
752            // unbounded memory growth from dynamic batch filters while
753            // preserving hot statements (unlike the old nuclear `.clear()`).
754            self.evict_prepared_if_full();
755            if let Err(e) = PgEncoder::try_encode_parse_to(&mut buf, &stmt_name, sql, &[]) {
756                return Err(PgError::Encode(e.to_string()));
757            }
758            // Cache the SQL for debugging
759            self.prepared_statements
760                .insert(stmt_name.clone(), sql.to_string());
761        }
762
763        // Use ULTRA-OPTIMIZED encoders - write directly to buffer
764        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
765            &mut buf,
766            &stmt_name,
767            params,
768            result_format,
769        ) {
770            if is_new {
771                self.prepared_statements.remove(&stmt_name);
772            }
773            return Err(PgError::Encode(e.to_string()));
774        }
775        PgEncoder::encode_execute_to(&mut buf);
776        PgEncoder::encode_sync_to(&mut buf);
777
778        if let Err(err) = self.write_all_with_timeout(&buf, "stream write").await {
779            if is_new {
780                self.prepared_statements.remove(&stmt_name);
781            }
782            return Err(err);
783        }
784
785        let mut rows = Vec::new();
786
787        let mut error: Option<PgError> = None;
788        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(is_new));
789
790        loop {
791            let msg = match self.recv().await {
792                Ok(msg) => msg,
793                Err(err) => {
794                    if is_new && !flow.saw_parse_complete() {
795                        self.prepared_statements.remove(&stmt_name);
796                    }
797                    return Err(err);
798                }
799            };
800            if let Err(err) = flow.validate(&msg, "extended-query cached execute", error.is_some())
801            {
802                if is_new && !flow.saw_parse_complete() {
803                    self.prepared_statements.remove(&stmt_name);
804                }
805                return return_with_desync(self, err);
806            }
807            match msg {
808                BackendMessage::ParseComplete => {
809                    // Already cached in is_new block above.
810                }
811                BackendMessage::BindComplete => {}
812                BackendMessage::RowDescription(_) => {}
813                BackendMessage::DataRow(data) => {
814                    if error.is_none() {
815                        rows.push(data);
816                    }
817                }
818                BackendMessage::CommandComplete(_) => {}
819                BackendMessage::NoData => {}
820                BackendMessage::ReadyForQuery(_) => {
821                    if let Some(err) = error {
822                        if is_new
823                            && !flow.saw_parse_complete()
824                            && !err.is_prepared_statement_already_exists()
825                        {
826                            self.prepared_statements.remove(&stmt_name);
827                        }
828                        return Err(err);
829                    }
830                    if is_new && !flow.saw_parse_complete() {
831                        self.prepared_statements.remove(&stmt_name);
832                        return return_with_desync(
833                            self,
834                            PgError::Protocol(
835                                "Cache miss query reached ReadyForQuery without ParseComplete"
836                                    .to_string(),
837                            ),
838                        );
839                    }
840                    return Ok(rows);
841                }
842                BackendMessage::ErrorResponse(err) => {
843                    if error.is_none() {
844                        let query_err = PgError::QueryServer(err.into());
845                        if query_err.is_prepared_statement_retryable()
846                            || (is_new
847                                && !flow.saw_parse_complete()
848                                && !query_err.is_prepared_statement_already_exists())
849                        {
850                            // Invalidate cache only when the server-side prepared
851                            // statement is known absent or the Parse step failed.
852                            // Execution-stage errors after ParseComplete leave the
853                            // statement usable on the backend.
854                            self.prepared_statements.remove(&stmt_name);
855                        }
856                        error = Some(query_err);
857                    }
858                }
859                msg if is_ignorable_session_message(&msg) => {}
860                other => {
861                    if is_new && !flow.saw_parse_complete() {
862                        self.prepared_statements.remove(&stmt_name);
863                    }
864                    return return_with_desync(
865                        self,
866                        unexpected_backend_message("extended-query cached execute", &other),
867                    );
868                }
869            }
870        }
871    }
872
873    /// Generate a statement name from SQL hash.
874    /// Uses a simple hash to create a unique name like "stmt_12345abc".
875    pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
876        super::prepared::sql_bytes_to_stmt_name(sql.as_bytes())
877    }
878
879    /// Execute a simple SQL statement (no parameters).
880    pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
881        let bytes = PgEncoder::try_encode_query_string(sql)?;
882        self.write_all_with_timeout(&bytes, "stream write").await?;
883
884        let mut error: Option<PgError> = None;
885        let mut flow = SimpleFlowTracker::new();
886
887        loop {
888            let msg = self.recv().await?;
889            match msg {
890                BackendMessage::RowDescription(_) => {
891                    // Some callers use execute_simple() with session-shaping SQL that
892                    // can legally return rows (e.g., SELECT set_config(...)).
893                    // Drain and ignore row data while preserving protocol ordering checks.
894                    if let Err(err) = flow.on_row_description("simple-query execute") {
895                        return return_with_desync(self, err);
896                    }
897                }
898                BackendMessage::DataRow(_) => {
899                    if let Err(err) = flow.on_data_row("simple-query execute") {
900                        return return_with_desync(self, err);
901                    }
902                }
903                BackendMessage::CommandComplete(_) => {
904                    flow.on_command_complete();
905                }
906                BackendMessage::EmptyQueryResponse => {
907                    if let Err(err) = flow.on_empty_query_response("simple-query execute") {
908                        return return_with_desync(self, err);
909                    }
910                }
911                BackendMessage::ReadyForQuery(_) => {
912                    if let Some(err) = error {
913                        return Err(err);
914                    }
915                    if let Err(err) =
916                        flow.on_ready_for_query("simple-query execute", error.is_some())
917                    {
918                        return return_with_desync(self, err);
919                    }
920                    return Ok(());
921                }
922                BackendMessage::ErrorResponse(err) => {
923                    if error.is_none() {
924                        error = Some(PgError::QueryServer(err.into()));
925                    }
926                }
927                msg if is_ignorable_session_message(&msg) => {}
928                other => {
929                    return return_with_desync(
930                        self,
931                        unexpected_backend_message("simple-query execute", &other),
932                    );
933                }
934            }
935        }
936    }
937
938    /// Execute a simple SQL query and return rows (Simple Query Protocol).
939    ///
940    /// Unlike `execute_simple`, this collects and returns data rows.
941    /// Used for branch management and other administrative queries.
942    ///
943    /// SECURITY: Capped at 10,000 rows to prevent OOM from unbounded results.
944    pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
945        use std::sync::Arc;
946
947        /// Safety cap to prevent OOM from unbounded result accumulation.
948        /// Simple Query Protocol has no streaming; all rows are buffered in memory.
949        const MAX_SIMPLE_QUERY_ROWS: usize = 10_000;
950
951        let bytes = PgEncoder::try_encode_query_string(sql)?;
952        self.write_all_with_timeout(&bytes, "stream write").await?;
953
954        let mut rows: Vec<super::PgRow> = Vec::new();
955        let mut column_info: Option<Arc<super::ColumnInfo>> = None;
956        let mut error: Option<PgError> = None;
957        let mut flow = SimpleFlowTracker::new();
958
959        loop {
960            let msg = self.recv().await?;
961            match msg {
962                BackendMessage::RowDescription(fields) => {
963                    if let Err(err) = flow.on_row_description("simple-query read") {
964                        return return_with_desync(self, err);
965                    }
966                    column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
967                }
968                BackendMessage::DataRow(data) => {
969                    if let Err(err) = flow.on_data_row("simple-query read") {
970                        return return_with_desync(self, err);
971                    }
972                    if error.is_none() {
973                        if rows.len() >= MAX_SIMPLE_QUERY_ROWS {
974                            if error.is_none() {
975                                error = Some(PgError::Query(format!(
976                                    "simple_query exceeded {} row safety cap",
977                                    MAX_SIMPLE_QUERY_ROWS,
978                                )));
979                            }
980                            // Continue draining to reach ReadyForQuery
981                        } else {
982                            rows.push(super::PgRow {
983                                columns: data,
984                                column_info: column_info.clone(),
985                            });
986                        }
987                    }
988                }
989                BackendMessage::CommandComplete(_) => {
990                    flow.on_command_complete();
991                    column_info = None;
992                }
993                BackendMessage::EmptyQueryResponse => {
994                    if let Err(err) = flow.on_empty_query_response("simple-query read") {
995                        return return_with_desync(self, err);
996                    }
997                    column_info = None;
998                }
999                BackendMessage::ReadyForQuery(_) => {
1000                    if let Some(err) = error {
1001                        return Err(err);
1002                    }
1003                    if let Err(err) = flow.on_ready_for_query("simple-query read", error.is_some())
1004                    {
1005                        return return_with_desync(self, err);
1006                    }
1007                    return Ok(rows);
1008                }
1009                BackendMessage::ErrorResponse(err) => {
1010                    if error.is_none() {
1011                        error = Some(PgError::QueryServer(err.into()));
1012                    }
1013                }
1014                msg if is_ignorable_session_message(&msg) => {}
1015                other => {
1016                    return return_with_desync(
1017                        self,
1018                        unexpected_backend_message("simple-query read", &other),
1019                    );
1020                }
1021            }
1022        }
1023    }
1024
1025    /// ZERO-HASH sequential query using pre-computed PreparedStatement.
1026    /// This is the FASTEST sequential path because it skips:
1027    /// - SQL generation from AST (done once outside loop)
1028    /// - Hash computation for statement name (pre-computed in PreparedStatement)
1029    /// - HashMap lookup for is_new check (statement already prepared)
1030    /// # Example
1031    /// ```ignore
1032    /// let stmt = conn.prepare("SELECT * FROM users WHERE id = $1").await?;
1033    /// for id in 1..10000 {
1034    ///     let rows = conn.query_prepared_single(&stmt, &[Some(id.to_string().into_bytes())]).await?;
1035    /// }
1036    /// ```
1037    #[inline]
1038    pub async fn query_prepared_single(
1039        &mut self,
1040        stmt: &super::PreparedStatement,
1041        params: &[Option<Vec<u8>>],
1042    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1043        self.query_prepared_single_with_result_format(stmt, params, PgEncoder::FORMAT_TEXT)
1044            .await
1045    }
1046
1047    /// ZERO-HASH sequential prepared execution that drains rows without
1048    /// materializing them.
1049    ///
1050    /// Useful for throughput-focused paths where only protocol completion
1051    /// matters and result payload is intentionally ignored.
1052    #[inline]
1053    pub async fn query_prepared_single_count(
1054        &mut self,
1055        stmt: &super::PreparedStatement,
1056        params: &[Option<Vec<u8>>],
1057    ) -> PgResult<()> {
1058        self.write_buf.clear();
1059        PgEncoder::encode_bind_to(&mut self.write_buf, &stmt.name, params)
1060            .map_err(|e| PgError::Encode(e.to_string()))?;
1061        PgEncoder::encode_execute_to(&mut self.write_buf);
1062        PgEncoder::encode_sync_to(&mut self.write_buf);
1063
1064        self.flush_write_buf().await?;
1065
1066        let mut error: Option<PgError> = None;
1067        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1068
1069        loop {
1070            match self.recv_msg_type_fast().await {
1071                Ok(msg_type) => {
1072                    if let Err(err) = flow.validate_msg_type(
1073                        msg_type,
1074                        "prepared single count execute",
1075                        error.is_some(),
1076                    ) {
1077                        return return_with_desync(self, err);
1078                    }
1079                    match msg_type {
1080                        b'2' | b'T' | b'D' | b'C' | b'n' => {}
1081                        b'Z' => {
1082                            if let Some(err) = error {
1083                                return Err(err);
1084                            }
1085                            return Ok(());
1086                        }
1087                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1088                        other => {
1089                            return return_with_desync(
1090                                self,
1091                                unexpected_backend_msg_type("prepared single count execute", other),
1092                            );
1093                        }
1094                    }
1095                }
1096                Err(e) => {
1097                    if matches!(&e, PgError::QueryServer(_)) {
1098                        capture_query_server_error(self, &mut error, e);
1099                        continue;
1100                    }
1101                    return Err(e);
1102                }
1103            }
1104        }
1105    }
1106
1107    /// ZERO-HASH sequential query with explicit result-column format.
1108    #[inline]
1109    pub async fn query_prepared_single_with_result_format(
1110        &mut self,
1111        stmt: &super::PreparedStatement,
1112        params: &[Option<Vec<u8>>],
1113        result_format: i16,
1114    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1115        let needed = prepared_bind_execute_sync_wire_len(&stmt.name, params, result_format)?;
1116        let mut buf = BytesMut::with_capacity(needed);
1117
1118        // ZERO HASH, ZERO LOOKUP - just encode and send!
1119        PgEncoder::encode_bind_to_with_result_format(&mut buf, &stmt.name, params, result_format)
1120            .map_err(|e| PgError::Encode(e.to_string()))?;
1121        PgEncoder::encode_execute_to(&mut buf);
1122        PgEncoder::encode_sync_to(&mut buf);
1123
1124        self.write_all_with_timeout(&buf, "stream write").await?;
1125
1126        let mut rows = Vec::new();
1127
1128        let mut error: Option<PgError> = None;
1129        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1130
1131        loop {
1132            let msg = self.recv().await?;
1133            if let Err(err) = flow.validate(&msg, "prepared single execute", error.is_some()) {
1134                return return_with_desync(self, err);
1135            }
1136            match msg {
1137                BackendMessage::BindComplete => {}
1138                BackendMessage::RowDescription(_) => {}
1139                BackendMessage::DataRow(data) => {
1140                    if error.is_none() {
1141                        rows.push(data);
1142                    }
1143                }
1144                BackendMessage::CommandComplete(_) => {}
1145                BackendMessage::NoData => {}
1146                BackendMessage::ReadyForQuery(_) => {
1147                    if let Some(err) = error {
1148                        return Err(err);
1149                    }
1150                    return Ok(rows);
1151                }
1152                BackendMessage::ErrorResponse(err) => {
1153                    capture_query_server_error(self, &mut error, PgError::QueryServer(err.into()));
1154                }
1155                msg if is_ignorable_session_message(&msg) => {}
1156                other => {
1157                    return return_with_desync(
1158                        self,
1159                        unexpected_backend_message("prepared single execute", &other),
1160                    );
1161                }
1162            }
1163        }
1164    }
1165
1166    /// ZERO-HASH sequential query with explicit result-column format using
1167    /// reusable connection buffers (avoids per-call `BytesMut` allocation).
1168    #[inline]
1169    pub async fn query_prepared_single_reuse_with_result_format(
1170        &mut self,
1171        stmt: &super::PreparedStatement,
1172        params: &[Option<Vec<u8>>],
1173        result_format: i16,
1174    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
1175        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1176
1177        PgEncoder::encode_bind_to_with_result_format(
1178            &mut self.write_buf,
1179            &stmt.name,
1180            params,
1181            result_format,
1182        )
1183        .map_err(|e| PgError::Encode(e.to_string()))?;
1184        PgEncoder::encode_execute_to(&mut self.write_buf);
1185        PgEncoder::encode_sync_to(&mut self.write_buf);
1186
1187        self.flush_write_buf().await?;
1188
1189        let mut rows = Vec::with_capacity(32);
1190        let mut error: Option<PgError> = None;
1191        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1192
1193        loop {
1194            let msg = self.recv().await?;
1195            if let Err(err) = flow.validate(&msg, "prepared single reuse execute", error.is_some())
1196            {
1197                return return_with_desync(self, err);
1198            }
1199            match msg {
1200                BackendMessage::BindComplete => {}
1201                BackendMessage::RowDescription(_) => {}
1202                BackendMessage::DataRow(data) => {
1203                    if error.is_none() {
1204                        rows.push(data);
1205                    }
1206                }
1207                BackendMessage::CommandComplete(_) => {}
1208                BackendMessage::NoData => {}
1209                BackendMessage::ReadyForQuery(_) => {
1210                    if let Some(err) = error {
1211                        return Err(err);
1212                    }
1213                    return Ok(rows);
1214                }
1215                BackendMessage::ErrorResponse(err) => {
1216                    capture_query_server_error(self, &mut error, PgError::QueryServer(err.into()));
1217                }
1218                msg if is_ignorable_session_message(&msg) => {}
1219                other => {
1220                    return return_with_desync(
1221                        self,
1222                        unexpected_backend_message("prepared single reuse execute", &other),
1223                    );
1224                }
1225            }
1226        }
1227    }
1228
1229    /// Sequential prepared query using reusable connection buffers and row visitor.
1230    ///
1231    /// Rows are streamed to `on_row` as owned column buffers, avoiding
1232    /// materializing the full result set.
1233    #[inline]
1234    pub async fn query_prepared_single_reuse_visit_rows_with_result_format<F>(
1235        &mut self,
1236        stmt: &super::PreparedStatement,
1237        params: &[Option<Vec<u8>>],
1238        result_format: i16,
1239        mut on_row: F,
1240    ) -> PgResult<usize>
1241    where
1242        F: FnMut(&[Option<Vec<u8>>]) -> PgResult<()>,
1243    {
1244        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1245
1246        PgEncoder::encode_bind_to_with_result_format(
1247            &mut self.write_buf,
1248            &stmt.name,
1249            params,
1250            result_format,
1251        )
1252        .map_err(|e| PgError::Encode(e.to_string()))?;
1253        PgEncoder::encode_execute_to(&mut self.write_buf);
1254        PgEncoder::encode_sync_to(&mut self.write_buf);
1255
1256        self.flush_write_buf().await?;
1257
1258        let mut row_count = 0usize;
1259        let mut row_buf: Vec<Option<Vec<u8>>> = Vec::new();
1260        let mut error: Option<PgError> = None;
1261        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1262
1263        loop {
1264            match self.recv_fill_data_row_fast(&mut row_buf).await {
1265                Ok(msg_type) => {
1266                    if let Err(err) = flow.validate_msg_type(
1267                        msg_type,
1268                        "prepared single reuse visit execute",
1269                        error.is_some(),
1270                    ) {
1271                        return return_with_desync(self, err);
1272                    }
1273                    match msg_type {
1274                        b'2' | b'T' | b'n' => {}
1275                        b'D' => {
1276                            if error.is_none() {
1277                                if let Err(err) = on_row(row_buf.as_slice()) {
1278                                    return return_callback_error_with_desync(self, err);
1279                                }
1280                                row_count += 1;
1281                            }
1282                        }
1283                        b'C' => {}
1284                        b'Z' => {
1285                            if let Some(err) = error {
1286                                return Err(err);
1287                            }
1288                            return Ok(row_count);
1289                        }
1290                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1291                        other => {
1292                            return return_with_desync(
1293                                self,
1294                                unexpected_backend_msg_type(
1295                                    "prepared single reuse visit execute",
1296                                    other,
1297                                ),
1298                            );
1299                        }
1300                    }
1301                }
1302                Err(e) => {
1303                    if matches!(&e, PgError::QueryServer(_)) {
1304                        capture_query_server_error(self, &mut error, e);
1305                        continue;
1306                    }
1307                    return Err(e);
1308                }
1309            }
1310        }
1311    }
1312
1313    /// Sequential prepared query using reusable connection buffers and zero-copy row visitor.
1314    ///
1315    /// Rows are backed by a shared payload buffer plus column offsets, avoiding
1316    /// per-cell byte copies during receive.
1317    #[inline]
1318    pub async fn query_prepared_single_reuse_visit_bytes_rows_with_result_format<F>(
1319        &mut self,
1320        stmt: &super::PreparedStatement,
1321        params: &[Option<Vec<u8>>],
1322        result_format: i16,
1323        mut on_row: F,
1324    ) -> PgResult<usize>
1325    where
1326        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1327    {
1328        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1329
1330        PgEncoder::encode_bind_to_with_result_format(
1331            &mut self.write_buf,
1332            &stmt.name,
1333            params,
1334            result_format,
1335        )
1336        .map_err(|e| PgError::Encode(e.to_string()))?;
1337        PgEncoder::encode_execute_to(&mut self.write_buf);
1338        PgEncoder::encode_sync_to(&mut self.write_buf);
1339
1340        self.flush_write_buf().await?;
1341
1342        let mut row_count = 0usize;
1343        let mut row = super::PgBytesRow::default();
1344        let mut error: Option<PgError> = None;
1345        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1346
1347        loop {
1348            match self.recv_fill_zerocopy_row_fast(&mut row).await {
1349                Ok(msg_type) => {
1350                    if let Err(err) = flow.validate_msg_type(
1351                        msg_type,
1352                        "prepared single reuse visit bytes execute",
1353                        error.is_some(),
1354                    ) {
1355                        return return_with_desync(self, err);
1356                    }
1357                    match msg_type {
1358                        b'2' | b'T' | b'n' => {}
1359                        b'D' => {
1360                            if error.is_none() {
1361                                if let Err(err) = on_row(&row) {
1362                                    return return_callback_error_with_desync(self, err);
1363                                }
1364                                row_count += 1;
1365                                row.release_payload();
1366                            }
1367                        }
1368                        b'C' => {}
1369                        b'Z' => {
1370                            if let Some(err) = error {
1371                                return Err(err);
1372                            }
1373                            return Ok(row_count);
1374                        }
1375                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1376                        other => {
1377                            return return_with_desync(
1378                                self,
1379                                unexpected_backend_msg_type(
1380                                    "prepared single reuse visit bytes execute",
1381                                    other,
1382                                ),
1383                            );
1384                        }
1385                    }
1386                }
1387                Err(e) => {
1388                    if matches!(&e, PgError::QueryServer(_)) {
1389                        capture_query_server_error(self, &mut error, e);
1390                        continue;
1391                    }
1392                    return Err(e);
1393                }
1394            }
1395        }
1396    }
1397
1398    /// Sequential prepared query using reusable buffers and first-column visitor.
1399    #[inline]
1400    pub async fn query_prepared_single_reuse_visit_first_column_bytes_with_result_format<F>(
1401        &mut self,
1402        stmt: &super::PreparedStatement,
1403        params: &[Option<Vec<u8>>],
1404        result_format: i16,
1405        mut on_value: F,
1406    ) -> PgResult<usize>
1407    where
1408        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1409    {
1410        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1411
1412        PgEncoder::encode_bind_to_with_result_format(
1413            &mut self.write_buf,
1414            &stmt.name,
1415            params,
1416            result_format,
1417        )
1418        .map_err(|e| PgError::Encode(e.to_string()))?;
1419        PgEncoder::encode_execute_to(&mut self.write_buf);
1420        PgEncoder::encode_sync_to(&mut self.write_buf);
1421
1422        self.flush_write_buf().await?;
1423
1424        let mut row_count = 0usize;
1425        let mut first_column: Option<Bytes> = None;
1426        let mut error: Option<PgError> = None;
1427        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1428
1429        loop {
1430            match self
1431                .recv_fill_first_column_zerocopy_fast(&mut first_column)
1432                .await
1433            {
1434                Ok(msg_type) => {
1435                    if let Err(err) = flow.validate_msg_type(
1436                        msg_type,
1437                        "prepared single reuse visit first-column execute",
1438                        error.is_some(),
1439                    ) {
1440                        return return_with_desync(self, err);
1441                    }
1442                    match msg_type {
1443                        b'2' | b'T' | b'n' => {}
1444                        b'D' => {
1445                            if error.is_none() {
1446                                if let Err(err) = on_value(first_column.as_deref()) {
1447                                    return return_callback_error_with_desync(self, err);
1448                                }
1449                                row_count += 1;
1450                                first_column = None;
1451                            }
1452                        }
1453                        b'C' => {}
1454                        b'Z' => {
1455                            if let Some(err) = error {
1456                                return Err(err);
1457                            }
1458                            return Ok(row_count);
1459                        }
1460                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1461                        other => {
1462                            return return_with_desync(
1463                                self,
1464                                unexpected_backend_msg_type(
1465                                    "prepared single reuse visit first-column execute",
1466                                    other,
1467                                ),
1468                            );
1469                        }
1470                    }
1471                }
1472                Err(e) => {
1473                    if matches!(&e, PgError::QueryServer(_)) {
1474                        capture_query_server_error(self, &mut error, e);
1475                        continue;
1476                    }
1477                    return Err(e);
1478                }
1479            }
1480        }
1481    }
1482
1483    /// Sequential prepared query using reusable buffers and fixed 4-column visitor.
1484    #[inline]
1485    pub async fn query_prepared_single_reuse_visit_first_four_columns_bytes_with_result_format<F>(
1486        &mut self,
1487        stmt: &super::PreparedStatement,
1488        params: &[Option<Vec<u8>>],
1489        result_format: i16,
1490        mut on_row: F,
1491    ) -> PgResult<usize>
1492    where
1493        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1494    {
1495        reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
1496
1497        PgEncoder::encode_bind_to_with_result_format(
1498            &mut self.write_buf,
1499            &stmt.name,
1500            params,
1501            result_format,
1502        )
1503        .map_err(|e| PgError::Encode(e.to_string()))?;
1504        PgEncoder::encode_execute_to(&mut self.write_buf);
1505        PgEncoder::encode_sync_to(&mut self.write_buf);
1506
1507        self.flush_write_buf().await?;
1508
1509        let mut row_count = 0usize;
1510        let mut columns = [None, None, None, None];
1511        let mut error: Option<PgError> = None;
1512        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1513
1514        loop {
1515            match self
1516                .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1517                .await
1518            {
1519                Ok(msg_type) => {
1520                    if let Err(err) = flow.validate_msg_type(
1521                        msg_type,
1522                        "prepared single reuse visit first-four execute",
1523                        error.is_some(),
1524                    ) {
1525                        return return_with_desync(self, err);
1526                    }
1527                    match msg_type {
1528                        b'2' | b'T' | b'n' => {}
1529                        b'D' => {
1530                            if error.is_none() {
1531                                if let Err(err) = on_row([
1532                                    columns[0].as_deref(),
1533                                    columns[1].as_deref(),
1534                                    columns[2].as_deref(),
1535                                    columns[3].as_deref(),
1536                                ]) {
1537                                    return return_callback_error_with_desync(self, err);
1538                                }
1539                                columns.fill(None);
1540                                row_count += 1;
1541                            }
1542                        }
1543                        b'C' => {}
1544                        b'Z' => {
1545                            if let Some(err) = error {
1546                                return Err(err);
1547                            }
1548                            return Ok(row_count);
1549                        }
1550                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1551                        other => {
1552                            return return_with_desync(
1553                                self,
1554                                unexpected_backend_msg_type(
1555                                    "prepared single reuse visit first-four execute",
1556                                    other,
1557                                ),
1558                            );
1559                        }
1560                    }
1561                }
1562                Err(e) => {
1563                    if matches!(&e, PgError::QueryServer(_)) {
1564                        capture_query_server_error(self, &mut error, e);
1565                        continue;
1566                    }
1567                    return Err(e);
1568                }
1569            }
1570        }
1571    }
1572
1573    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1574    ///
1575    /// `wire` must contain exactly one prepared-statement execution.
1576    #[inline]
1577    pub async fn query_prepared_single_encoded_visit_bytes_rows<F>(
1578        &mut self,
1579        wire: &[u8],
1580        on_row: F,
1581    ) -> PgResult<usize>
1582    where
1583        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1584    {
1585        let (row_count, _, _) = self
1586            .query_prepared_single_encoded_visit_bytes_rows_profiled(wire, on_row)
1587            .await?;
1588        Ok(row_count)
1589    }
1590
1591    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1592    ///
1593    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1594    #[inline]
1595    pub async fn query_prepared_single_encoded_visit_bytes_rows_profiled<F>(
1596        &mut self,
1597        wire: &[u8],
1598        mut on_row: F,
1599    ) -> PgResult<(usize, Duration, Duration)>
1600    where
1601        F: FnMut(&super::PgBytesRow) -> PgResult<()>,
1602    {
1603        let send_start = Instant::now();
1604        self.send_bytes(wire).await?;
1605        let send_elapsed = send_start.elapsed();
1606        let consume_start = Instant::now();
1607
1608        let mut row_count = 0usize;
1609        let mut row = super::PgBytesRow::default();
1610        let mut error: Option<PgError> = None;
1611        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1612
1613        loop {
1614            match self.recv_fill_zerocopy_row_fast(&mut row).await {
1615                Ok(msg_type) => {
1616                    if let Err(err) = flow.validate_msg_type(
1617                        msg_type,
1618                        "prepared single encoded visit bytes execute",
1619                        error.is_some(),
1620                    ) {
1621                        return return_with_desync(self, err);
1622                    }
1623                    match msg_type {
1624                        b'2' | b'T' | b'n' => {}
1625                        b'D' => {
1626                            if error.is_none() {
1627                                if let Err(err) = on_row(&row) {
1628                                    return return_callback_error_with_desync(self, err);
1629                                }
1630                                row_count += 1;
1631                                row.release_payload();
1632                            }
1633                        }
1634                        b'C' => {}
1635                        b'Z' => {
1636                            if let Some(err) = error {
1637                                return Err(err);
1638                            }
1639                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1640                        }
1641                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1642                        other => {
1643                            return return_with_desync(
1644                                self,
1645                                unexpected_backend_msg_type(
1646                                    "prepared single encoded visit bytes execute",
1647                                    other,
1648                                ),
1649                            );
1650                        }
1651                    }
1652                }
1653                Err(e) => {
1654                    if matches!(&e, PgError::QueryServer(_)) {
1655                        capture_query_server_error(self, &mut error, e);
1656                        continue;
1657                    }
1658                    return Err(e);
1659                }
1660            }
1661        }
1662    }
1663
1664    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1665    #[inline]
1666    pub async fn query_prepared_single_encoded_visit_first_column_bytes<F>(
1667        &mut self,
1668        wire: &[u8],
1669        on_value: F,
1670    ) -> PgResult<usize>
1671    where
1672        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1673    {
1674        let (row_count, _, _) = self
1675            .query_prepared_single_encoded_visit_first_column_bytes_profiled(wire, on_value)
1676            .await?;
1677        Ok(row_count)
1678    }
1679
1680    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1681    ///
1682    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1683    #[inline]
1684    pub async fn query_prepared_single_encoded_visit_first_column_bytes_profiled<F>(
1685        &mut self,
1686        wire: &[u8],
1687        mut on_value: F,
1688    ) -> PgResult<(usize, Duration, Duration)>
1689    where
1690        F: FnMut(Option<&[u8]>) -> PgResult<()>,
1691    {
1692        let send_start = Instant::now();
1693        self.send_bytes(wire).await?;
1694        let send_elapsed = send_start.elapsed();
1695        let consume_start = Instant::now();
1696
1697        let mut row_count = 0usize;
1698        let mut first_column: Option<Bytes> = None;
1699        let mut error: Option<PgError> = None;
1700        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1701
1702        loop {
1703            match self
1704                .recv_fill_first_column_zerocopy_fast(&mut first_column)
1705                .await
1706            {
1707                Ok(msg_type) => {
1708                    if let Err(err) = flow.validate_msg_type(
1709                        msg_type,
1710                        "prepared single encoded visit first-column execute",
1711                        error.is_some(),
1712                    ) {
1713                        return return_with_desync(self, err);
1714                    }
1715                    match msg_type {
1716                        b'2' | b'T' | b'n' => {}
1717                        b'D' => {
1718                            if error.is_none() {
1719                                if let Err(err) = on_value(first_column.as_deref()) {
1720                                    return return_callback_error_with_desync(self, err);
1721                                }
1722                                row_count += 1;
1723                                first_column = None;
1724                            }
1725                        }
1726                        b'C' => {}
1727                        b'Z' => {
1728                            if let Some(err) = error {
1729                                return Err(err);
1730                            }
1731                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1732                        }
1733                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1734                        other => {
1735                            return return_with_desync(
1736                                self,
1737                                unexpected_backend_msg_type(
1738                                    "prepared single encoded visit first-column execute",
1739                                    other,
1740                                ),
1741                            );
1742                        }
1743                    }
1744                }
1745                Err(e) => {
1746                    if matches!(&e, PgError::QueryServer(_)) {
1747                        capture_query_server_error(self, &mut error, e);
1748                        continue;
1749                    }
1750                    return Err(e);
1751                }
1752            }
1753        }
1754    }
1755
1756    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1757    #[inline]
1758    pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes<F>(
1759        &mut self,
1760        wire: &[u8],
1761        on_row: F,
1762    ) -> PgResult<usize>
1763    where
1764        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1765    {
1766        let (row_count, _, _) = self
1767            .query_prepared_single_encoded_visit_first_four_columns_bytes_profiled(wire, on_row)
1768            .await?;
1769        Ok(row_count)
1770    }
1771
1772    /// Sequential prepared query from pre-encoded Bind/Execute/Sync wire bytes.
1773    ///
1774    /// Returns `(rows, send_elapsed, consume_elapsed)`.
1775    #[inline]
1776    pub async fn query_prepared_single_encoded_visit_first_four_columns_bytes_profiled<F>(
1777        &mut self,
1778        wire: &[u8],
1779        mut on_row: F,
1780    ) -> PgResult<(usize, Duration, Duration)>
1781    where
1782        F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
1783    {
1784        let send_start = Instant::now();
1785        self.send_bytes(wire).await?;
1786        let send_elapsed = send_start.elapsed();
1787        let consume_start = Instant::now();
1788
1789        let mut row_count = 0usize;
1790        let mut columns = [None, None, None, None];
1791        let mut error: Option<PgError> = None;
1792        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
1793
1794        loop {
1795            match self
1796                .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
1797                .await
1798            {
1799                Ok(msg_type) => {
1800                    if let Err(err) = flow.validate_msg_type(
1801                        msg_type,
1802                        "prepared single encoded visit first-four execute",
1803                        error.is_some(),
1804                    ) {
1805                        return return_with_desync(self, err);
1806                    }
1807                    match msg_type {
1808                        b'2' | b'T' | b'n' => {}
1809                        b'D' => {
1810                            if error.is_none() {
1811                                if let Err(err) = on_row([
1812                                    columns[0].as_deref(),
1813                                    columns[1].as_deref(),
1814                                    columns[2].as_deref(),
1815                                    columns[3].as_deref(),
1816                                ]) {
1817                                    return return_callback_error_with_desync(self, err);
1818                                }
1819                                columns.fill(None);
1820                                row_count += 1;
1821                            }
1822                        }
1823                        b'C' => {}
1824                        b'Z' => {
1825                            if let Some(err) = error {
1826                                return Err(err);
1827                            }
1828                            return Ok((row_count, send_elapsed, consume_start.elapsed()));
1829                        }
1830                        msg_type if is_ignorable_session_msg_type(msg_type) => {}
1831                        other => {
1832                            return return_with_desync(
1833                                self,
1834                                unexpected_backend_msg_type(
1835                                    "prepared single encoded visit first-four execute",
1836                                    other,
1837                                ),
1838                            );
1839                        }
1840                    }
1841                }
1842                Err(e) => {
1843                    if matches!(&e, PgError::QueryServer(_)) {
1844                        capture_query_server_error(self, &mut error, e);
1845                        continue;
1846                    }
1847                    return Err(e);
1848                }
1849            }
1850        }
1851    }
1852}
1853
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PgConnection` on one end of a Unix socket pair and returns the
    /// other end alongside it.
    ///
    /// No server is involved: tests preload `conn.buffer` with backend frames
    /// (see `push_backend_frame`) and hold on to the peer end for the duration
    /// of the test.
    #[cfg(unix)]
    fn test_conn_with_peer() -> (PgConnection, tokio::net::UnixStream) {
        use crate::driver::connection::StatementCache;
        use crate::driver::stream::PgStream;
        use bytes::BytesMut;
        use std::collections::{HashMap, VecDeque};
        use std::num::NonZeroUsize;
        use tokio::net::UnixStream;

        let (unix_stream, peer) = UnixStream::pair().expect("unix stream pair");
        (
            PgConnection {
                stream: PgStream::Unix(unix_stream),
                buffer: BytesMut::with_capacity(1024),
                write_buf: BytesMut::with_capacity(1024),
                sql_buf: BytesMut::with_capacity(256),
                params_buf: Vec::new(),
                prepared_statements: HashMap::new(),
                stmt_cache: StatementCache::new(NonZeroUsize::new(2).expect("non-zero")),
                column_info_cache: HashMap::new(),
                process_id: 0,
                cancel_key_bytes: Vec::new(),
                requested_protocol_minor: PgConnection::default_protocol_minor(),
                negotiated_protocol_minor: PgConnection::default_protocol_minor(),
                notifications: VecDeque::new(),
                replication_stream_active: false,
                replication_mode_enabled: false,
                last_replication_wal_end: None,
                io_desynced: false,
                pending_statement_closes: Vec::new(),
                draining_statement_closes: false,
            },
            peer,
        )
    }

    /// Like `test_conn_with_peer`, but drops the peer end immediately; for
    /// tests that never touch the socket.
    #[cfg(unix)]
    fn test_conn() -> PgConnection {
        test_conn_with_peer().0
    }

    /// Appends one backend frame to the connection's read buffer: the type
    /// byte, a big-endian `u32` length equal to the payload size plus the four
    /// length bytes, then the payload.
    #[cfg(unix)]
    fn push_backend_frame(conn: &mut PgConnection, msg_type: u8, payload: &[u8]) {
        conn.buffer.extend_from_slice(&[msg_type]);
        conn.buffer
            .extend_from_slice(&((payload.len() + 4) as u32).to_be_bytes());
        conn.buffer.extend_from_slice(payload);
    }

    /// Builds an ErrorResponse payload with severity `ERROR`, the given
    /// SQLSTATE `code` and `message`, each field NUL-terminated and the list
    /// closed by a final NUL.
    ///
    /// Gated on `unix` like the other helpers: every caller is a
    /// `#[cfg(unix)]` test, so without the gate this is dead code (and a
    /// warning) when the tests are built for other targets.
    #[cfg(unix)]
    fn error_response_payload(code: &str, message: &str) -> Vec<u8> {
        let mut payload = Vec::new();
        payload.push(b'S');
        payload.extend_from_slice(b"ERROR\0");
        payload.push(b'C');
        payload.extend_from_slice(code.as_bytes());
        payload.push(0);
        payload.push(b'M');
        payload.extend_from_slice(message.as_bytes());
        payload.push(0);
        payload.push(0);
        payload
    }

    /// One parameter more than `i16::MAX` must be rejected as an encode error
    /// while computing the wire length.
    #[test]
    fn prepared_buffer_sizing_rejects_too_many_params_before_allocation() {
        let params = vec![None; i16::MAX as usize + 1];
        let err = prepared_bind_execute_sync_wire_len("stmt", &params, PgEncoder::FORMAT_TEXT)
            .expect_err("parameter overflow must be rejected");

        assert!(matches!(err, PgError::Encode(msg) if msg.contains("Too many parameters")));
    }

    /// `sql_to_stmt_name` and `PreparedStatement::from_sql` must derive the
    /// same statement name from the same SQL text.
    #[test]
    fn sql_to_stmt_name_matches_prepared_statement_identity() {
        let sql = "SELECT id, name FROM users WHERE id = $1";
        let stmt = super::super::PreparedStatement::from_sql(sql);
        assert_eq!(PgConnection::sql_to_stmt_name(sql), stmt.name());
    }

    /// A callback error is passed through unchanged and flags the connection
    /// as I/O-desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn streaming_callback_error_marks_query_connection_desynced() {
        let mut conn = test_conn();

        let err = return_callback_error_with_desync::<()>(
            &mut conn,
            PgError::Query("consumer stopped".to_string()),
        )
        .expect_err("callback error should be returned");

        assert!(matches!(err, PgError::Query(msg) if msg == "consumer stopped"));
        assert!(conn.is_io_desynced());
    }

    /// A DataRow arriving before BindComplete is a protocol-order violation
    /// and must leave the connection desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn protocol_order_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        // DataRow with a zero column count, queued with nothing before it.
        push_backend_frame(&mut conn, b'D', &0i16.to_be_bytes());
        let stmt = super::super::PreparedStatement::from_sql("SELECT 1");

        let err = conn
            .query_prepared_single_count(&stmt, &[])
            .await
            .expect_err("out-of-order DataRow must fail");

        assert!(err.to_string().contains("DataRow before BindComplete"));
        assert!(conn.is_io_desynced());
    }

    /// ReadyForQuery without a preceding completion message in the simple
    /// query flow must fail and leave the connection desynced.
    #[cfg(unix)]
    #[tokio::test]
    async fn simple_flow_error_marks_query_connection_desynced() {
        let (mut conn, _peer) = test_conn_with_peer();
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .execute_simple("SELECT 1")
            .await
            .expect_err("ReadyForQuery before completion must fail");

        assert!(err.to_string().contains("ReadyForQuery before completion"));
        assert!(conn.is_io_desynced());
    }

    /// When ParseComplete arrived before the server error, the statement stays
    /// registered and the connection remains in sync.
    #[cfg(unix)]
    #[tokio::test]
    async fn query_cached_keeps_statement_after_post_parse_error() {
        let (mut conn, _peer) = test_conn_with_peer();
        let sql = "SELECT $1";
        let stmt_name = PgConnection::sql_to_stmt_name(sql);
        let err_payload = error_response_payload("23514", "check constraint violation");

        // ParseComplete, then ErrorResponse, then ReadyForQuery.
        push_backend_frame(&mut conn, b'1', &[]);
        push_backend_frame(&mut conn, b'E', &err_payload);
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .query_cached(sql, &[Some(b"bad".to_vec())])
            .await
            .expect_err("execution-stage error should be returned");

        assert!(matches!(err, PgError::QueryServer(_)));
        assert!(conn.prepared_statements.contains_key(&stmt_name));
        assert!(!conn.is_io_desynced());
    }

    /// When the error arrives before ParseComplete, the newly registered
    /// statement must be removed again; the connection remains in sync.
    #[cfg(unix)]
    #[tokio::test]
    async fn query_cached_removes_new_statement_when_parse_fails() {
        let (mut conn, _peer) = test_conn_with_peer();
        let sql = "SELECT broken";
        let stmt_name = PgConnection::sql_to_stmt_name(sql);
        let err_payload = error_response_payload("42601", "syntax error");

        // ErrorResponse straight away, then ReadyForQuery.
        push_backend_frame(&mut conn, b'E', &err_payload);
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .query_cached(sql, &[])
            .await
            .expect_err("parse-stage error should be returned");

        assert!(matches!(err, PgError::QueryServer(_)));
        assert!(!conn.prepared_statements.contains_key(&stmt_name));
        assert!(!conn.is_io_desynced());
    }

    /// SQLSTATE 26000 ("prepared statement does not exist") must clear the
    /// locally tracked prepared statements without desyncing the connection.
    #[cfg(unix)]
    #[tokio::test]
    async fn query_prepared_single_clears_state_on_missing_statement() {
        let (mut conn, _peer) = test_conn_with_peer();
        let stmt = super::super::PreparedStatement::from_sql("SELECT 1");
        let stmt_name = stmt.name().to_string();
        conn.prepared_statements
            .insert(stmt_name.clone(), "SELECT 1".to_string());
        let err_payload = error_response_payload(
            "26000",
            &format!("prepared statement \"{}\" does not exist", stmt_name),
        );

        push_backend_frame(&mut conn, b'E', &err_payload);
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .query_prepared_single(&stmt, &[])
            .await
            .expect_err("missing server statement should be returned");

        assert!(matches!(err, PgError::QueryServer(_)));
        assert!(conn.prepared_statements.is_empty());
        assert!(!conn.is_io_desynced());
    }

    /// Same expectation as the test above, exercised through the
    /// reusable-buffer entry point.
    #[cfg(unix)]
    #[tokio::test]
    async fn query_prepared_single_reuse_clears_state_on_missing_statement() {
        let (mut conn, _peer) = test_conn_with_peer();
        let stmt = super::super::PreparedStatement::from_sql("SELECT 1");
        let stmt_name = stmt.name().to_string();
        conn.prepared_statements
            .insert(stmt_name.clone(), "SELECT 1".to_string());
        let err_payload = error_response_payload(
            "26000",
            &format!("prepared statement \"{}\" does not exist", stmt_name),
        );

        push_backend_frame(&mut conn, b'E', &err_payload);
        push_backend_frame(&mut conn, b'Z', b"I");

        let err = conn
            .query_prepared_single_reuse_with_result_format(&stmt, &[], PgEncoder::FORMAT_TEXT)
            .await
            .expect_err("missing server statement should be returned");

        assert!(matches!(err, PgError::QueryServer(_)));
        assert!(conn.prepared_statements.is_empty());
        assert!(!conn.is_io_desynced());
    }
}