Skip to main content

qail_pg/driver/pool/
fetch.rs

1//! Fetch methods for PooledConnection: uncached, fast, cached, typed, and pipelined-RLS variants.
2
3use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6    PgError, PgResult, ResultFormat,
7    extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8    is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12impl PooledConnection {
13    /// Execute a QAIL command and fetch all rows (UNCACHED).
14    /// Returns rows with column metadata for JSON serialization.
15    pub async fn fetch_all_uncached(
16        &mut self,
17        cmd: &qail_core::ast::Qail,
18    ) -> PgResult<Vec<crate::driver::PgRow>> {
19        self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
20            .await
21    }
22
23    /// Execute raw SQL with bind parameters and return raw row data.
24    ///
25    /// Uses the Extended Query Protocol so parameters are never interpolated
26    /// into the SQL string. Intended for EXPLAIN or other SQL that can't be
27    /// represented as a `Qail` AST but still needs parameterized execution.
28    ///
29    /// Returns raw column bytes; callers must decode as needed.
30    pub async fn query_raw_with_params(
31        &mut self,
32        sql: &str,
33        params: &[Option<Vec<u8>>],
34    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
35        let conn = self.conn_mut()?;
36        conn.query(sql, params).await
37    }
38
39    /// Export data using AST-native COPY TO STDOUT and collect parsed rows.
40    pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
41        self.conn_mut()?.copy_export(cmd).await
42    }
43
44    /// Stream AST-native COPY TO STDOUT chunks with bounded memory usage.
45    pub async fn copy_export_stream_raw<F, Fut>(
46        &mut self,
47        cmd: &qail_core::ast::Qail,
48        on_chunk: F,
49    ) -> PgResult<()>
50    where
51        F: FnMut(Vec<u8>) -> Fut,
52        Fut: std::future::Future<Output = PgResult<()>>,
53    {
54        self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
55    }
56
57    /// Stream AST-native COPY TO STDOUT rows with bounded memory usage.
58    pub async fn copy_export_stream_rows<F>(
59        &mut self,
60        cmd: &qail_core::ast::Qail,
61        on_row: F,
62    ) -> PgResult<()>
63    where
64        F: FnMut(Vec<String>) -> PgResult<()>,
65    {
66        self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
67    }
68
69    /// Export a table using COPY TO STDOUT and collect raw bytes.
70    pub async fn copy_export_table(
71        &mut self,
72        table: &str,
73        columns: &[String],
74    ) -> PgResult<Vec<u8>> {
75        let quote_ident = |ident: &str| -> String {
76            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
77        };
78        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
79        let sql = format!(
80            "COPY {} ({}) TO STDOUT",
81            quote_ident(table),
82            cols.join(", ")
83        );
84        self.conn_mut()?.copy_out_raw(&sql).await
85    }
86
87    /// Stream a table export using COPY TO STDOUT with bounded memory usage.
88    pub async fn copy_export_table_stream<F, Fut>(
89        &mut self,
90        table: &str,
91        columns: &[String],
92        on_chunk: F,
93    ) -> PgResult<()>
94    where
95        F: FnMut(Vec<u8>) -> Fut,
96        Fut: std::future::Future<Output = PgResult<()>>,
97    {
98        let quote_ident = |ident: &str| -> String {
99            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
100        };
101        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
102        let sql = format!(
103            "COPY {} ({}) TO STDOUT",
104            quote_ident(table),
105            cols.join(", ")
106        );
107        self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
108    }
109
110    /// Execute a QAIL command and fetch all rows (UNCACHED) with explicit result format.
111    pub async fn fetch_all_uncached_with_format(
112        &mut self,
113        cmd: &qail_core::ast::Qail,
114        result_format: ResultFormat,
115    ) -> PgResult<Vec<crate::driver::PgRow>> {
116        use crate::driver::ColumnInfo;
117        use crate::protocol::AstEncoder;
118
119        let conn = self.conn_mut()?;
120
121        AstEncoder::encode_cmd_reuse_into_with_result_format(
122            cmd,
123            &mut conn.sql_buf,
124            &mut conn.params_buf,
125            &mut conn.write_buf,
126            result_format.as_wire_code(),
127        )
128        .map_err(|e| PgError::Encode(e.to_string()))?;
129
130        conn.flush_write_buf().await?;
131
132        let mut rows: Vec<crate::driver::PgRow> = Vec::new();
133        let mut column_info: Option<Arc<ColumnInfo>> = None;
134        let mut error: Option<PgError> = None;
135        let mut flow =
136            ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
137
138        loop {
139            let msg = conn.recv().await?;
140            flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
141            match msg {
142                crate::protocol::BackendMessage::ParseComplete
143                | crate::protocol::BackendMessage::BindComplete => {}
144                crate::protocol::BackendMessage::RowDescription(fields) => {
145                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
146                }
147                crate::protocol::BackendMessage::DataRow(data) => {
148                    if error.is_none() {
149                        rows.push(crate::driver::PgRow {
150                            columns: data,
151                            column_info: column_info.clone(),
152                        });
153                    }
154                }
155                crate::protocol::BackendMessage::NoData => {}
156                crate::protocol::BackendMessage::CommandComplete(_) => {}
157                crate::protocol::BackendMessage::ReadyForQuery(_) => {
158                    if let Some(err) = error {
159                        return Err(err);
160                    }
161                    return Ok(rows);
162                }
163                crate::protocol::BackendMessage::ErrorResponse(err) => {
164                    if error.is_none() {
165                        error = Some(PgError::QueryServer(err.into()));
166                    }
167                }
168                msg if is_ignorable_session_message(&msg) => {}
169                other => {
170                    return Err(unexpected_backend_message("pool fetch_all execute", &other));
171                }
172            }
173        }
174    }
175
176    /// Execute a QAIL command and fetch all rows (FAST VERSION).
177    /// Uses native AST-to-wire encoding and optimized recv_with_data_fast.
178    /// Skips column metadata for maximum speed.
179    pub async fn fetch_all_fast(
180        &mut self,
181        cmd: &qail_core::ast::Qail,
182    ) -> PgResult<Vec<crate::driver::PgRow>> {
183        self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
184            .await
185    }
186
187    /// Execute a QAIL command and fetch all rows (FAST VERSION) with explicit result format.
188    pub async fn fetch_all_fast_with_format(
189        &mut self,
190        cmd: &qail_core::ast::Qail,
191        result_format: ResultFormat,
192    ) -> PgResult<Vec<crate::driver::PgRow>> {
193        use crate::protocol::AstEncoder;
194
195        let conn = self.conn_mut()?;
196
197        AstEncoder::encode_cmd_reuse_into_with_result_format(
198            cmd,
199            &mut conn.sql_buf,
200            &mut conn.params_buf,
201            &mut conn.write_buf,
202            result_format.as_wire_code(),
203        )
204        .map_err(|e| PgError::Encode(e.to_string()))?;
205
206        conn.flush_write_buf().await?;
207
208        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
209        let mut error: Option<PgError> = None;
210        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
211
212        loop {
213            let res = conn.recv_with_data_fast().await;
214            match res {
215                Ok((msg_type, data)) => {
216                    flow.validate_msg_type(
217                        msg_type,
218                        "pool fetch_all_fast execute",
219                        error.is_some(),
220                    )?;
221                    match msg_type {
222                        b'D' => {
223                            if error.is_none()
224                                && let Some(columns) = data
225                            {
226                                rows.push(crate::driver::PgRow {
227                                    columns,
228                                    column_info: None,
229                                });
230                            }
231                        }
232                        b'Z' => {
233                            if let Some(err) = error {
234                                return Err(err);
235                            }
236                            return Ok(rows);
237                        }
238                        _ => {}
239                    }
240                }
241                Err(e) => {
242                    if matches!(&e, PgError::QueryServer(_)) {
243                        if error.is_none() {
244                            error = Some(e);
245                        }
246                        continue;
247                    }
248                    return Err(e);
249                }
250            }
251        }
252    }
253
254    /// Execute a QAIL command and fetch all rows (CACHED).
255    /// Uses prepared statement caching: Parse+Describe on first call,
256    /// then Bind+Execute only on subsequent calls with the same SQL shape.
257    /// This matches PostgREST's behavior for fair benchmarks.
258    pub async fn fetch_all_cached(
259        &mut self,
260        cmd: &qail_core::ast::Qail,
261    ) -> PgResult<Vec<crate::driver::PgRow>> {
262        self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
263            .await
264    }
265
266    /// Execute a QAIL command and fetch all rows (CACHED) with explicit result format.
267    pub async fn fetch_all_cached_with_format(
268        &mut self,
269        cmd: &qail_core::ast::Qail,
270        result_format: ResultFormat,
271    ) -> PgResult<Vec<crate::driver::PgRow>> {
272        let mut retried = false;
273        loop {
274            match self
275                .fetch_all_cached_with_format_once(cmd, result_format)
276                .await
277            {
278                Ok(rows) => return Ok(rows),
279                Err(err)
280                    if !retried
281                        && (err.is_prepared_statement_retryable()
282                            || err.is_prepared_statement_already_exists()) =>
283                {
284                    retried = true;
285                    if err.is_prepared_statement_retryable()
286                        && let Some(conn) = self.conn.as_mut()
287                    {
288                        conn.clear_prepared_statement_state();
289                    }
290                }
291                Err(err) => return Err(err),
292            }
293        }
294    }
295
296    /// Execute a QAIL command and decode rows into typed structs (CACHED, text format).
297    pub async fn fetch_typed<T: crate::driver::row::QailRow>(
298        &mut self,
299        cmd: &qail_core::ast::Qail,
300    ) -> PgResult<Vec<T>> {
301        self.fetch_typed_with_format(cmd, ResultFormat::Text).await
302    }
303
304    /// Execute a QAIL command and decode rows into typed structs with explicit result format.
305    ///
306    /// Use [`ResultFormat::Binary`] for binary wire values; row decoders should use
307    /// metadata-aware helpers like `PgRow::try_get()` / `try_get_by_name()`.
308    pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
309        &mut self,
310        cmd: &qail_core::ast::Qail,
311        result_format: ResultFormat,
312    ) -> PgResult<Vec<T>> {
313        let rows = self
314            .fetch_all_cached_with_format(cmd, result_format)
315            .await?;
316        Ok(rows.iter().map(T::from_row).collect())
317    }
318
319    /// Execute a QAIL command and decode one typed row (CACHED, text format).
320    pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
321        &mut self,
322        cmd: &qail_core::ast::Qail,
323    ) -> PgResult<Option<T>> {
324        self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
325            .await
326    }
327
328    /// Execute a QAIL command and decode one typed row with explicit result format.
329    pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
330        &mut self,
331        cmd: &qail_core::ast::Qail,
332        result_format: ResultFormat,
333    ) -> PgResult<Option<T>> {
334        let rows = self
335            .fetch_all_cached_with_format(cmd, result_format)
336            .await?;
337        Ok(rows.first().map(T::from_row))
338    }
339
340    async fn fetch_all_cached_with_format_once(
341        &mut self,
342        cmd: &qail_core::ast::Qail,
343        result_format: ResultFormat,
344    ) -> PgResult<Vec<crate::driver::PgRow>> {
345        use crate::driver::ColumnInfo;
346        use std::collections::hash_map::DefaultHasher;
347        use std::hash::{Hash, Hasher};
348
349        let conn = self.conn.as_mut().ok_or_else(|| {
350            PgError::Connection("Connection already released back to pool".into())
351        })?;
352
353        conn.sql_buf.clear();
354        conn.params_buf.clear();
355
356        // Encode SQL + params to reusable buffers
357        match cmd.action {
358            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
359                crate::protocol::ast_encoder::dml::encode_select(
360                    cmd,
361                    &mut conn.sql_buf,
362                    &mut conn.params_buf,
363                )?;
364            }
365            qail_core::ast::Action::Add => {
366                crate::protocol::ast_encoder::dml::encode_insert(
367                    cmd,
368                    &mut conn.sql_buf,
369                    &mut conn.params_buf,
370                )?;
371            }
372            qail_core::ast::Action::Set => {
373                crate::protocol::ast_encoder::dml::encode_update(
374                    cmd,
375                    &mut conn.sql_buf,
376                    &mut conn.params_buf,
377                )?;
378            }
379            qail_core::ast::Action::Del => {
380                crate::protocol::ast_encoder::dml::encode_delete(
381                    cmd,
382                    &mut conn.sql_buf,
383                    &mut conn.params_buf,
384                )?;
385            }
386            _ => {
387                // Fallback: unsupported actions go through uncached path
388                return self
389                    .fetch_all_uncached_with_format(cmd, result_format)
390                    .await;
391            }
392        }
393
394        let mut hasher = DefaultHasher::new();
395        conn.sql_buf.hash(&mut hasher);
396        let sql_hash = hasher.finish();
397
398        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
399
400        conn.write_buf.clear();
401
402        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
403            name
404        } else {
405            let name = format!("qail_{:x}", sql_hash);
406
407            conn.evict_prepared_if_full();
408
409            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
410
411            use crate::protocol::PgEncoder;
412            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
413            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
414            conn.write_buf.extend_from_slice(&parse_msg);
415            conn.write_buf.extend_from_slice(&describe_msg);
416
417            conn.stmt_cache.put(sql_hash, name.clone());
418            conn.prepared_statements
419                .insert(name.clone(), sql_str.to_string());
420
421            // Register in global hot-statement registry for cross-connection sharing
422            if let Ok(mut hot) = self.pool.hot_statements.write()
423                && hot.len() < MAX_HOT_STATEMENTS
424            {
425                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
426            }
427
428            name
429        };
430
431        use crate::protocol::PgEncoder;
432        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
433            &mut conn.write_buf,
434            &stmt_name,
435            &conn.params_buf,
436            result_format.as_wire_code(),
437        ) {
438            if is_cache_miss {
439                conn.stmt_cache.remove(&sql_hash);
440                conn.prepared_statements.remove(&stmt_name);
441                conn.column_info_cache.remove(&sql_hash);
442            }
443            return Err(PgError::Encode(e.to_string()));
444        }
445        PgEncoder::encode_execute_to(&mut conn.write_buf);
446        PgEncoder::encode_sync_to(&mut conn.write_buf);
447
448        if let Err(err) = conn.flush_write_buf().await {
449            if is_cache_miss {
450                conn.stmt_cache.remove(&sql_hash);
451                conn.prepared_statements.remove(&stmt_name);
452                conn.column_info_cache.remove(&sql_hash);
453            }
454            return Err(err);
455        }
456
457        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
458
459        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
460        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
461        let mut error: Option<PgError> = None;
462        let mut flow = ExtendedFlowTracker::new(
463            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
464        );
465
466        loop {
467            let msg = match conn.recv().await {
468                Ok(msg) => msg,
469                Err(err) => {
470                    if is_cache_miss && !flow.saw_parse_complete() {
471                        conn.stmt_cache.remove(&sql_hash);
472                        conn.prepared_statements.remove(&stmt_name);
473                        conn.column_info_cache.remove(&sql_hash);
474                    }
475                    return Err(err);
476                }
477            };
478            if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
479            {
480                if is_cache_miss && !flow.saw_parse_complete() {
481                    conn.stmt_cache.remove(&sql_hash);
482                    conn.prepared_statements.remove(&stmt_name);
483                    conn.column_info_cache.remove(&sql_hash);
484                }
485                return Err(err);
486            }
487            match msg {
488                crate::protocol::BackendMessage::ParseComplete => {}
489                crate::protocol::BackendMessage::BindComplete => {}
490                crate::protocol::BackendMessage::ParameterDescription(_) => {}
491                crate::protocol::BackendMessage::RowDescription(fields) => {
492                    let info = Arc::new(ColumnInfo::from_fields(&fields));
493                    if is_cache_miss {
494                        conn.column_info_cache.insert(sql_hash, info.clone());
495                    }
496                    column_info = Some(info);
497                }
498                crate::protocol::BackendMessage::DataRow(data) => {
499                    if error.is_none() {
500                        rows.push(crate::driver::PgRow {
501                            columns: data,
502                            column_info: column_info.clone(),
503                        });
504                    }
505                }
506                crate::protocol::BackendMessage::CommandComplete(_) => {}
507                crate::protocol::BackendMessage::ReadyForQuery(_) => {
508                    if let Some(err) = error {
509                        if is_cache_miss
510                            && !flow.saw_parse_complete()
511                            && !err.is_prepared_statement_already_exists()
512                        {
513                            conn.stmt_cache.remove(&sql_hash);
514                            conn.prepared_statements.remove(&stmt_name);
515                            conn.column_info_cache.remove(&sql_hash);
516                        }
517                        return Err(err);
518                    }
519                    if is_cache_miss && !flow.saw_parse_complete() {
520                        conn.stmt_cache.remove(&sql_hash);
521                        conn.prepared_statements.remove(&stmt_name);
522                        conn.column_info_cache.remove(&sql_hash);
523                        return Err(PgError::Protocol(
524                            "Cache miss query reached ReadyForQuery without ParseComplete"
525                                .to_string(),
526                        ));
527                    }
528                    return Ok(rows);
529                }
530                crate::protocol::BackendMessage::ErrorResponse(err) => {
531                    if error.is_none() {
532                        error = Some(PgError::QueryServer(err.into()));
533                    }
534                }
535                msg if is_ignorable_session_message(&msg) => {}
536                other => {
537                    if is_cache_miss && !flow.saw_parse_complete() {
538                        conn.stmt_cache.remove(&sql_hash);
539                        conn.prepared_statements.remove(&stmt_name);
540                        conn.column_info_cache.remove(&sql_hash);
541                    }
542                    return Err(unexpected_backend_message(
543                        "pool fetch_all_cached execute",
544                        &other,
545                    ));
546                }
547            }
548        }
549    }
550
551    /// Execute a QAIL command with RLS context in a SINGLE roundtrip.
552    ///
553    /// Pipelines the RLS setup (BEGIN + set_config) and the query
554    /// (Parse/Bind/Execute/Sync) into one `write_all` syscall.
555    /// PG processes messages in order, so the BEGIN + set_config
556    /// completes before the query executes — security is preserved.
557    ///
558    /// Wire layout:
559    /// ```text
560    /// [SimpleQuery: "BEGIN; SET LOCAL...; SELECT set_config(...)"]
561    /// [Parse (if cache miss)]
562    /// [Describe (if cache miss)]
563    /// [Bind]
564    /// [Execute]
565    /// [Sync]
566    /// ```
567    ///
568    /// Response processing: consume 2× ReadyForQuery (SimpleQuery + Sync).
569    pub async fn fetch_all_with_rls(
570        &mut self,
571        cmd: &qail_core::ast::Qail,
572        rls_sql: &str,
573    ) -> PgResult<Vec<crate::driver::PgRow>> {
574        self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
575            .await
576    }
577
578    /// Execute a QAIL command with RLS context in a SINGLE roundtrip with explicit result format.
579    pub async fn fetch_all_with_rls_with_format(
580        &mut self,
581        cmd: &qail_core::ast::Qail,
582        rls_sql: &str,
583        result_format: ResultFormat,
584    ) -> PgResult<Vec<crate::driver::PgRow>> {
585        let mut retried = false;
586        loop {
587            match self
588                .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
589                .await
590            {
591                Ok(rows) => return Ok(rows),
592                Err(err)
593                    if !retried
594                        && (err.is_prepared_statement_retryable()
595                            || err.is_prepared_statement_already_exists()) =>
596                {
597                    retried = true;
598                    if err.is_prepared_statement_retryable()
599                        && let Some(conn) = self.conn.as_mut()
600                    {
601                        conn.clear_prepared_statement_state();
602                        let _ = conn.execute_simple("ROLLBACK").await;
603                    }
604                    self.rls_dirty = false;
605                }
606                Err(err) => return Err(err),
607            }
608        }
609    }
610
611    async fn fetch_all_with_rls_with_format_once(
612        &mut self,
613        cmd: &qail_core::ast::Qail,
614        rls_sql: &str,
615        result_format: ResultFormat,
616    ) -> PgResult<Vec<crate::driver::PgRow>> {
617        use crate::driver::ColumnInfo;
618        use std::collections::hash_map::DefaultHasher;
619        use std::hash::{Hash, Hasher};
620
621        let conn = self.conn.as_mut().ok_or_else(|| {
622            PgError::Connection("Connection already released back to pool".into())
623        })?;
624
625        conn.sql_buf.clear();
626        conn.params_buf.clear();
627
628        // Encode SQL + params to reusable buffers
629        match cmd.action {
630            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
631                crate::protocol::ast_encoder::dml::encode_select(
632                    cmd,
633                    &mut conn.sql_buf,
634                    &mut conn.params_buf,
635                )?;
636            }
637            qail_core::ast::Action::Add => {
638                crate::protocol::ast_encoder::dml::encode_insert(
639                    cmd,
640                    &mut conn.sql_buf,
641                    &mut conn.params_buf,
642                )?;
643            }
644            qail_core::ast::Action::Set => {
645                crate::protocol::ast_encoder::dml::encode_update(
646                    cmd,
647                    &mut conn.sql_buf,
648                    &mut conn.params_buf,
649                )?;
650            }
651            qail_core::ast::Action::Del => {
652                crate::protocol::ast_encoder::dml::encode_delete(
653                    cmd,
654                    &mut conn.sql_buf,
655                    &mut conn.params_buf,
656                )?;
657            }
658            _ => {
659                // Fallback: RLS setup must happen synchronously for unsupported actions
660                conn.execute_simple(rls_sql).await?;
661                self.rls_dirty = true;
662                return self
663                    .fetch_all_uncached_with_format(cmd, result_format)
664                    .await;
665            }
666        }
667
668        let mut hasher = DefaultHasher::new();
669        conn.sql_buf.hash(&mut hasher);
670        let sql_hash = hasher.finish();
671
672        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
673
674        conn.write_buf.clear();
675
676        // ── Prepend RLS Simple Query message ─────────────────────────
677        // This is the key optimization: RLS setup bytes go first in the
678        // same buffer as the query messages.
679        let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
680        conn.write_buf.extend_from_slice(&rls_msg);
681
682        // ── Then append the query messages (same as fetch_all_cached) ──
683        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
684            name
685        } else {
686            let name = format!("qail_{:x}", sql_hash);
687
688            conn.evict_prepared_if_full();
689
690            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
691
692            use crate::protocol::PgEncoder;
693            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
694            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
695            conn.write_buf.extend_from_slice(&parse_msg);
696            conn.write_buf.extend_from_slice(&describe_msg);
697
698            conn.stmt_cache.put(sql_hash, name.clone());
699            conn.prepared_statements
700                .insert(name.clone(), sql_str.to_string());
701
702            if let Ok(mut hot) = self.pool.hot_statements.write()
703                && hot.len() < MAX_HOT_STATEMENTS
704            {
705                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
706            }
707
708            name
709        };
710
711        use crate::protocol::PgEncoder;
712        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
713            &mut conn.write_buf,
714            &stmt_name,
715            &conn.params_buf,
716            result_format.as_wire_code(),
717        ) {
718            if is_cache_miss {
719                conn.stmt_cache.remove(&sql_hash);
720                conn.prepared_statements.remove(&stmt_name);
721                conn.column_info_cache.remove(&sql_hash);
722            }
723            return Err(PgError::Encode(e.to_string()));
724        }
725        PgEncoder::encode_execute_to(&mut conn.write_buf);
726        PgEncoder::encode_sync_to(&mut conn.write_buf);
727
728        // ── Single write_all for RLS + Query ────────────────────────
729        if let Err(err) = conn.flush_write_buf().await {
730            if is_cache_miss {
731                conn.stmt_cache.remove(&sql_hash);
732                conn.prepared_statements.remove(&stmt_name);
733                conn.column_info_cache.remove(&sql_hash);
734            }
735            return Err(err);
736        }
737
738        // Mark connection as RLS-dirty (needs COMMIT on release)
739        self.rls_dirty = true;
740
741        // ── Phase 1: Consume Simple Query responses (RLS setup) ─────
742        // Simple Query produces: CommandComplete × N, then ReadyForQuery.
743        // set_config results and BEGIN/SET LOCAL responses are all here.
744        let mut rls_error: Option<PgError> = None;
745        loop {
746            let msg = match conn.recv().await {
747                Ok(msg) => msg,
748                Err(err) => {
749                    if is_cache_miss {
750                        conn.stmt_cache.remove(&sql_hash);
751                        conn.prepared_statements.remove(&stmt_name);
752                        conn.column_info_cache.remove(&sql_hash);
753                    }
754                    return Err(err);
755                }
756            };
757            match msg {
758                crate::protocol::BackendMessage::ReadyForQuery(_) => {
759                    // RLS setup done — break to Extended Query phase
760                    if let Some(err) = rls_error {
761                        return Err(err);
762                    }
763                    break;
764                }
765                crate::protocol::BackendMessage::ErrorResponse(err) => {
766                    if rls_error.is_none() {
767                        rls_error = Some(PgError::QueryServer(err.into()));
768                    }
769                }
770                // CommandComplete, DataRow (from set_config), RowDescription — ignore
771                crate::protocol::BackendMessage::CommandComplete(_)
772                | crate::protocol::BackendMessage::DataRow(_)
773                | crate::protocol::BackendMessage::RowDescription(_)
774                | crate::protocol::BackendMessage::ParseComplete
775                | crate::protocol::BackendMessage::BindComplete => {}
776                msg if is_ignorable_session_message(&msg) => {}
777                other => return Err(unexpected_backend_message("pool rls setup", &other)),
778            }
779        }
780
781        // ── Phase 2: Consume Extended Query responses (actual data) ──
782        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
783
784        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
785        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
786        let mut error: Option<PgError> = None;
787        let mut flow = ExtendedFlowTracker::new(
788            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
789        );
790
791        loop {
792            let msg = match conn.recv().await {
793                Ok(msg) => msg,
794                Err(err) => {
795                    if is_cache_miss && !flow.saw_parse_complete() {
796                        conn.stmt_cache.remove(&sql_hash);
797                        conn.prepared_statements.remove(&stmt_name);
798                        conn.column_info_cache.remove(&sql_hash);
799                    }
800                    return Err(err);
801                }
802            };
803            if let Err(err) =
804                flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
805            {
806                if is_cache_miss && !flow.saw_parse_complete() {
807                    conn.stmt_cache.remove(&sql_hash);
808                    conn.prepared_statements.remove(&stmt_name);
809                    conn.column_info_cache.remove(&sql_hash);
810                }
811                return Err(err);
812            }
813            match msg {
814                crate::protocol::BackendMessage::ParseComplete => {}
815                crate::protocol::BackendMessage::BindComplete => {}
816                crate::protocol::BackendMessage::ParameterDescription(_) => {}
817                crate::protocol::BackendMessage::RowDescription(fields) => {
818                    let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
819                    if is_cache_miss {
820                        conn.column_info_cache.insert(sql_hash, info.clone());
821                    }
822                    column_info = Some(info);
823                }
824                crate::protocol::BackendMessage::DataRow(data) => {
825                    if error.is_none() {
826                        rows.push(crate::driver::PgRow {
827                            columns: data,
828                            column_info: column_info.clone(),
829                        });
830                    }
831                }
832                crate::protocol::BackendMessage::CommandComplete(_) => {}
833                crate::protocol::BackendMessage::ReadyForQuery(_) => {
834                    if let Some(err) = error {
835                        if is_cache_miss
836                            && !flow.saw_parse_complete()
837                            && !err.is_prepared_statement_already_exists()
838                        {
839                            conn.stmt_cache.remove(&sql_hash);
840                            conn.prepared_statements.remove(&stmt_name);
841                            conn.column_info_cache.remove(&sql_hash);
842                        }
843                        return Err(err);
844                    }
845                    if is_cache_miss && !flow.saw_parse_complete() {
846                        conn.stmt_cache.remove(&sql_hash);
847                        conn.prepared_statements.remove(&stmt_name);
848                        conn.column_info_cache.remove(&sql_hash);
849                        return Err(PgError::Protocol(
850                            "Cache miss query reached ReadyForQuery without ParseComplete"
851                                .to_string(),
852                        ));
853                    }
854                    return Ok(rows);
855                }
856                crate::protocol::BackendMessage::ErrorResponse(err) => {
857                    if error.is_none() {
858                        error = Some(PgError::QueryServer(err.into()));
859                    }
860                }
861                msg if is_ignorable_session_message(&msg) => {}
862                other => {
863                    if is_cache_miss && !flow.saw_parse_complete() {
864                        conn.stmt_cache.remove(&sql_hash);
865                        conn.prepared_statements.remove(&stmt_name);
866                        conn.column_info_cache.remove(&sql_hash);
867                    }
868                    return Err(unexpected_backend_message(
869                        "pool fetch_all_with_rls execute",
870                        &other,
871                    ));
872                }
873            }
874        }
875    }
876}