Skip to main content

qail_pg/driver/
ops.rs

1//! PgDriver operations: transaction control, batch execution, statement timeout,
2//! RLS context, pipeline, COPY bulk/export, and cursor streaming.
3
4use super::core::PgDriver;
5use super::prepared::PreparedStatement;
6use super::rls;
7use super::types::*;
8use qail_core::ast::Qail;
9
10impl PgDriver {
11    // ==================== RAW SQL COMPAT ====================
12
13    /// Execute raw SQL using PostgreSQL Simple Query protocol.
14    ///
15    /// Returns number of rows returned by the statement (or `0` for statements
16    /// that do not produce rows).
17    ///
18    /// This compatibility API exists for legacy tests/examples; prefer AST APIs
19    /// (`execute`, `fetch_all`) for application code.
20    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<u64> {
21        let rows = self.connection.simple_query(sql).await?;
22        Ok(rows.len() as u64)
23    }
24
25    /// Execute raw SQL using PostgreSQL Simple Query protocol and return rows.
26    ///
27    /// This compatibility API exists for legacy tests/examples; prefer AST APIs
28    /// (`execute`, `fetch_all`) for application code.
29    pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
30        self.connection.simple_query(sql).await
31    }
32
33    // ==================== TRANSACTION CONTROL ====================
34
35    /// Begin a transaction (AST-native).
36    pub async fn begin(&mut self) -> PgResult<()> {
37        self.connection.begin_transaction().await
38    }
39
40    /// Commit the current transaction (AST-native).
41    pub async fn commit(&mut self) -> PgResult<()> {
42        self.connection.commit().await
43    }
44
45    /// Rollback the current transaction (AST-native).
46    pub async fn rollback(&mut self) -> PgResult<()> {
47        self.connection.rollback().await
48    }
49
50    /// Create a named savepoint within the current transaction.
51    /// Savepoints allow partial rollback within a transaction.
52    /// Use `rollback_to()` to return to this savepoint.
53    /// # Example
54    /// ```ignore
55    /// driver.begin().await?;
56    /// driver.execute(&insert1).await?;
57    /// driver.savepoint("sp1").await?;
58    /// driver.execute(&insert2).await?;
59    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
60    /// driver.commit().await?;
61    /// ```
62    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
63        self.connection.savepoint(name).await
64    }
65
66    /// Rollback to a previously created savepoint.
67    /// Discards all changes since the named savepoint was created,
68    /// but keeps the transaction open.
69    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
70        self.connection.rollback_to(name).await
71    }
72
73    /// Release a savepoint (free resources, if no longer needed).
74    /// After release, the savepoint cannot be rolled back to.
75    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
76        self.connection.release_savepoint(name).await
77    }
78
79    // ==================== BATCH TRANSACTIONS ====================
80
81    /// Execute multiple commands in a single atomic transaction.
82    /// All commands succeed or all are rolled back.
83    /// # Example
84    /// ```ignore
85    /// let cmds = vec![
86    ///     Qail::add("users").columns(["name"]).values(["Alice"]),
87    ///     Qail::add("users").columns(["name"]).values(["Bob"]),
88    /// ];
89    /// let results = driver.execute_batch(&cmds).await?;
90    /// // results = [1, 1] (rows affected)
91    /// ```
92    pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
93        self.begin().await?;
94        let mut results = Vec::with_capacity(cmds.len());
95        for cmd in cmds {
96            match self.execute(cmd).await {
97                Ok(n) => results.push(n),
98                Err(e) => {
99                    self.rollback().await?;
100                    return Err(e);
101                }
102            }
103        }
104        self.commit().await?;
105        Ok(results)
106    }
107
108    // ==================== STATEMENT TIMEOUT ====================
109
110    /// Set statement timeout for this connection (in milliseconds).
111    /// # Example
112    /// ```ignore
113    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
114    /// ```
115    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
116        let cmd = Qail::session_set("statement_timeout", ms.to_string());
117        self.execute(&cmd).await.map(|_| ())
118    }
119
120    /// Reset statement timeout to default (no limit).
121    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
122        let cmd = Qail::session_reset("statement_timeout");
123        self.execute(&cmd).await.map(|_| ())
124    }
125
126    // ==================== RLS (MULTI-TENANT) ====================
127
128    /// Set the RLS context for multi-tenant data isolation.
129    ///
130    /// Configures PostgreSQL session variables (`app.current_tenant_id`, etc.)
131    /// so that RLS policies automatically filter data by tenant.
132    ///
133    /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
134    /// that `set_config` and all subsequent queries execute on the **same
135    /// connection** — no pool race conditions possible.
136    ///
137    /// # Example
138    /// ```ignore
139    /// driver.set_rls_context(RlsContext::tenant("tenant-123")).await?;
140    /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
141    /// // orders only contains rows for tenant-123
142    /// ```
143    pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
144        let sql = rls::context_to_sql(&ctx);
145        if sql.as_bytes().contains(&0) {
146            return Err(crate::PgError::Protocol(
147                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
148            ));
149        }
150        self.connection.execute_simple(&sql).await?;
151        self.rls_context = Some(ctx);
152        Ok(())
153    }
154
155    /// Clear the RLS context, resetting session variables to safe defaults.
156    ///
157    /// After clearing, all RLS-protected queries will return zero rows
158    /// (empty tenant scope matches nothing).
159    pub async fn clear_rls_context(&mut self) -> PgResult<()> {
160        let sql = rls::reset_sql();
161        if sql.as_bytes().contains(&0) {
162            return Err(crate::PgError::Protocol(
163                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
164            ));
165        }
166        self.connection.execute_simple(sql).await?;
167        self.rls_context = None;
168        Ok(())
169    }
170
171    /// Get the current RLS context, if any.
172    pub fn rls_context(&self) -> Option<&rls::RlsContext> {
173        self.rls_context.as_ref()
174    }
175
176    // ==================== PIPELINE (BATCH) ====================
177
178    /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
179    /// # Example
180    /// ```ignore
181    /// let cmds: Vec<Qail> = (1..=1000)
182    ///     .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
183    ///     .collect();
184    /// let count = driver.pipeline_batch(&cmds).await?;
185    /// assert_eq!(count, 1000);
186    /// ```
187    pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
188        self.connection.pipeline_ast_fast(cmds).await
189    }
190
191    /// Execute multiple Qail ASTs and return full row data.
192    pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
193        let raw_results = self.connection.pipeline_ast(cmds).await?;
194
195        let results: Vec<Vec<PgRow>> = raw_results
196            .into_iter()
197            .map(|rows| {
198                rows.into_iter()
199                    .map(|columns| PgRow {
200                        columns,
201                        column_info: None,
202                    })
203                    .collect()
204            })
205            .collect();
206
207        Ok(results)
208    }
209
210    /// Prepare a SQL statement for repeated execution.
211    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
212        self.connection.prepare(sql).await
213    }
214
215    /// Execute a prepared statement pipeline in FAST mode (count only).
216    pub async fn pipeline_prepared_fast(
217        &mut self,
218        stmt: &PreparedStatement,
219        params_batch: &[Vec<Option<Vec<u8>>>],
220    ) -> PgResult<usize> {
221        self.connection
222            .pipeline_prepared_fast(stmt, params_batch)
223            .await
224    }
225
226    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
227    /// Uses a Qail::Add to get validated table and column names from the AST,
228    /// not user-provided strings. This is the sound, AST-native approach.
229    /// # Example
230    /// ```ignore
231    /// // Create a Qail::Add to define table and columns
232    /// let cmd = Qail::add("users")
233    ///     .columns(["id", "name", "email"]);
234    /// // Bulk insert rows
235    /// let rows: Vec<Vec<Value>> = vec![
236    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
237    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
238    /// ];
239    /// driver.copy_bulk(&cmd, &rows).await?;
240    /// ```
241    pub async fn copy_bulk(
242        &mut self,
243        cmd: &Qail,
244        rows: &[Vec<qail_core::ast::Value>],
245    ) -> PgResult<u64> {
246        use qail_core::ast::Action;
247
248        if cmd.action != Action::Add {
249            return Err(PgError::Query(
250                "copy_bulk requires Qail::Add action".to_string(),
251            ));
252        }
253
254        let table = &cmd.table;
255
256        let columns: Vec<String> = cmd
257            .columns
258            .iter()
259            .filter_map(|expr| {
260                use qail_core::ast::Expr;
261                match expr {
262                    Expr::Named(name) => Some(name.clone()),
263                    Expr::Aliased { name, .. } => Some(name.clone()),
264                    Expr::Star => None, // Can't COPY with *
265                    _ => None,
266                }
267            })
268            .collect();
269
270        if columns.is_empty() {
271            return Err(PgError::Query(
272                "copy_bulk requires columns in Qail".to_string(),
273            ));
274        }
275
276        // Use optimized COPY path: direct Value → bytes encoding, single syscall
277        self.connection.copy_in_fast(table, &columns, rows).await
278    }
279
280    /// **Fastest** bulk insert using pre-encoded COPY data.
281    /// Accepts raw COPY text format bytes. Use when caller has already
282    /// encoded rows to avoid any encoding overhead.
283    /// # Format
284    /// Data should be tab-separated rows with newlines (COPY text format):
285    /// `1\thello\t3.14\n2\tworld\t2.71\n`
286    /// # Example
287    /// ```ignore
288    /// let cmd = Qail::add("users").columns(["id", "name"]);
289    /// let data = b"1\tAlice\n2\tBob\n";
290    /// driver.copy_bulk_bytes(&cmd, data).await?;
291    /// ```
292    pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
293        use qail_core::ast::Action;
294
295        if cmd.action != Action::Add {
296            return Err(PgError::Query(
297                "copy_bulk_bytes requires Qail::Add action".to_string(),
298            ));
299        }
300
301        let table = &cmd.table;
302        let columns: Vec<String> = cmd
303            .columns
304            .iter()
305            .filter_map(|expr| {
306                use qail_core::ast::Expr;
307                match expr {
308                    Expr::Named(name) => Some(name.clone()),
309                    Expr::Aliased { name, .. } => Some(name.clone()),
310                    _ => None,
311                }
312            })
313            .collect();
314
315        if columns.is_empty() {
316            return Err(PgError::Query(
317                "copy_bulk_bytes requires columns in Qail".to_string(),
318            ));
319        }
320
321        // Direct to raw COPY - zero encoding!
322        self.connection.copy_in_raw(table, &columns, data).await
323    }
324
325    /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
326    /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
327    /// # Example
328    /// ```ignore
329    /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
330    /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
331    /// ```
332    pub async fn copy_export_table(
333        &mut self,
334        table: &str,
335        columns: &[String],
336    ) -> PgResult<Vec<u8>> {
337        let quote_ident = |ident: &str| -> String {
338            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
339        };
340        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
341        let sql = format!(
342            "COPY {} ({}) TO STDOUT",
343            quote_ident(table),
344            cols.join(", ")
345        );
346
347        self.connection.copy_out_raw(&sql).await
348    }
349
350    /// Stream table export using COPY TO STDOUT with bounded memory usage.
351    ///
352    /// Chunks are forwarded directly from PostgreSQL to `on_chunk`.
353    pub async fn copy_export_table_stream<F, Fut>(
354        &mut self,
355        table: &str,
356        columns: &[String],
357        on_chunk: F,
358    ) -> PgResult<()>
359    where
360        F: FnMut(Vec<u8>) -> Fut,
361        Fut: std::future::Future<Output = PgResult<()>>,
362    {
363        let quote_ident = |ident: &str| -> String {
364            format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
365        };
366        let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
367        let sql = format!(
368            "COPY {} ({}) TO STDOUT",
369            quote_ident(table),
370            cols.join(", ")
371        );
372        self.connection.copy_out_raw_stream(&sql, on_chunk).await
373    }
374
375    /// Stream an AST-native `Qail::Export` command as raw COPY chunks.
376    pub async fn copy_export_cmd_stream<F, Fut>(&mut self, cmd: &Qail, on_chunk: F) -> PgResult<()>
377    where
378        F: FnMut(Vec<u8>) -> Fut,
379        Fut: std::future::Future<Output = PgResult<()>>,
380    {
381        self.connection.copy_export_stream_raw(cmd, on_chunk).await
382    }
383
384    /// Stream an AST-native `Qail::Export` command as parsed text rows.
385    pub async fn copy_export_cmd_stream_rows<F>(&mut self, cmd: &Qail, on_row: F) -> PgResult<()>
386    where
387        F: FnMut(Vec<String>) -> PgResult<()>,
388    {
389        self.connection.copy_export_stream_rows(cmd, on_row).await
390    }
391
392    /// Stream large result sets using PostgreSQL cursors.
393    /// This method uses DECLARE CURSOR internally to stream rows in batches,
394    /// avoiding loading the entire result set into memory.
395    /// # Example
396    /// ```ignore
397    /// let cmd = Qail::get("large_table");
398    /// let batches = driver.stream_cmd(&cmd, 100).await?;
399    /// for batch in batches {
400    ///     for row in batch {
401    ///         // process row
402    ///     }
403    /// }
404    /// ```
405    pub async fn stream_cmd(&mut self, cmd: &Qail, batch_size: usize) -> PgResult<Vec<Vec<PgRow>>> {
406        use std::sync::atomic::{AtomicU64, Ordering};
407        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
408
409        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
410
411        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
412        use crate::protocol::AstEncoder;
413        let mut sql_buf = bytes::BytesMut::with_capacity(256);
414        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
415        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params)
416            .map_err(|e| PgError::Encode(e.to_string()))?;
417        let sql = String::from_utf8_lossy(&sql_buf).to_string();
418
419        // Must be in a transaction for cursors
420        self.connection.begin_transaction().await?;
421
422        // Declare cursor
423        // Declare cursor with bind params — Extended Query Protocol handles $1, $2 etc.
424        self.connection
425            .declare_cursor(&cursor_name, &sql, &params)
426            .await?;
427
428        // Fetch all batches
429        let mut all_batches = Vec::new();
430        while let Some(rows) = self
431            .connection
432            .fetch_cursor(&cursor_name, batch_size)
433            .await?
434        {
435            let pg_rows: Vec<PgRow> = rows
436                .into_iter()
437                .map(|cols| PgRow {
438                    columns: cols,
439                    column_info: None,
440                })
441                .collect();
442            all_batches.push(pg_rows);
443        }
444
445        self.connection.close_cursor(&cursor_name).await?;
446        self.connection.commit().await?;
447
448        Ok(all_batches)
449    }
450}