Skip to main content

qail_pg/driver/
fetch.rs

1//! PgDriver fetch methods: fetch_all (cached/uncached/fast), fetch_typed,
2//! fetch_one, execute, and query_ast.
3
4use super::core::PgDriver;
5use super::types::*;
6use qail_core::ast::Qail;
7use std::sync::Arc;
8
9impl PgDriver {
10    /// Execute a QAIL command and fetch all rows (CACHED + ZERO-ALLOC).
11    /// **Default method** - uses prepared statement caching for best performance.
12    /// On first call: sends Parse + Bind + Execute + Sync
13    /// On subsequent calls with same SQL: sends only Bind + Execute (SKIPS Parse!)
14    /// Uses LRU cache with max 1000 statements (auto-evicts oldest).
15    pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
16        self.fetch_all_with_format(cmd, ResultFormat::Text).await
17    }
18
19    /// Execute a QAIL command and fetch all rows using a specific result format.
20    ///
21    /// `result_format` controls server result-column encoding:
22    /// - [`ResultFormat::Text`] for standard text decoding.
23    /// - [`ResultFormat::Binary`] for binary wire values.
24    pub async fn fetch_all_with_format(
25        &mut self,
26        cmd: &Qail,
27        result_format: ResultFormat,
28    ) -> PgResult<Vec<PgRow>> {
29        // Delegate to cached-by-default behavior.
30        self.fetch_all_cached_with_format(cmd, result_format).await
31    }
32
33    /// Execute a QAIL command and fetch all rows as a typed struct (text format).
34    /// Requires the target type to implement `QailRow` trait.
35    ///
36    /// # Example
37    /// ```ignore
38    /// let users: Vec<User> = driver.fetch_typed::<User>(&query).await?;
39    /// ```
40    pub async fn fetch_typed<T: super::row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
41        self.fetch_typed_with_format(cmd, ResultFormat::Text).await
42    }
43
44    /// Execute a QAIL command and fetch all rows as a typed struct with explicit result format.
45    ///
46    /// Use [`ResultFormat::Binary`] to get binary wire values; row decoding should use
47    /// metadata-aware accessors such as `PgRow::try_get()` / `try_get_by_name()`.
48    pub async fn fetch_typed_with_format<T: super::row::QailRow>(
49        &mut self,
50        cmd: &Qail,
51        result_format: ResultFormat,
52    ) -> PgResult<Vec<T>> {
53        let rows = self.fetch_all_with_format(cmd, result_format).await?;
54        Ok(rows.iter().map(T::from_row).collect())
55    }
56
57    /// Execute a QAIL command and fetch a single row as a typed struct (text format).
58    /// Returns None if no rows are returned.
59    pub async fn fetch_one_typed<T: super::row::QailRow>(
60        &mut self,
61        cmd: &Qail,
62    ) -> PgResult<Option<T>> {
63        self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
64            .await
65    }
66
67    /// Execute a QAIL command and fetch a single row as a typed struct with explicit result format.
68    pub async fn fetch_one_typed_with_format<T: super::row::QailRow>(
69        &mut self,
70        cmd: &Qail,
71        result_format: ResultFormat,
72    ) -> PgResult<Option<T>> {
73        let rows = self.fetch_all_with_format(cmd, result_format).await?;
74        Ok(rows.first().map(T::from_row))
75    }
76
77    /// Execute a QAIL command and fetch all rows (UNCACHED).
78    /// Sends Parse + Bind + Execute on every call.
79    /// Use for one-off queries or when caching is not desired.
80    ///
81    /// Optimized: encodes wire bytes into reusable write_buf (zero-alloc).
82    pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
83        self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
84            .await
85    }
86
87    /// Execute a QAIL command and fetch all rows (UNCACHED) with explicit result format.
88    pub async fn fetch_all_uncached_with_format(
89        &mut self,
90        cmd: &Qail,
91        result_format: ResultFormat,
92    ) -> PgResult<Vec<PgRow>> {
93        use crate::protocol::AstEncoder;
94
95        AstEncoder::encode_cmd_reuse_into_with_result_format(
96            cmd,
97            &mut self.connection.sql_buf,
98            &mut self.connection.params_buf,
99            &mut self.connection.write_buf,
100            result_format.as_wire_code(),
101        )
102        .map_err(|e| PgError::Encode(e.to_string()))?;
103
104        self.connection.flush_write_buf().await?;
105
106        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
107        let mut column_info: Option<Arc<ColumnInfo>> = None;
108
109        let mut error: Option<PgError> = None;
110        let mut flow = super::extended_flow::ExtendedFlowTracker::new(
111            super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
112        );
113
114        loop {
115            let msg = self.connection.recv().await?;
116            flow.validate(&msg, "driver fetch_all execute", error.is_some())?;
117            match msg {
118                crate::protocol::BackendMessage::ParseComplete
119                | crate::protocol::BackendMessage::BindComplete => {}
120                crate::protocol::BackendMessage::RowDescription(fields) => {
121                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
122                }
123                crate::protocol::BackendMessage::DataRow(data) => {
124                    if error.is_none() {
125                        rows.push(PgRow {
126                            columns: data,
127                            column_info: column_info.clone(),
128                        });
129                    }
130                }
131                crate::protocol::BackendMessage::NoData => {}
132                crate::protocol::BackendMessage::CommandComplete(_) => {}
133                crate::protocol::BackendMessage::ReadyForQuery(_) => {
134                    if let Some(err) = error {
135                        return Err(err);
136                    }
137                    return Ok(rows);
138                }
139                crate::protocol::BackendMessage::ErrorResponse(err) => {
140                    if error.is_none() {
141                        error = Some(PgError::QueryServer(err.into()));
142                    }
143                }
144                msg if is_ignorable_session_message(&msg) => {}
145                other => {
146                    return Err(unexpected_backend_message(
147                        "driver fetch_all execute",
148                        &other,
149                    ));
150                }
151            }
152        }
153    }
154
155    /// Execute a QAIL command and fetch all rows (FAST VERSION).
156    /// Uses optimized recv_with_data_fast for faster response parsing.
157    /// Skips column metadata collection for maximum speed.
158    pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
159        self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
160            .await
161    }
162
163    /// Execute a QAIL command and fetch all rows (FAST VERSION) with explicit result format.
164    pub async fn fetch_all_fast_with_format(
165        &mut self,
166        cmd: &Qail,
167        result_format: ResultFormat,
168    ) -> PgResult<Vec<PgRow>> {
169        use crate::protocol::AstEncoder;
170
171        AstEncoder::encode_cmd_reuse_into_with_result_format(
172            cmd,
173            &mut self.connection.sql_buf,
174            &mut self.connection.params_buf,
175            &mut self.connection.write_buf,
176            result_format.as_wire_code(),
177        )
178        .map_err(|e| PgError::Encode(e.to_string()))?;
179
180        self.connection.flush_write_buf().await?;
181
182        // Collect results using FAST receiver
183        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
184        let mut error: Option<PgError> = None;
185        let mut flow = super::extended_flow::ExtendedFlowTracker::new(
186            super::extended_flow::ExtendedFlowConfig::parse_bind_execute(true),
187        );
188
189        loop {
190            let res = self.connection.recv_with_data_fast().await;
191            match res {
192                Ok((msg_type, data)) => {
193                    flow.validate_msg_type(
194                        msg_type,
195                        "driver fetch_all_fast execute",
196                        error.is_some(),
197                    )?;
198                    match msg_type {
199                        b'D' => {
200                            if error.is_none()
201                                && let Some(columns) = data
202                            {
203                                rows.push(PgRow {
204                                    columns,
205                                    column_info: None,
206                                });
207                            }
208                        }
209                        b'Z' => {
210                            if let Some(err) = error {
211                                return Err(err);
212                            }
213                            return Ok(rows);
214                        }
215                        _ => {}
216                    }
217                }
218                Err(e) => {
219                    // QueryServer means backend sent ErrorResponse; keep draining to ReadyForQuery.
220                    if matches!(&e, PgError::QueryServer(_)) {
221                        if error.is_none() {
222                            error = Some(e);
223                        }
224                        continue;
225                    }
226                    return Err(e);
227                }
228            }
229        }
230    }
231
232    /// Execute a QAIL command and fetch one row.
233    pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
234        let rows = self.fetch_all(cmd).await?;
235        rows.into_iter().next().ok_or(PgError::NoRows)
236    }
237
238    /// Execute a QAIL command with PREPARED STATEMENT CACHING.
239    /// Like fetch_all(), but caches the prepared statement on the server.
240    /// On first call: sends Parse + Describe + Bind + Execute + Sync
241    /// On subsequent calls: sends only Bind + Execute + Sync (SKIPS Parse!)
242    /// Column metadata (RowDescription) is cached alongside the statement
243    /// so that by-name column access works on every call.
244    ///
245    /// Optimized: all wire messages are batched into a single write_all syscall.
246    pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
247        self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
248            .await
249    }
250
251    /// Execute a QAIL command with prepared statement caching and explicit result format.
252    pub async fn fetch_all_cached_with_format(
253        &mut self,
254        cmd: &Qail,
255        result_format: ResultFormat,
256    ) -> PgResult<Vec<PgRow>> {
257        let mut retried = false;
258        loop {
259            match self
260                .fetch_all_cached_with_format_once(cmd, result_format)
261                .await
262            {
263                Ok(rows) => return Ok(rows),
264                Err(err)
265                    if !retried
266                        && (err.is_prepared_statement_retryable()
267                            || err.is_prepared_statement_already_exists()) =>
268                {
269                    retried = true;
270                    if err.is_prepared_statement_retryable() {
271                        self.connection.clear_prepared_statement_state();
272                    }
273                }
274                Err(err) => return Err(err),
275            }
276        }
277    }
278
279    async fn fetch_all_cached_with_format_once(
280        &mut self,
281        cmd: &Qail,
282        result_format: ResultFormat,
283    ) -> PgResult<Vec<PgRow>> {
284        use crate::protocol::AstEncoder;
285        use std::collections::hash_map::DefaultHasher;
286        use std::hash::{Hash, Hasher};
287
288        self.connection.sql_buf.clear();
289        self.connection.params_buf.clear();
290
291        // Encode SQL to reusable buffer
292        match cmd.action {
293            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
294                crate::protocol::ast_encoder::dml::encode_select(
295                    cmd,
296                    &mut self.connection.sql_buf,
297                    &mut self.connection.params_buf,
298                )?;
299            }
300            qail_core::ast::Action::Add => {
301                crate::protocol::ast_encoder::dml::encode_insert(
302                    cmd,
303                    &mut self.connection.sql_buf,
304                    &mut self.connection.params_buf,
305                )?;
306            }
307            qail_core::ast::Action::Set => {
308                crate::protocol::ast_encoder::dml::encode_update(
309                    cmd,
310                    &mut self.connection.sql_buf,
311                    &mut self.connection.params_buf,
312                )?;
313            }
314            qail_core::ast::Action::Del => {
315                crate::protocol::ast_encoder::dml::encode_delete(
316                    cmd,
317                    &mut self.connection.sql_buf,
318                    &mut self.connection.params_buf,
319                )?;
320            }
321            _ => {
322                // Fallback for unsupported actions
323                let (sql, params) =
324                    AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
325                let raw_rows = self
326                    .connection
327                    .query_cached_with_result_format(&sql, &params, result_format.as_wire_code())
328                    .await?;
329                return Ok(raw_rows
330                    .into_iter()
331                    .map(|data| PgRow {
332                        columns: data,
333                        column_info: None,
334                    })
335                    .collect());
336            }
337        }
338
339        let mut hasher = DefaultHasher::new();
340        self.connection.sql_buf.hash(&mut hasher);
341        let sql_hash = hasher.finish();
342
343        let is_cache_miss = !self.connection.stmt_cache.contains(&sql_hash);
344
345        // Build ALL wire messages into write_buf (single syscall)
346        self.connection.write_buf.clear();
347
348        let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
349            name
350        } else {
351            let name = format!("qail_{:x}", sql_hash);
352
353            // Evict LRU before borrowing sql_buf to avoid borrow conflict
354            self.connection.evict_prepared_if_full();
355
356            let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
357
358            // Buffer Parse + Describe(Statement) for first call
359            use crate::protocol::PgEncoder;
360            let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
361            let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
362            self.connection.write_buf.extend_from_slice(&parse_msg);
363            self.connection.write_buf.extend_from_slice(&describe_msg);
364
365            self.connection.stmt_cache.put(sql_hash, name.clone());
366            self.connection
367                .prepared_statements
368                .insert(name.clone(), sql_str.to_string());
369
370            name
371        };
372
373        // Append Bind + Execute + Sync to same buffer
374        use crate::protocol::PgEncoder;
375        if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
376            &mut self.connection.write_buf,
377            &stmt_name,
378            &self.connection.params_buf,
379            result_format.as_wire_code(),
380        ) {
381            if is_cache_miss {
382                self.connection.stmt_cache.remove(&sql_hash);
383                self.connection.prepared_statements.remove(&stmt_name);
384                self.connection.column_info_cache.remove(&sql_hash);
385            }
386            return Err(PgError::Encode(e.to_string()));
387        }
388        PgEncoder::encode_execute_to(&mut self.connection.write_buf);
389        PgEncoder::encode_sync_to(&mut self.connection.write_buf);
390
391        // Single write_all syscall for all messages
392        if let Err(err) = self.connection.flush_write_buf().await {
393            if is_cache_miss {
394                self.connection.stmt_cache.remove(&sql_hash);
395                self.connection.prepared_statements.remove(&stmt_name);
396                self.connection.column_info_cache.remove(&sql_hash);
397            }
398            return Err(err);
399        }
400
401        // On cache hit, use the previously cached ColumnInfo
402        let cached_column_info = self.connection.column_info_cache.get(&sql_hash).cloned();
403
404        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
405        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
406        let mut error: Option<PgError> = None;
407        let mut flow = super::extended_flow::ExtendedFlowTracker::new(
408            super::extended_flow::ExtendedFlowConfig::parse_describe_statement_bind_execute(
409                is_cache_miss,
410            ),
411        );
412
413        loop {
414            let msg = match self.connection.recv().await {
415                Ok(msg) => msg,
416                Err(err) => {
417                    if is_cache_miss && !flow.saw_parse_complete() {
418                        self.connection.stmt_cache.remove(&sql_hash);
419                        self.connection.prepared_statements.remove(&stmt_name);
420                        self.connection.column_info_cache.remove(&sql_hash);
421                    }
422                    return Err(err);
423                }
424            };
425            if let Err(err) =
426                flow.validate(&msg, "driver fetch_all_cached execute", error.is_some())
427            {
428                if is_cache_miss && !flow.saw_parse_complete() {
429                    self.connection.stmt_cache.remove(&sql_hash);
430                    self.connection.prepared_statements.remove(&stmt_name);
431                    self.connection.column_info_cache.remove(&sql_hash);
432                }
433                return Err(err);
434            }
435            match msg {
436                crate::protocol::BackendMessage::ParseComplete => {}
437                crate::protocol::BackendMessage::BindComplete => {}
438                crate::protocol::BackendMessage::ParameterDescription(_) => {
439                    // Sent after Describe(Statement) — ignore
440                }
441                crate::protocol::BackendMessage::RowDescription(fields) => {
442                    // Received after Describe(Statement) on cache miss
443                    let info = Arc::new(ColumnInfo::from_fields(&fields));
444                    if is_cache_miss {
445                        self.connection
446                            .column_info_cache
447                            .insert(sql_hash, info.clone());
448                    }
449                    column_info = Some(info);
450                }
451                crate::protocol::BackendMessage::DataRow(data) => {
452                    if error.is_none() {
453                        rows.push(PgRow {
454                            columns: data,
455                            column_info: column_info.clone(),
456                        });
457                    }
458                }
459                crate::protocol::BackendMessage::CommandComplete(_) => {}
460                crate::protocol::BackendMessage::NoData => {
461                    // Sent by Describe for statements that return no data (e.g. pure UPDATE without RETURNING)
462                }
463                crate::protocol::BackendMessage::ReadyForQuery(_) => {
464                    if let Some(err) = error {
465                        if is_cache_miss
466                            && !flow.saw_parse_complete()
467                            && !err.is_prepared_statement_already_exists()
468                        {
469                            self.connection.stmt_cache.remove(&sql_hash);
470                            self.connection.prepared_statements.remove(&stmt_name);
471                            self.connection.column_info_cache.remove(&sql_hash);
472                        }
473                        return Err(err);
474                    }
475                    if is_cache_miss && !flow.saw_parse_complete() {
476                        self.connection.stmt_cache.remove(&sql_hash);
477                        self.connection.prepared_statements.remove(&stmt_name);
478                        self.connection.column_info_cache.remove(&sql_hash);
479                        return Err(PgError::Protocol(
480                            "Cache miss query reached ReadyForQuery without ParseComplete"
481                                .to_string(),
482                        ));
483                    }
484                    return Ok(rows);
485                }
486                crate::protocol::BackendMessage::ErrorResponse(err) => {
487                    if error.is_none() {
488                        let query_err = PgError::QueryServer(err.into());
489                        if query_err.is_prepared_statement_retryable() {
490                            self.connection.clear_prepared_statement_state();
491                        }
492                        error = Some(query_err);
493                    }
494                }
495                msg if is_ignorable_session_message(&msg) => {}
496                other => {
497                    if is_cache_miss && !flow.saw_parse_complete() {
498                        self.connection.stmt_cache.remove(&sql_hash);
499                        self.connection.prepared_statements.remove(&stmt_name);
500                        self.connection.column_info_cache.remove(&sql_hash);
501                    }
502                    return Err(unexpected_backend_message(
503                        "driver fetch_all_cached execute",
504                        &other,
505                    ));
506                }
507            }
508        }
509    }
510
511    /// Execute a QAIL command (for mutations) - ZERO-ALLOC.
512    pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
513        use crate::protocol::AstEncoder;
514
515        let wire_bytes = AstEncoder::encode_cmd_reuse(
516            cmd,
517            &mut self.connection.sql_buf,
518            &mut self.connection.params_buf,
519        )
520        .map_err(|e| PgError::Encode(e.to_string()))?;
521
522        self.connection.send_bytes(&wire_bytes).await?;
523
524        let mut affected = 0u64;
525        let mut error: Option<PgError> = None;
526        let mut flow = super::extended_flow::ExtendedFlowTracker::new(
527            super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
528        );
529
530        loop {
531            let msg = self.connection.recv().await?;
532            flow.validate(&msg, "driver execute mutation", error.is_some())?;
533            match msg {
534                crate::protocol::BackendMessage::ParseComplete
535                | crate::protocol::BackendMessage::BindComplete => {}
536                crate::protocol::BackendMessage::RowDescription(_) => {}
537                crate::protocol::BackendMessage::DataRow(_) => {}
538                crate::protocol::BackendMessage::NoData => {}
539                crate::protocol::BackendMessage::CommandComplete(tag) => {
540                    if error.is_none()
541                        && let Some(n) = tag.split_whitespace().last()
542                    {
543                        affected = n.parse().unwrap_or(0);
544                    }
545                }
546                crate::protocol::BackendMessage::ReadyForQuery(_) => {
547                    if let Some(err) = error {
548                        return Err(err);
549                    }
550                    return Ok(affected);
551                }
552                crate::protocol::BackendMessage::ErrorResponse(err) => {
553                    if error.is_none() {
554                        error = Some(PgError::QueryServer(err.into()));
555                    }
556                }
557                msg if is_ignorable_session_message(&msg) => {}
558                other => {
559                    return Err(unexpected_backend_message(
560                        "driver execute mutation",
561                        &other,
562                    ));
563                }
564            }
565        }
566    }
567
568    /// Query a QAIL command and return rows (for SELECT/GET queries).
569    /// Like `execute()` but collects RowDescription + DataRow messages
570    /// instead of discarding them.
571    pub async fn query_ast(&mut self, cmd: &Qail) -> PgResult<QueryResult> {
572        self.query_ast_with_format(cmd, ResultFormat::Text).await
573    }
574
575    /// Query a QAIL command and return rows using an explicit result format.
576    pub async fn query_ast_with_format(
577        &mut self,
578        cmd: &Qail,
579        result_format: ResultFormat,
580    ) -> PgResult<QueryResult> {
581        use crate::protocol::AstEncoder;
582
583        let wire_bytes = AstEncoder::encode_cmd_reuse_with_result_format(
584            cmd,
585            &mut self.connection.sql_buf,
586            &mut self.connection.params_buf,
587            result_format.as_wire_code(),
588        )
589        .map_err(|e| PgError::Encode(e.to_string()))?;
590
591        self.connection.send_bytes(&wire_bytes).await?;
592
593        let mut columns: Vec<String> = Vec::new();
594        let mut rows: Vec<Vec<Option<String>>> = Vec::new();
595        let mut error: Option<PgError> = None;
596        let mut flow = super::extended_flow::ExtendedFlowTracker::new(
597            super::extended_flow::ExtendedFlowConfig::parse_bind_describe_portal_execute(),
598        );
599
600        loop {
601            let msg = self.connection.recv().await?;
602            flow.validate(&msg, "driver query_ast", error.is_some())?;
603            match msg {
604                crate::protocol::BackendMessage::ParseComplete
605                | crate::protocol::BackendMessage::BindComplete => {}
606                crate::protocol::BackendMessage::RowDescription(fields) => {
607                    columns = fields.into_iter().map(|f| f.name).collect();
608                }
609                crate::protocol::BackendMessage::DataRow(data) => {
610                    if error.is_none() {
611                        let row: Vec<Option<String>> = data
612                            .into_iter()
613                            .map(|col| col.map(|bytes| String::from_utf8_lossy(&bytes).to_string()))
614                            .collect();
615                        rows.push(row);
616                    }
617                }
618                crate::protocol::BackendMessage::CommandComplete(_) => {}
619                crate::protocol::BackendMessage::NoData => {}
620                crate::protocol::BackendMessage::ReadyForQuery(_) => {
621                    if let Some(err) = error {
622                        return Err(err);
623                    }
624                    return Ok(QueryResult { columns, rows });
625                }
626                crate::protocol::BackendMessage::ErrorResponse(err) => {
627                    if error.is_none() {
628                        error = Some(PgError::QueryServer(err.into()));
629                    }
630                }
631                msg if is_ignorable_session_message(&msg) => {}
632                other => return Err(unexpected_backend_message("driver query_ast", &other)),
633            }
634        }
635    }
636}