Skip to main content

qail_pg/driver/pool/
fetch.rs

1//! Fetch methods for PooledConnection: uncached, fast, cached, typed, and pipelined-RLS variants.
2
3use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6    PgConnection, PgError, PgResult, ResultFormat,
7    extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8    is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12#[inline]
13fn rollback_cache_miss_statement_registration(
14    conn: &mut PgConnection,
15    is_cache_miss: bool,
16    sql_hash: u64,
17    stmt_name: &str,
18) {
19    if is_cache_miss {
20        conn.stmt_cache.remove(&sql_hash);
21        conn.prepared_statements.remove(stmt_name);
22        conn.column_info_cache.remove(&sql_hash);
23    }
24}
25
26async fn drain_extended_responses_after_rls_setup_error(conn: &mut PgConnection) -> PgResult<()> {
27    loop {
28        let msg = conn.recv().await?;
29        match msg {
30            crate::protocol::BackendMessage::ReadyForQuery(_) => return Ok(()),
31            crate::protocol::BackendMessage::ErrorResponse(_) => {}
32            msg if is_ignorable_session_message(&msg) => {}
33            // Best-effort drain: consume everything until Sync's ReadyForQuery.
34            _ => {}
35        }
36    }
37}
38
39impl PooledConnection {
40    /// Execute a QAIL command and fetch all rows (UNCACHED).
41    /// Returns rows with column metadata for JSON serialization.
42    pub async fn fetch_all_uncached(
43        &mut self,
44        cmd: &qail_core::ast::Qail,
45    ) -> PgResult<Vec<crate::driver::PgRow>> {
46        self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
47            .await
48    }
49
50    /// Execute raw SQL with bind parameters and return raw row data.
51    ///
52    /// Uses the Extended Query Protocol so parameters are never interpolated
53    /// into the SQL string. Intended for EXPLAIN or other SQL that can't be
54    /// represented as a `Qail` AST but still needs parameterized execution.
55    ///
56    /// Returns raw column bytes; callers must decode as needed.
57    pub async fn query_raw_with_params(
58        &mut self,
59        sql: &str,
60        params: &[Option<Vec<u8>>],
61    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
62        let conn = self.conn_mut()?;
63        conn.query(sql, params).await
64    }
65
66    /// Export data using AST-native COPY TO STDOUT and collect parsed rows.
67    pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
68        self.conn_mut()?.copy_export(cmd).await
69    }
70
71    /// Stream AST-native COPY TO STDOUT chunks with bounded memory usage.
72    pub async fn copy_export_stream_raw<F, Fut>(
73        &mut self,
74        cmd: &qail_core::ast::Qail,
75        on_chunk: F,
76    ) -> PgResult<()>
77    where
78        F: FnMut(Vec<u8>) -> Fut,
79        Fut: std::future::Future<Output = PgResult<()>>,
80    {
81        self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
82    }
83
84    /// Stream AST-native COPY TO STDOUT rows with bounded memory usage.
85    pub async fn copy_export_stream_rows<F>(
86        &mut self,
87        cmd: &qail_core::ast::Qail,
88        on_row: F,
89    ) -> PgResult<()>
90    where
91        F: FnMut(Vec<String>) -> PgResult<()>,
92    {
93        self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
94    }
95
96    /// Export a table using COPY TO STDOUT and collect raw bytes.
97    pub async fn copy_export_table(
98        &mut self,
99        table: &str,
100        columns: &[String],
101    ) -> PgResult<Vec<u8>> {
102        let quote_ident = |ident: &str| -> String {
103            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
104        };
105        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
106        let sql = format!(
107            "COPY {} ({}) TO STDOUT",
108            quote_ident(table),
109            cols.join(", ")
110        );
111        self.conn_mut()?.copy_out_raw(&sql).await
112    }
113
114    /// Stream a table export using COPY TO STDOUT with bounded memory usage.
115    pub async fn copy_export_table_stream<F, Fut>(
116        &mut self,
117        table: &str,
118        columns: &[String],
119        on_chunk: F,
120    ) -> PgResult<()>
121    where
122        F: FnMut(Vec<u8>) -> Fut,
123        Fut: std::future::Future<Output = PgResult<()>>,
124    {
125        let quote_ident = |ident: &str| -> String {
126            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
127        };
128        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
129        let sql = format!(
130            "COPY {} ({}) TO STDOUT",
131            quote_ident(table),
132            cols.join(", ")
133        );
134        self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
135    }
136
137    /// Execute a QAIL command and fetch all rows (UNCACHED) with explicit result format.
138    pub async fn fetch_all_uncached_with_format(
139        &mut self,
140        cmd: &qail_core::ast::Qail,
141        result_format: ResultFormat,
142    ) -> PgResult<Vec<crate::driver::PgRow>> {
143        use crate::driver::ColumnInfo;
144        use crate::protocol::AstEncoder;
145
146        let conn = self.conn_mut()?;
147
148        AstEncoder::encode_cmd_reuse_into_with_result_format(
149            cmd,
150            &mut conn.sql_buf,
151            &mut conn.params_buf,
152            &mut conn.write_buf,
153            result_format.as_wire_code(),
154        )
155        .map_err(|e| PgError::Encode(e.to_string()))?;
156
157        conn.flush_write_buf().await?;
158
159        let mut rows: Vec<crate::driver::PgRow> = Vec::new();
160        let mut column_info: Option<Arc<ColumnInfo>> = None;
161        let mut error: Option<PgError> = None;
162        let mut flow =
163            ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
164
165        loop {
166            let msg = conn.recv().await?;
167            flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
168            match msg {
169                crate::protocol::BackendMessage::ParseComplete
170                | crate::protocol::BackendMessage::BindComplete => {}
171                crate::protocol::BackendMessage::RowDescription(fields) => {
172                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
173                }
174                crate::protocol::BackendMessage::DataRow(data) => {
175                    if error.is_none() {
176                        rows.push(crate::driver::PgRow {
177                            columns: data,
178                            column_info: column_info.clone(),
179                        });
180                    }
181                }
182                crate::protocol::BackendMessage::NoData => {}
183                crate::protocol::BackendMessage::CommandComplete(_) => {}
184                crate::protocol::BackendMessage::ReadyForQuery(_) => {
185                    if let Some(err) = error {
186                        return Err(err);
187                    }
188                    return Ok(rows);
189                }
190                crate::protocol::BackendMessage::ErrorResponse(err) => {
191                    if error.is_none() {
192                        error = Some(PgError::QueryServer(err.into()));
193                    }
194                }
195                msg if is_ignorable_session_message(&msg) => {}
196                other => {
197                    return Err(unexpected_backend_message("pool fetch_all execute", &other));
198                }
199            }
200        }
201    }
202
203    /// Execute a QAIL command and fetch all rows (FAST VERSION).
204    /// Uses native AST-to-wire encoding and optimized recv_with_data_fast.
205    /// Skips column metadata for maximum speed.
206    pub async fn fetch_all_fast(
207        &mut self,
208        cmd: &qail_core::ast::Qail,
209    ) -> PgResult<Vec<crate::driver::PgRow>> {
210        self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
211            .await
212    }
213
214    /// Execute a QAIL command and fetch all rows (FAST VERSION) with explicit result format.
215    pub async fn fetch_all_fast_with_format(
216        &mut self,
217        cmd: &qail_core::ast::Qail,
218        result_format: ResultFormat,
219    ) -> PgResult<Vec<crate::driver::PgRow>> {
220        use crate::protocol::AstEncoder;
221
222        let conn = self.conn_mut()?;
223
224        AstEncoder::encode_cmd_reuse_into_with_result_format(
225            cmd,
226            &mut conn.sql_buf,
227            &mut conn.params_buf,
228            &mut conn.write_buf,
229            result_format.as_wire_code(),
230        )
231        .map_err(|e| PgError::Encode(e.to_string()))?;
232
233        conn.flush_write_buf().await?;
234
235        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
236        let mut error: Option<PgError> = None;
237        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
238
239        loop {
240            let res = conn.recv_with_data_fast().await;
241            match res {
242                Ok((msg_type, data)) => {
243                    flow.validate_msg_type(
244                        msg_type,
245                        "pool fetch_all_fast execute",
246                        error.is_some(),
247                    )?;
248                    match msg_type {
249                        b'D' => {
250                            if error.is_none()
251                                && let Some(columns) = data
252                            {
253                                rows.push(crate::driver::PgRow {
254                                    columns,
255                                    column_info: None,
256                                });
257                            }
258                        }
259                        b'Z' => {
260                            if let Some(err) = error {
261                                return Err(err);
262                            }
263                            return Ok(rows);
264                        }
265                        _ => {}
266                    }
267                }
268                Err(e) => {
269                    if matches!(&e, PgError::QueryServer(_)) {
270                        if error.is_none() {
271                            error = Some(e);
272                        }
273                        continue;
274                    }
275                    return Err(e);
276                }
277            }
278        }
279    }
280
281    /// Execute a QAIL command and fetch all rows (CACHED).
282    /// Uses prepared statement caching: Parse+Describe on first call,
283    /// then Bind+Execute only on subsequent calls with the same SQL shape.
284    /// This matches PostgREST's behavior for fair benchmarks.
285    pub async fn fetch_all_cached(
286        &mut self,
287        cmd: &qail_core::ast::Qail,
288    ) -> PgResult<Vec<crate::driver::PgRow>> {
289        self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
290            .await
291    }
292
293    /// Execute a QAIL command and fetch all rows (CACHED) with explicit result format.
294    pub async fn fetch_all_cached_with_format(
295        &mut self,
296        cmd: &qail_core::ast::Qail,
297        result_format: ResultFormat,
298    ) -> PgResult<Vec<crate::driver::PgRow>> {
299        let mut retried = false;
300        loop {
301            match self
302                .fetch_all_cached_with_format_once(cmd, result_format)
303                .await
304            {
305                Ok(rows) => return Ok(rows),
306                Err(err)
307                    if !retried
308                        && (err.is_prepared_statement_retryable()
309                            || err.is_prepared_statement_already_exists()) =>
310                {
311                    retried = true;
312                    if err.is_prepared_statement_retryable()
313                        && let Some(conn) = self.conn.as_mut()
314                    {
315                        conn.clear_prepared_statement_state();
316                    }
317                }
318                Err(err) => return Err(err),
319            }
320        }
321    }
322
323    /// Execute a QAIL command and decode rows into typed structs (CACHED, text format).
324    pub async fn fetch_typed<T: crate::driver::row::QailRow>(
325        &mut self,
326        cmd: &qail_core::ast::Qail,
327    ) -> PgResult<Vec<T>> {
328        self.fetch_typed_with_format(cmd, ResultFormat::Text).await
329    }
330
331    /// Execute a QAIL command and decode rows into typed structs with explicit result format.
332    ///
333    /// Use [`ResultFormat::Binary`] for binary wire values; row decoders should use
334    /// metadata-aware helpers like `PgRow::try_get()` / `try_get_by_name()`.
335    pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
336        &mut self,
337        cmd: &qail_core::ast::Qail,
338        result_format: ResultFormat,
339    ) -> PgResult<Vec<T>> {
340        let rows = self
341            .fetch_all_cached_with_format(cmd, result_format)
342            .await?;
343        Ok(rows.iter().map(T::from_row).collect())
344    }
345
346    /// Execute a QAIL command and decode one typed row (CACHED, text format).
347    pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
348        &mut self,
349        cmd: &qail_core::ast::Qail,
350    ) -> PgResult<Option<T>> {
351        self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
352            .await
353    }
354
355    /// Execute a QAIL command and decode one typed row with explicit result format.
356    pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
357        &mut self,
358        cmd: &qail_core::ast::Qail,
359        result_format: ResultFormat,
360    ) -> PgResult<Option<T>> {
361        let rows = self
362            .fetch_all_cached_with_format(cmd, result_format)
363            .await?;
364        Ok(rows.first().map(T::from_row))
365    }
366
367    async fn fetch_all_cached_with_format_once(
368        &mut self,
369        cmd: &qail_core::ast::Qail,
370        result_format: ResultFormat,
371    ) -> PgResult<Vec<crate::driver::PgRow>> {
372        use crate::driver::ColumnInfo;
373        use std::collections::hash_map::DefaultHasher;
374        use std::hash::{Hash, Hasher};
375
376        let conn = self.conn.as_mut().ok_or_else(|| {
377            PgError::Connection("Connection already released back to pool".into())
378        })?;
379
380        conn.sql_buf.clear();
381        conn.params_buf.clear();
382
383        // Encode SQL + params to reusable buffers
384        match cmd.action {
385            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
386                crate::protocol::ast_encoder::dml::encode_select(
387                    cmd,
388                    &mut conn.sql_buf,
389                    &mut conn.params_buf,
390                )?;
391            }
392            qail_core::ast::Action::Add => {
393                crate::protocol::ast_encoder::dml::encode_insert(
394                    cmd,
395                    &mut conn.sql_buf,
396                    &mut conn.params_buf,
397                )?;
398            }
399            qail_core::ast::Action::Set => {
400                crate::protocol::ast_encoder::dml::encode_update(
401                    cmd,
402                    &mut conn.sql_buf,
403                    &mut conn.params_buf,
404                )?;
405            }
406            qail_core::ast::Action::Del => {
407                crate::protocol::ast_encoder::dml::encode_delete(
408                    cmd,
409                    &mut conn.sql_buf,
410                    &mut conn.params_buf,
411                )?;
412            }
413            _ => {
414                // Fallback: unsupported actions go through uncached path
415                return self
416                    .fetch_all_uncached_with_format(cmd, result_format)
417                    .await;
418            }
419        }
420
421        let mut hasher = DefaultHasher::new();
422        conn.sql_buf.hash(&mut hasher);
423        let sql_hash = hasher.finish();
424
425        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
426
427        conn.write_buf.clear();
428
429        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
430            name
431        } else {
432            let name = format!("qail_{:x}", sql_hash);
433
434            conn.evict_prepared_if_full();
435
436            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
437
438            use crate::protocol::PgEncoder;
439            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
440            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
441            conn.write_buf.extend_from_slice(&parse_msg);
442            conn.write_buf.extend_from_slice(&describe_msg);
443
444            conn.stmt_cache.put(sql_hash, name.clone());
445            conn.prepared_statements
446                .insert(name.clone(), sql_str.to_string());
447
448            // Register in global hot-statement registry for cross-connection sharing
449            if let Ok(mut hot) = self.pool.hot_statements.write()
450                && hot.len() < MAX_HOT_STATEMENTS
451            {
452                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
453            }
454
455            name
456        };
457
458        use crate::protocol::PgEncoder;
459        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
460            &mut conn.write_buf,
461            &stmt_name,
462            &conn.params_buf,
463            result_format.as_wire_code(),
464        ) {
465            if is_cache_miss {
466                conn.stmt_cache.remove(&sql_hash);
467                conn.prepared_statements.remove(&stmt_name);
468                conn.column_info_cache.remove(&sql_hash);
469            }
470            return Err(PgError::Encode(e.to_string()));
471        }
472        PgEncoder::encode_execute_to(&mut conn.write_buf);
473        PgEncoder::encode_sync_to(&mut conn.write_buf);
474
475        if let Err(err) = conn.flush_write_buf().await {
476            if is_cache_miss {
477                conn.stmt_cache.remove(&sql_hash);
478                conn.prepared_statements.remove(&stmt_name);
479                conn.column_info_cache.remove(&sql_hash);
480            }
481            return Err(err);
482        }
483
484        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
485
486        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
487        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
488        let mut error: Option<PgError> = None;
489        let mut flow = ExtendedFlowTracker::new(
490            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
491        );
492
493        loop {
494            let msg = match conn.recv().await {
495                Ok(msg) => msg,
496                Err(err) => {
497                    if is_cache_miss && !flow.saw_parse_complete() {
498                        conn.stmt_cache.remove(&sql_hash);
499                        conn.prepared_statements.remove(&stmt_name);
500                        conn.column_info_cache.remove(&sql_hash);
501                    }
502                    return Err(err);
503                }
504            };
505            if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
506            {
507                if is_cache_miss && !flow.saw_parse_complete() {
508                    conn.stmt_cache.remove(&sql_hash);
509                    conn.prepared_statements.remove(&stmt_name);
510                    conn.column_info_cache.remove(&sql_hash);
511                }
512                return Err(err);
513            }
514            match msg {
515                crate::protocol::BackendMessage::ParseComplete => {}
516                crate::protocol::BackendMessage::BindComplete => {}
517                crate::protocol::BackendMessage::ParameterDescription(_) => {}
518                crate::protocol::BackendMessage::RowDescription(fields) => {
519                    let info = Arc::new(ColumnInfo::from_fields(&fields));
520                    if is_cache_miss {
521                        conn.column_info_cache.insert(sql_hash, info.clone());
522                    }
523                    column_info = Some(info);
524                }
525                crate::protocol::BackendMessage::DataRow(data) => {
526                    if error.is_none() {
527                        rows.push(crate::driver::PgRow {
528                            columns: data,
529                            column_info: column_info.clone(),
530                        });
531                    }
532                }
533                crate::protocol::BackendMessage::CommandComplete(_) => {}
534                crate::protocol::BackendMessage::ReadyForQuery(_) => {
535                    if let Some(err) = error {
536                        if is_cache_miss
537                            && !flow.saw_parse_complete()
538                            && !err.is_prepared_statement_already_exists()
539                        {
540                            conn.stmt_cache.remove(&sql_hash);
541                            conn.prepared_statements.remove(&stmt_name);
542                            conn.column_info_cache.remove(&sql_hash);
543                        }
544                        return Err(err);
545                    }
546                    if is_cache_miss && !flow.saw_parse_complete() {
547                        conn.stmt_cache.remove(&sql_hash);
548                        conn.prepared_statements.remove(&stmt_name);
549                        conn.column_info_cache.remove(&sql_hash);
550                        return Err(PgError::Protocol(
551                            "Cache miss query reached ReadyForQuery without ParseComplete"
552                                .to_string(),
553                        ));
554                    }
555                    return Ok(rows);
556                }
557                crate::protocol::BackendMessage::ErrorResponse(err) => {
558                    if error.is_none() {
559                        error = Some(PgError::QueryServer(err.into()));
560                    }
561                }
562                msg if is_ignorable_session_message(&msg) => {}
563                other => {
564                    if is_cache_miss && !flow.saw_parse_complete() {
565                        conn.stmt_cache.remove(&sql_hash);
566                        conn.prepared_statements.remove(&stmt_name);
567                        conn.column_info_cache.remove(&sql_hash);
568                    }
569                    return Err(unexpected_backend_message(
570                        "pool fetch_all_cached execute",
571                        &other,
572                    ));
573                }
574            }
575        }
576    }
577
578    /// Execute a QAIL command with RLS context in a SINGLE roundtrip.
579    ///
580    /// Pipelines the RLS setup (BEGIN + set_config) and the query
581    /// (Parse/Bind/Execute/Sync) into one `write_all` syscall.
582    /// PG processes messages in order, so the BEGIN + set_config
583    /// completes before the query executes — security is preserved.
584    ///
585    /// Wire layout:
586    /// ```text
587    /// [SimpleQuery: "BEGIN; SET LOCAL...; SELECT set_config(...)"]
588    /// [Parse (if cache miss)]
589    /// [Describe (if cache miss)]
590    /// [Bind]
591    /// [Execute]
592    /// [Sync]
593    /// ```
594    ///
595    /// Response processing: consume 2× ReadyForQuery (SimpleQuery + Sync).
596    pub async fn fetch_all_with_rls(
597        &mut self,
598        cmd: &qail_core::ast::Qail,
599        rls_sql: &str,
600    ) -> PgResult<Vec<crate::driver::PgRow>> {
601        self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
602            .await
603    }
604
605    /// Execute a QAIL command with RLS context in a SINGLE roundtrip with explicit result format.
606    pub async fn fetch_all_with_rls_with_format(
607        &mut self,
608        cmd: &qail_core::ast::Qail,
609        rls_sql: &str,
610        result_format: ResultFormat,
611    ) -> PgResult<Vec<crate::driver::PgRow>> {
612        let mut retried = false;
613        loop {
614            match self
615                .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
616                .await
617            {
618                Ok(rows) => return Ok(rows),
619                Err(err)
620                    if !retried
621                        && (err.is_prepared_statement_retryable()
622                            || err.is_prepared_statement_already_exists()) =>
623                {
624                    retried = true;
625                    if let Some(conn) = self.conn.as_mut() {
626                        if err.is_prepared_statement_retryable() {
627                            conn.clear_prepared_statement_state();
628                        }
629                        // Always rollback transaction state before a retried RLS pipeline
630                        // attempt, including 42P05 "prepared statement already exists".
631                        let _ = conn.execute_simple("ROLLBACK").await;
632                    }
633                    self.rls_dirty = false;
634                }
635                Err(err) => return Err(err),
636            }
637        }
638    }
639
640    async fn fetch_all_with_rls_with_format_once(
641        &mut self,
642        cmd: &qail_core::ast::Qail,
643        rls_sql: &str,
644        result_format: ResultFormat,
645    ) -> PgResult<Vec<crate::driver::PgRow>> {
646        use crate::driver::ColumnInfo;
647        use std::collections::hash_map::DefaultHasher;
648        use std::hash::{Hash, Hasher};
649
650        let conn = self.conn.as_mut().ok_or_else(|| {
651            PgError::Connection("Connection already released back to pool".into())
652        })?;
653
654        conn.sql_buf.clear();
655        conn.params_buf.clear();
656
657        // Encode SQL + params to reusable buffers
658        match cmd.action {
659            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
660                crate::protocol::ast_encoder::dml::encode_select(
661                    cmd,
662                    &mut conn.sql_buf,
663                    &mut conn.params_buf,
664                )?;
665            }
666            qail_core::ast::Action::Add => {
667                crate::protocol::ast_encoder::dml::encode_insert(
668                    cmd,
669                    &mut conn.sql_buf,
670                    &mut conn.params_buf,
671                )?;
672            }
673            qail_core::ast::Action::Set => {
674                crate::protocol::ast_encoder::dml::encode_update(
675                    cmd,
676                    &mut conn.sql_buf,
677                    &mut conn.params_buf,
678                )?;
679            }
680            qail_core::ast::Action::Del => {
681                crate::protocol::ast_encoder::dml::encode_delete(
682                    cmd,
683                    &mut conn.sql_buf,
684                    &mut conn.params_buf,
685                )?;
686            }
687            _ => {
688                // Fallback: RLS setup must happen synchronously for unsupported actions
689                conn.execute_simple(rls_sql).await?;
690                self.rls_dirty = true;
691                return self
692                    .fetch_all_uncached_with_format(cmd, result_format)
693                    .await;
694            }
695        }
696
697        let mut hasher = DefaultHasher::new();
698        conn.sql_buf.hash(&mut hasher);
699        let sql_hash = hasher.finish();
700
701        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
702
703        conn.write_buf.clear();
704
705        // ── Prepend RLS Simple Query message ─────────────────────────
706        // NOTE: this is PostgreSQL SimpleQuery text, so the backend still
707        // parses this segment on every request. The optimization here is
708        // batching RLS + query protocol messages into one network flush.
709        let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
710        conn.write_buf.extend_from_slice(&rls_msg);
711
712        // ── Then append the query messages (same as fetch_all_cached) ──
713        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
714            name
715        } else {
716            let name = format!("qail_{:x}", sql_hash);
717
718            conn.evict_prepared_if_full();
719
720            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
721
722            use crate::protocol::PgEncoder;
723            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
724            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
725            conn.write_buf.extend_from_slice(&parse_msg);
726            conn.write_buf.extend_from_slice(&describe_msg);
727
728            conn.stmt_cache.put(sql_hash, name.clone());
729            conn.prepared_statements
730                .insert(name.clone(), sql_str.to_string());
731
732            if let Ok(mut hot) = self.pool.hot_statements.write()
733                && hot.len() < MAX_HOT_STATEMENTS
734            {
735                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
736            }
737
738            name
739        };
740
741        use crate::protocol::PgEncoder;
742        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
743            &mut conn.write_buf,
744            &stmt_name,
745            &conn.params_buf,
746            result_format.as_wire_code(),
747        ) {
748            rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
749            return Err(PgError::Encode(e.to_string()));
750        }
751        PgEncoder::encode_execute_to(&mut conn.write_buf);
752        PgEncoder::encode_sync_to(&mut conn.write_buf);
753
754        // ── Single write_all for RLS + Query ────────────────────────
755        if let Err(err) = conn.flush_write_buf().await {
756            rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
757            return Err(err);
758        }
759
760        // Mark connection as RLS-dirty (needs COMMIT on release)
761        self.rls_dirty = true;
762
763        // ── Phase 1: Consume Simple Query responses (RLS setup) ─────
764        // Simple Query produces: CommandComplete × N, then ReadyForQuery.
765        // set_config results and BEGIN/SET LOCAL responses are all here.
766        let mut rls_error: Option<PgError> = None;
767        loop {
768            let msg = match conn.recv().await {
769                Ok(msg) => msg,
770                Err(err) => {
771                    rollback_cache_miss_statement_registration(
772                        conn,
773                        is_cache_miss,
774                        sql_hash,
775                        &stmt_name,
776                    );
777                    return Err(err);
778                }
779            };
780            match msg {
781                crate::protocol::BackendMessage::ReadyForQuery(_) => {
782                    // RLS setup done — break to Extended Query phase
783                    if let Some(err) = rls_error {
784                        rollback_cache_miss_statement_registration(
785                            conn,
786                            is_cache_miss,
787                            sql_hash,
788                            &stmt_name,
789                        );
790                        if let Err(drain_err) =
791                            drain_extended_responses_after_rls_setup_error(conn).await
792                        {
793                            tracing::warn!(
794                                error = %drain_err,
795                                "failed to drain pipelined extended responses after RLS setup error"
796                            );
797                        }
798                        return Err(err);
799                    }
800                    break;
801                }
802                crate::protocol::BackendMessage::ErrorResponse(err) => {
803                    if rls_error.is_none() {
804                        rls_error = Some(PgError::QueryServer(err.into()));
805                    }
806                }
807                // CommandComplete, DataRow (from set_config), RowDescription — ignore
808                crate::protocol::BackendMessage::CommandComplete(_)
809                | crate::protocol::BackendMessage::DataRow(_)
810                | crate::protocol::BackendMessage::RowDescription(_)
811                | crate::protocol::BackendMessage::ParseComplete
812                | crate::protocol::BackendMessage::BindComplete => {}
813                msg if is_ignorable_session_message(&msg) => {}
814                other => return Err(unexpected_backend_message("pool rls setup", &other)),
815            }
816        }
817
818        // ── Phase 2: Consume Extended Query responses (actual data) ──
819        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
820
821        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
822        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
823        let mut error: Option<PgError> = None;
824        let mut flow = ExtendedFlowTracker::new(
825            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
826        );
827
828        loop {
829            let msg = match conn.recv().await {
830                Ok(msg) => msg,
831                Err(err) => {
832                    if is_cache_miss && !flow.saw_parse_complete() {
833                        rollback_cache_miss_statement_registration(
834                            conn,
835                            is_cache_miss,
836                            sql_hash,
837                            &stmt_name,
838                        );
839                    }
840                    return Err(err);
841                }
842            };
843            if let Err(err) =
844                flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
845            {
846                if is_cache_miss && !flow.saw_parse_complete() {
847                    rollback_cache_miss_statement_registration(
848                        conn,
849                        is_cache_miss,
850                        sql_hash,
851                        &stmt_name,
852                    );
853                }
854                return Err(err);
855            }
856            match msg {
857                crate::protocol::BackendMessage::ParseComplete => {}
858                crate::protocol::BackendMessage::BindComplete => {}
859                crate::protocol::BackendMessage::ParameterDescription(_) => {}
860                crate::protocol::BackendMessage::RowDescription(fields) => {
861                    let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
862                    if is_cache_miss {
863                        conn.column_info_cache.insert(sql_hash, info.clone());
864                    }
865                    column_info = Some(info);
866                }
867                crate::protocol::BackendMessage::DataRow(data) => {
868                    if error.is_none() {
869                        rows.push(crate::driver::PgRow {
870                            columns: data,
871                            column_info: column_info.clone(),
872                        });
873                    }
874                }
875                crate::protocol::BackendMessage::CommandComplete(_) => {}
876                crate::protocol::BackendMessage::ReadyForQuery(_) => {
877                    if let Some(err) = error {
878                        if is_cache_miss
879                            && !flow.saw_parse_complete()
880                            && !err.is_prepared_statement_already_exists()
881                        {
882                            rollback_cache_miss_statement_registration(
883                                conn,
884                                is_cache_miss,
885                                sql_hash,
886                                &stmt_name,
887                            );
888                        }
889                        return Err(err);
890                    }
891                    if is_cache_miss && !flow.saw_parse_complete() {
892                        rollback_cache_miss_statement_registration(
893                            conn,
894                            is_cache_miss,
895                            sql_hash,
896                            &stmt_name,
897                        );
898                        return Err(PgError::Protocol(
899                            "Cache miss query reached ReadyForQuery without ParseComplete"
900                                .to_string(),
901                        ));
902                    }
903                    return Ok(rows);
904                }
905                crate::protocol::BackendMessage::ErrorResponse(err) => {
906                    if error.is_none() {
907                        error = Some(PgError::QueryServer(err.into()));
908                    }
909                }
910                msg if is_ignorable_session_message(&msg) => {}
911                other => {
912                    if is_cache_miss && !flow.saw_parse_complete() {
913                        rollback_cache_miss_statement_registration(
914                            conn,
915                            is_cache_miss,
916                            sql_hash,
917                            &stmt_name,
918                        );
919                    }
920                    return Err(unexpected_backend_message(
921                        "pool fetch_all_with_rls execute",
922                        &other,
923                    ));
924                }
925            }
926        }
927    }
928}