Skip to main content

qail_pg/driver/pool/
fetch.rs

1//! Fetch methods for PooledConnection: uncached, fast, cached, typed, and pipelined-RLS variants.
2
3use super::connection::PooledConnection;
4use super::lifecycle::MAX_HOT_STATEMENTS;
5use crate::driver::{
6    PgError, PgResult, ResultFormat,
7    extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
8    is_ignorable_session_message, unexpected_backend_message,
9};
10use std::sync::Arc;
11
12impl PooledConnection {
13    /// Execute a QAIL command and fetch all rows (UNCACHED).
14    /// Returns rows with column metadata for JSON serialization.
15    pub async fn fetch_all_uncached(
16        &mut self,
17        cmd: &qail_core::ast::Qail,
18    ) -> PgResult<Vec<crate::driver::PgRow>> {
19        self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
20            .await
21    }
22
23    /// Execute raw SQL with bind parameters and return raw row data.
24    ///
25    /// Uses the Extended Query Protocol so parameters are never interpolated
26    /// into the SQL string. Intended for EXPLAIN or other SQL that can't be
27    /// represented as a `Qail` AST but still needs parameterized execution.
28    ///
29    /// Returns raw column bytes; callers must decode as needed.
30    pub async fn query_raw_with_params(
31        &mut self,
32        sql: &str,
33        params: &[Option<Vec<u8>>],
34    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
35        let conn = self.conn_mut()?;
36        conn.query(sql, params).await
37    }
38
39    /// Export data using AST-native COPY TO STDOUT and collect parsed rows.
40    pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
41        self.conn_mut()?.copy_export(cmd).await
42    }
43
44    /// Stream AST-native COPY TO STDOUT chunks with bounded memory usage.
45    pub async fn copy_export_stream_raw<F, Fut>(
46        &mut self,
47        cmd: &qail_core::ast::Qail,
48        on_chunk: F,
49    ) -> PgResult<()>
50    where
51        F: FnMut(Vec<u8>) -> Fut,
52        Fut: std::future::Future<Output = PgResult<()>>,
53    {
54        self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
55    }
56
57    /// Stream AST-native COPY TO STDOUT rows with bounded memory usage.
58    pub async fn copy_export_stream_rows<F>(
59        &mut self,
60        cmd: &qail_core::ast::Qail,
61        on_row: F,
62    ) -> PgResult<()>
63    where
64        F: FnMut(Vec<String>) -> PgResult<()>,
65    {
66        self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
67    }
68
69    /// Export a table using COPY TO STDOUT and collect raw bytes.
70    pub async fn copy_export_table(
71        &mut self,
72        table: &str,
73        columns: &[String],
74    ) -> PgResult<Vec<u8>> {
75        let quote_ident = |ident: &str| -> String {
76            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
77        };
78        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
79        let sql = format!(
80            "COPY {} ({}) TO STDOUT",
81            quote_ident(table),
82            cols.join(", ")
83        );
84        self.conn_mut()?.copy_out_raw(&sql).await
85    }
86
87    /// Stream a table export using COPY TO STDOUT with bounded memory usage.
88    pub async fn copy_export_table_stream<F, Fut>(
89        &mut self,
90        table: &str,
91        columns: &[String],
92        on_chunk: F,
93    ) -> PgResult<()>
94    where
95        F: FnMut(Vec<u8>) -> Fut,
96        Fut: std::future::Future<Output = PgResult<()>>,
97    {
98        let quote_ident = |ident: &str| -> String {
99            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
100        };
101        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
102        let sql = format!(
103            "COPY {} ({}) TO STDOUT",
104            quote_ident(table),
105            cols.join(", ")
106        );
107        self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
108    }
109
110    /// Execute a QAIL command and fetch all rows (UNCACHED) with explicit result format.
111    pub async fn fetch_all_uncached_with_format(
112        &mut self,
113        cmd: &qail_core::ast::Qail,
114        result_format: ResultFormat,
115    ) -> PgResult<Vec<crate::driver::PgRow>> {
116        use crate::driver::ColumnInfo;
117        use crate::protocol::AstEncoder;
118
119        let conn = self.conn_mut()?;
120
121        AstEncoder::encode_cmd_reuse_into_with_result_format(
122            cmd,
123            &mut conn.sql_buf,
124            &mut conn.params_buf,
125            &mut conn.write_buf,
126            result_format.as_wire_code(),
127        )
128        .map_err(|e| PgError::Encode(e.to_string()))?;
129
130        conn.flush_write_buf().await?;
131
132        let mut rows: Vec<crate::driver::PgRow> = Vec::new();
133        let mut column_info: Option<Arc<ColumnInfo>> = None;
134        let mut error: Option<PgError> = None;
135        let mut flow =
136            ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
137
138        loop {
139            let msg = conn.recv().await?;
140            flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
141            match msg {
142                crate::protocol::BackendMessage::ParseComplete
143                | crate::protocol::BackendMessage::BindComplete => {}
144                crate::protocol::BackendMessage::RowDescription(fields) => {
145                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
146                }
147                crate::protocol::BackendMessage::DataRow(data) => {
148                    if error.is_none() {
149                        rows.push(crate::driver::PgRow {
150                            columns: data,
151                            column_info: column_info.clone(),
152                        });
153                    }
154                }
155                crate::protocol::BackendMessage::NoData => {}
156                crate::protocol::BackendMessage::CommandComplete(_) => {}
157                crate::protocol::BackendMessage::ReadyForQuery(_) => {
158                    if let Some(err) = error {
159                        return Err(err);
160                    }
161                    return Ok(rows);
162                }
163                crate::protocol::BackendMessage::ErrorResponse(err) => {
164                    if error.is_none() {
165                        error = Some(PgError::QueryServer(err.into()));
166                    }
167                }
168                msg if is_ignorable_session_message(&msg) => {}
169                other => {
170                    return Err(unexpected_backend_message("pool fetch_all execute", &other));
171                }
172            }
173        }
174    }
175
176    /// Execute a QAIL command and fetch all rows (FAST VERSION).
177    /// Uses native AST-to-wire encoding and optimized recv_with_data_fast.
178    /// Skips column metadata for maximum speed.
179    pub async fn fetch_all_fast(
180        &mut self,
181        cmd: &qail_core::ast::Qail,
182    ) -> PgResult<Vec<crate::driver::PgRow>> {
183        self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
184            .await
185    }
186
187    /// Execute a QAIL command and fetch all rows (FAST VERSION) with explicit result format.
188    pub async fn fetch_all_fast_with_format(
189        &mut self,
190        cmd: &qail_core::ast::Qail,
191        result_format: ResultFormat,
192    ) -> PgResult<Vec<crate::driver::PgRow>> {
193        use crate::protocol::AstEncoder;
194
195        let conn = self.conn_mut()?;
196
197        AstEncoder::encode_cmd_reuse_into_with_result_format(
198            cmd,
199            &mut conn.sql_buf,
200            &mut conn.params_buf,
201            &mut conn.write_buf,
202            result_format.as_wire_code(),
203        )
204        .map_err(|e| PgError::Encode(e.to_string()))?;
205
206        conn.flush_write_buf().await?;
207
208        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
209        let mut error: Option<PgError> = None;
210        let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
211
212        loop {
213            let res = conn.recv_with_data_fast().await;
214            match res {
215                Ok((msg_type, data)) => {
216                    flow.validate_msg_type(
217                        msg_type,
218                        "pool fetch_all_fast execute",
219                        error.is_some(),
220                    )?;
221                    match msg_type {
222                        b'D' => {
223                            if error.is_none()
224                                && let Some(columns) = data
225                            {
226                                rows.push(crate::driver::PgRow {
227                                    columns,
228                                    column_info: None,
229                                });
230                            }
231                        }
232                        b'Z' => {
233                            if let Some(err) = error {
234                                return Err(err);
235                            }
236                            return Ok(rows);
237                        }
238                        _ => {}
239                    }
240                }
241                Err(e) => {
242                    if matches!(&e, PgError::QueryServer(_)) {
243                        if error.is_none() {
244                            error = Some(e);
245                        }
246                        continue;
247                    }
248                    return Err(e);
249                }
250            }
251        }
252    }
253
254    /// Execute a QAIL command and fetch all rows (CACHED).
255    /// Uses prepared statement caching: Parse+Describe on first call,
256    /// then Bind+Execute only on subsequent calls with the same SQL shape.
257    /// This matches PostgREST's behavior for fair benchmarks.
258    pub async fn fetch_all_cached(
259        &mut self,
260        cmd: &qail_core::ast::Qail,
261    ) -> PgResult<Vec<crate::driver::PgRow>> {
262        self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
263            .await
264    }
265
266    /// Execute a QAIL command and fetch all rows (CACHED) with explicit result format.
267    pub async fn fetch_all_cached_with_format(
268        &mut self,
269        cmd: &qail_core::ast::Qail,
270        result_format: ResultFormat,
271    ) -> PgResult<Vec<crate::driver::PgRow>> {
272        let mut retried = false;
273        loop {
274            match self
275                .fetch_all_cached_with_format_once(cmd, result_format)
276                .await
277            {
278                Ok(rows) => return Ok(rows),
279                Err(err)
280                    if !retried
281                        && (err.is_prepared_statement_retryable()
282                            || err.is_prepared_statement_already_exists()) =>
283                {
284                    retried = true;
285                    if err.is_prepared_statement_retryable()
286                        && let Some(conn) = self.conn.as_mut()
287                    {
288                        conn.clear_prepared_statement_state();
289                    }
290                }
291                Err(err) => return Err(err),
292            }
293        }
294    }
295
296    /// Execute a QAIL command and decode rows into typed structs (CACHED, text format).
297    pub async fn fetch_typed<T: crate::driver::row::QailRow>(
298        &mut self,
299        cmd: &qail_core::ast::Qail,
300    ) -> PgResult<Vec<T>> {
301        self.fetch_typed_with_format(cmd, ResultFormat::Text).await
302    }
303
304    /// Execute a QAIL command and decode rows into typed structs with explicit result format.
305    ///
306    /// Use [`ResultFormat::Binary`] for binary wire values; row decoders should use
307    /// metadata-aware helpers like `PgRow::try_get()` / `try_get_by_name()`.
308    pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
309        &mut self,
310        cmd: &qail_core::ast::Qail,
311        result_format: ResultFormat,
312    ) -> PgResult<Vec<T>> {
313        let rows = self
314            .fetch_all_cached_with_format(cmd, result_format)
315            .await?;
316        Ok(rows.iter().map(T::from_row).collect())
317    }
318
319    /// Execute a QAIL command and decode one typed row (CACHED, text format).
320    pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
321        &mut self,
322        cmd: &qail_core::ast::Qail,
323    ) -> PgResult<Option<T>> {
324        self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
325            .await
326    }
327
328    /// Execute a QAIL command and decode one typed row with explicit result format.
329    pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
330        &mut self,
331        cmd: &qail_core::ast::Qail,
332        result_format: ResultFormat,
333    ) -> PgResult<Option<T>> {
334        let rows = self
335            .fetch_all_cached_with_format(cmd, result_format)
336            .await?;
337        Ok(rows.first().map(T::from_row))
338    }
339
340    async fn fetch_all_cached_with_format_once(
341        &mut self,
342        cmd: &qail_core::ast::Qail,
343        result_format: ResultFormat,
344    ) -> PgResult<Vec<crate::driver::PgRow>> {
345        use crate::driver::ColumnInfo;
346        use std::collections::hash_map::DefaultHasher;
347        use std::hash::{Hash, Hasher};
348
349        let conn = self.conn.as_mut().ok_or_else(|| {
350            PgError::Connection("Connection already released back to pool".into())
351        })?;
352
353        conn.sql_buf.clear();
354        conn.params_buf.clear();
355
356        // Encode SQL + params to reusable buffers
357        match cmd.action {
358            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
359                crate::protocol::ast_encoder::dml::encode_select(
360                    cmd,
361                    &mut conn.sql_buf,
362                    &mut conn.params_buf,
363                )?;
364            }
365            qail_core::ast::Action::Add => {
366                crate::protocol::ast_encoder::dml::encode_insert(
367                    cmd,
368                    &mut conn.sql_buf,
369                    &mut conn.params_buf,
370                )?;
371            }
372            qail_core::ast::Action::Set => {
373                crate::protocol::ast_encoder::dml::encode_update(
374                    cmd,
375                    &mut conn.sql_buf,
376                    &mut conn.params_buf,
377                )?;
378            }
379            qail_core::ast::Action::Del => {
380                crate::protocol::ast_encoder::dml::encode_delete(
381                    cmd,
382                    &mut conn.sql_buf,
383                    &mut conn.params_buf,
384                )?;
385            }
386            _ => {
387                // Fallback: unsupported actions go through uncached path
388                return self
389                    .fetch_all_uncached_with_format(cmd, result_format)
390                    .await;
391            }
392        }
393
394        let mut hasher = DefaultHasher::new();
395        conn.sql_buf.hash(&mut hasher);
396        let sql_hash = hasher.finish();
397
398        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
399
400        conn.write_buf.clear();
401
402        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
403            name
404        } else {
405            let name = format!("qail_{:x}", sql_hash);
406
407            conn.evict_prepared_if_full();
408
409            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
410
411            use crate::protocol::PgEncoder;
412            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
413            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
414            conn.write_buf.extend_from_slice(&parse_msg);
415            conn.write_buf.extend_from_slice(&describe_msg);
416
417            conn.stmt_cache.put(sql_hash, name.clone());
418            conn.prepared_statements
419                .insert(name.clone(), sql_str.to_string());
420
421            // Register in global hot-statement registry for cross-connection sharing
422            if let Ok(mut hot) = self.pool.hot_statements.write()
423                && hot.len() < MAX_HOT_STATEMENTS
424            {
425                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
426            }
427
428            name
429        };
430
431        use crate::protocol::PgEncoder;
432        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
433            &mut conn.write_buf,
434            &stmt_name,
435            &conn.params_buf,
436            result_format.as_wire_code(),
437        ) {
438            if is_cache_miss {
439                conn.stmt_cache.remove(&sql_hash);
440                conn.prepared_statements.remove(&stmt_name);
441                conn.column_info_cache.remove(&sql_hash);
442            }
443            return Err(PgError::Encode(e.to_string()));
444        }
445        PgEncoder::encode_execute_to(&mut conn.write_buf);
446        PgEncoder::encode_sync_to(&mut conn.write_buf);
447
448        if let Err(err) = conn.flush_write_buf().await {
449            if is_cache_miss {
450                conn.stmt_cache.remove(&sql_hash);
451                conn.prepared_statements.remove(&stmt_name);
452                conn.column_info_cache.remove(&sql_hash);
453            }
454            return Err(err);
455        }
456
457        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
458
459        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
460        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
461        let mut error: Option<PgError> = None;
462        let mut flow = ExtendedFlowTracker::new(
463            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
464        );
465
466        loop {
467            let msg = match conn.recv().await {
468                Ok(msg) => msg,
469                Err(err) => {
470                    if is_cache_miss && !flow.saw_parse_complete() {
471                        conn.stmt_cache.remove(&sql_hash);
472                        conn.prepared_statements.remove(&stmt_name);
473                        conn.column_info_cache.remove(&sql_hash);
474                    }
475                    return Err(err);
476                }
477            };
478            if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
479            {
480                if is_cache_miss && !flow.saw_parse_complete() {
481                    conn.stmt_cache.remove(&sql_hash);
482                    conn.prepared_statements.remove(&stmt_name);
483                    conn.column_info_cache.remove(&sql_hash);
484                }
485                return Err(err);
486            }
487            match msg {
488                crate::protocol::BackendMessage::ParseComplete => {}
489                crate::protocol::BackendMessage::BindComplete => {}
490                crate::protocol::BackendMessage::ParameterDescription(_) => {}
491                crate::protocol::BackendMessage::RowDescription(fields) => {
492                    let info = Arc::new(ColumnInfo::from_fields(&fields));
493                    if is_cache_miss {
494                        conn.column_info_cache.insert(sql_hash, info.clone());
495                    }
496                    column_info = Some(info);
497                }
498                crate::protocol::BackendMessage::DataRow(data) => {
499                    if error.is_none() {
500                        rows.push(crate::driver::PgRow {
501                            columns: data,
502                            column_info: column_info.clone(),
503                        });
504                    }
505                }
506                crate::protocol::BackendMessage::CommandComplete(_) => {}
507                crate::protocol::BackendMessage::ReadyForQuery(_) => {
508                    if let Some(err) = error {
509                        if is_cache_miss
510                            && !flow.saw_parse_complete()
511                            && !err.is_prepared_statement_already_exists()
512                        {
513                            conn.stmt_cache.remove(&sql_hash);
514                            conn.prepared_statements.remove(&stmt_name);
515                            conn.column_info_cache.remove(&sql_hash);
516                        }
517                        return Err(err);
518                    }
519                    if is_cache_miss && !flow.saw_parse_complete() {
520                        conn.stmt_cache.remove(&sql_hash);
521                        conn.prepared_statements.remove(&stmt_name);
522                        conn.column_info_cache.remove(&sql_hash);
523                        return Err(PgError::Protocol(
524                            "Cache miss query reached ReadyForQuery without ParseComplete"
525                                .to_string(),
526                        ));
527                    }
528                    return Ok(rows);
529                }
530                crate::protocol::BackendMessage::ErrorResponse(err) => {
531                    if error.is_none() {
532                        error = Some(PgError::QueryServer(err.into()));
533                    }
534                }
535                msg if is_ignorable_session_message(&msg) => {}
536                other => {
537                    if is_cache_miss && !flow.saw_parse_complete() {
538                        conn.stmt_cache.remove(&sql_hash);
539                        conn.prepared_statements.remove(&stmt_name);
540                        conn.column_info_cache.remove(&sql_hash);
541                    }
542                    return Err(unexpected_backend_message(
543                        "pool fetch_all_cached execute",
544                        &other,
545                    ));
546                }
547            }
548        }
549    }
550
551    /// Execute a QAIL command with RLS context in a SINGLE roundtrip.
552    ///
553    /// Pipelines the RLS setup (BEGIN + set_config) and the query
554    /// (Parse/Bind/Execute/Sync) into one `write_all` syscall.
555    /// PG processes messages in order, so the BEGIN + set_config
556    /// completes before the query executes — security is preserved.
557    ///
558    /// Wire layout:
559    /// ```text
560    /// [SimpleQuery: "BEGIN; SET LOCAL...; SELECT set_config(...)"]
561    /// [Parse (if cache miss)]
562    /// [Describe (if cache miss)]
563    /// [Bind]
564    /// [Execute]
565    /// [Sync]
566    /// ```
567    ///
568    /// Response processing: consume 2× ReadyForQuery (SimpleQuery + Sync).
569    pub async fn fetch_all_with_rls(
570        &mut self,
571        cmd: &qail_core::ast::Qail,
572        rls_sql: &str,
573    ) -> PgResult<Vec<crate::driver::PgRow>> {
574        self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
575            .await
576    }
577
578    /// Execute a QAIL command with RLS context in a SINGLE roundtrip with explicit result format.
579    pub async fn fetch_all_with_rls_with_format(
580        &mut self,
581        cmd: &qail_core::ast::Qail,
582        rls_sql: &str,
583        result_format: ResultFormat,
584    ) -> PgResult<Vec<crate::driver::PgRow>> {
585        let mut retried = false;
586        loop {
587            match self
588                .fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
589                .await
590            {
591                Ok(rows) => return Ok(rows),
592                Err(err)
593                    if !retried
594                        && (err.is_prepared_statement_retryable()
595                            || err.is_prepared_statement_already_exists()) =>
596                {
597                    retried = true;
598                    if err.is_prepared_statement_retryable()
599                        && let Some(conn) = self.conn.as_mut()
600                    {
601                        conn.clear_prepared_statement_state();
602                        let _ = conn.execute_simple("ROLLBACK").await;
603                    }
604                    self.rls_dirty = false;
605                }
606                Err(err) => return Err(err),
607            }
608        }
609    }
610
611    async fn fetch_all_with_rls_with_format_once(
612        &mut self,
613        cmd: &qail_core::ast::Qail,
614        rls_sql: &str,
615        result_format: ResultFormat,
616    ) -> PgResult<Vec<crate::driver::PgRow>> {
617        use crate::driver::ColumnInfo;
618        use std::collections::hash_map::DefaultHasher;
619        use std::hash::{Hash, Hasher};
620
621        let conn = self.conn.as_mut().ok_or_else(|| {
622            PgError::Connection("Connection already released back to pool".into())
623        })?;
624
625        conn.sql_buf.clear();
626        conn.params_buf.clear();
627
628        // Encode SQL + params to reusable buffers
629        if cmd.is_raw_sql() {
630            // Raw SQL pass-through: write verbatim, RLS context already set above
631            conn.sql_buf.clear();
632            conn.params_buf.clear();
633            conn.sql_buf.extend_from_slice(cmd.table.as_bytes());
634        } else {
635            match cmd.action {
636                qail_core::ast::Action::Get | qail_core::ast::Action::With => {
637                    crate::protocol::ast_encoder::dml::encode_select(
638                        cmd,
639                        &mut conn.sql_buf,
640                        &mut conn.params_buf,
641                    )?;
642                }
643                qail_core::ast::Action::Add => {
644                    crate::protocol::ast_encoder::dml::encode_insert(
645                        cmd,
646                        &mut conn.sql_buf,
647                        &mut conn.params_buf,
648                    )?;
649                }
650                qail_core::ast::Action::Set => {
651                    crate::protocol::ast_encoder::dml::encode_update(
652                        cmd,
653                        &mut conn.sql_buf,
654                        &mut conn.params_buf,
655                    )?;
656                }
657                qail_core::ast::Action::Del => {
658                    crate::protocol::ast_encoder::dml::encode_delete(
659                        cmd,
660                        &mut conn.sql_buf,
661                        &mut conn.params_buf,
662                    )?;
663                }
664                _ => {
665                    // Fallback: RLS setup must happen synchronously for unsupported actions
666                    conn.execute_simple(rls_sql).await?;
667                    self.rls_dirty = true;
668                    return self
669                        .fetch_all_uncached_with_format(cmd, result_format)
670                        .await;
671                }
672            }
673        }
674
675        let mut hasher = DefaultHasher::new();
676        conn.sql_buf.hash(&mut hasher);
677        let sql_hash = hasher.finish();
678
679        let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
680
681        conn.write_buf.clear();
682
683        // ── Prepend RLS Simple Query message ─────────────────────────
684        // This is the key optimization: RLS setup bytes go first in the
685        // same buffer as the query messages.
686        let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
687        conn.write_buf.extend_from_slice(&rls_msg);
688
689        // ── Then append the query messages (same as fetch_all_cached) ──
690        let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
691            name
692        } else {
693            let name = format!("qail_{:x}", sql_hash);
694
695            conn.evict_prepared_if_full();
696
697            let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
698
699            use crate::protocol::PgEncoder;
700            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
701            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
702            conn.write_buf.extend_from_slice(&parse_msg);
703            conn.write_buf.extend_from_slice(&describe_msg);
704
705            conn.stmt_cache.put(sql_hash, name.clone());
706            conn.prepared_statements
707                .insert(name.clone(), sql_str.to_string());
708
709            if let Ok(mut hot) = self.pool.hot_statements.write()
710                && hot.len() < MAX_HOT_STATEMENTS
711            {
712                hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
713            }
714
715            name
716        };
717
718        use crate::protocol::PgEncoder;
719        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
720            &mut conn.write_buf,
721            &stmt_name,
722            &conn.params_buf,
723            result_format.as_wire_code(),
724        ) {
725            if is_cache_miss {
726                conn.stmt_cache.remove(&sql_hash);
727                conn.prepared_statements.remove(&stmt_name);
728                conn.column_info_cache.remove(&sql_hash);
729            }
730            return Err(PgError::Encode(e.to_string()));
731        }
732        PgEncoder::encode_execute_to(&mut conn.write_buf);
733        PgEncoder::encode_sync_to(&mut conn.write_buf);
734
735        // ── Single write_all for RLS + Query ────────────────────────
736        if let Err(err) = conn.flush_write_buf().await {
737            if is_cache_miss {
738                conn.stmt_cache.remove(&sql_hash);
739                conn.prepared_statements.remove(&stmt_name);
740                conn.column_info_cache.remove(&sql_hash);
741            }
742            return Err(err);
743        }
744
745        // Mark connection as RLS-dirty (needs COMMIT on release)
746        self.rls_dirty = true;
747
748        // ── Phase 1: Consume Simple Query responses (RLS setup) ─────
749        // Simple Query produces: CommandComplete × N, then ReadyForQuery.
750        // set_config results and BEGIN/SET LOCAL responses are all here.
751        let mut rls_error: Option<PgError> = None;
752        loop {
753            let msg = match conn.recv().await {
754                Ok(msg) => msg,
755                Err(err) => {
756                    if is_cache_miss {
757                        conn.stmt_cache.remove(&sql_hash);
758                        conn.prepared_statements.remove(&stmt_name);
759                        conn.column_info_cache.remove(&sql_hash);
760                    }
761                    return Err(err);
762                }
763            };
764            match msg {
765                crate::protocol::BackendMessage::ReadyForQuery(_) => {
766                    // RLS setup done — break to Extended Query phase
767                    if let Some(err) = rls_error {
768                        return Err(err);
769                    }
770                    break;
771                }
772                crate::protocol::BackendMessage::ErrorResponse(err) => {
773                    if rls_error.is_none() {
774                        rls_error = Some(PgError::QueryServer(err.into()));
775                    }
776                }
777                // CommandComplete, DataRow (from set_config), RowDescription — ignore
778                crate::protocol::BackendMessage::CommandComplete(_)
779                | crate::protocol::BackendMessage::DataRow(_)
780                | crate::protocol::BackendMessage::RowDescription(_)
781                | crate::protocol::BackendMessage::ParseComplete
782                | crate::protocol::BackendMessage::BindComplete => {}
783                msg if is_ignorable_session_message(&msg) => {}
784                other => return Err(unexpected_backend_message("pool rls setup", &other)),
785            }
786        }
787
788        // ── Phase 2: Consume Extended Query responses (actual data) ──
789        let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
790
791        let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
792        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
793        let mut error: Option<PgError> = None;
794        let mut flow = ExtendedFlowTracker::new(
795            ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
796        );
797
798        loop {
799            let msg = match conn.recv().await {
800                Ok(msg) => msg,
801                Err(err) => {
802                    if is_cache_miss && !flow.saw_parse_complete() {
803                        conn.stmt_cache.remove(&sql_hash);
804                        conn.prepared_statements.remove(&stmt_name);
805                        conn.column_info_cache.remove(&sql_hash);
806                    }
807                    return Err(err);
808                }
809            };
810            if let Err(err) =
811                flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
812            {
813                if is_cache_miss && !flow.saw_parse_complete() {
814                    conn.stmt_cache.remove(&sql_hash);
815                    conn.prepared_statements.remove(&stmt_name);
816                    conn.column_info_cache.remove(&sql_hash);
817                }
818                return Err(err);
819            }
820            match msg {
821                crate::protocol::BackendMessage::ParseComplete => {}
822                crate::protocol::BackendMessage::BindComplete => {}
823                crate::protocol::BackendMessage::ParameterDescription(_) => {}
824                crate::protocol::BackendMessage::RowDescription(fields) => {
825                    let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
826                    if is_cache_miss {
827                        conn.column_info_cache.insert(sql_hash, info.clone());
828                    }
829                    column_info = Some(info);
830                }
831                crate::protocol::BackendMessage::DataRow(data) => {
832                    if error.is_none() {
833                        rows.push(crate::driver::PgRow {
834                            columns: data,
835                            column_info: column_info.clone(),
836                        });
837                    }
838                }
839                crate::protocol::BackendMessage::CommandComplete(_) => {}
840                crate::protocol::BackendMessage::ReadyForQuery(_) => {
841                    if let Some(err) = error {
842                        if is_cache_miss
843                            && !flow.saw_parse_complete()
844                            && !err.is_prepared_statement_already_exists()
845                        {
846                            conn.stmt_cache.remove(&sql_hash);
847                            conn.prepared_statements.remove(&stmt_name);
848                            conn.column_info_cache.remove(&sql_hash);
849                        }
850                        return Err(err);
851                    }
852                    if is_cache_miss && !flow.saw_parse_complete() {
853                        conn.stmt_cache.remove(&sql_hash);
854                        conn.prepared_statements.remove(&stmt_name);
855                        conn.column_info_cache.remove(&sql_hash);
856                        return Err(PgError::Protocol(
857                            "Cache miss query reached ReadyForQuery without ParseComplete"
858                                .to_string(),
859                        ));
860                    }
861                    return Ok(rows);
862                }
863                crate::protocol::BackendMessage::ErrorResponse(err) => {
864                    if error.is_none() {
865                        error = Some(PgError::QueryServer(err.into()));
866                    }
867                }
868                msg if is_ignorable_session_message(&msg) => {}
869                other => {
870                    if is_cache_miss && !flow.saw_parse_complete() {
871                        conn.stmt_cache.remove(&sql_hash);
872                        conn.prepared_statements.remove(&stmt_name);
873                        conn.column_info_cache.remove(&sql_hash);
874                    }
875                    return Err(unexpected_backend_message(
876                        "pool fetch_all_with_rls execute",
877                        &other,
878                    ));
879                }
880            }
881        }
882    }
883}