qail_pg/driver/ops.rs
1//! PgDriver operations: transaction control, batch execution, statement timeout,
2//! RLS context, pipeline, COPY bulk/export, and cursor streaming.
3
4use super::core::PgDriver;
5use super::prepared::PreparedStatement;
6use super::rls;
7use super::types::*;
8use qail_core::ast::Qail;
9
10impl PgDriver {
11 // ==================== RAW SQL COMPAT ====================
12
13 /// Execute raw SQL using PostgreSQL Simple Query protocol.
14 ///
15 /// Returns number of rows returned by the statement (or `0` for statements
16 /// that do not produce rows).
17 ///
18 /// This compatibility API exists for legacy tests/examples; prefer AST APIs
19 /// (`execute`, `fetch_all`) for application code.
20 pub async fn execute_raw(&mut self, sql: &str) -> PgResult<u64> {
21 let rows = self.connection.simple_query(sql).await?;
22 Ok(rows.len() as u64)
23 }
24
25 /// Execute raw SQL using PostgreSQL Simple Query protocol and return rows.
26 ///
27 /// This compatibility API exists for legacy tests/examples; prefer AST APIs
28 /// (`execute`, `fetch_all`) for application code.
29 pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
30 self.connection.simple_query(sql).await
31 }
32
33 // ==================== TRANSACTION CONTROL ====================
34
35 /// Begin a transaction (AST-native).
36 pub async fn begin(&mut self) -> PgResult<()> {
37 self.connection.begin_transaction().await
38 }
39
40 /// Commit the current transaction (AST-native).
41 pub async fn commit(&mut self) -> PgResult<()> {
42 self.connection.commit().await
43 }
44
45 /// Rollback the current transaction (AST-native).
46 pub async fn rollback(&mut self) -> PgResult<()> {
47 self.connection.rollback().await
48 }
49
50 /// Create a named savepoint within the current transaction.
51 /// Savepoints allow partial rollback within a transaction.
52 /// Use `rollback_to()` to return to this savepoint.
53 /// # Example
54 /// ```ignore
55 /// driver.begin().await?;
56 /// driver.execute(&insert1).await?;
57 /// driver.savepoint("sp1").await?;
58 /// driver.execute(&insert2).await?;
59 /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
60 /// driver.commit().await?;
61 /// ```
62 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
63 self.connection.savepoint(name).await
64 }
65
66 /// Rollback to a previously created savepoint.
67 /// Discards all changes since the named savepoint was created,
68 /// but keeps the transaction open.
69 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
70 self.connection.rollback_to(name).await
71 }
72
73 /// Release a savepoint (free resources, if no longer needed).
74 /// After release, the savepoint cannot be rolled back to.
75 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
76 self.connection.release_savepoint(name).await
77 }
78
79 // ==================== BATCH TRANSACTIONS ====================
80
81 /// Execute multiple commands in a single atomic transaction.
82 /// All commands succeed or all are rolled back.
83 /// # Example
84 /// ```ignore
85 /// let cmds = vec![
86 /// Qail::add("users").columns(["name"]).values(["Alice"]),
87 /// Qail::add("users").columns(["name"]).values(["Bob"]),
88 /// ];
89 /// let results = driver.execute_batch(&cmds).await?;
90 /// // results = [1, 1] (rows affected)
91 /// ```
92 pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
93 self.begin().await?;
94 let mut results = Vec::with_capacity(cmds.len());
95 for cmd in cmds {
96 match self.execute(cmd).await {
97 Ok(n) => results.push(n),
98 Err(e) => {
99 self.rollback().await?;
100 return Err(e);
101 }
102 }
103 }
104 self.commit().await?;
105 Ok(results)
106 }
107
108 // ==================== STATEMENT TIMEOUT ====================
109
110 /// Set statement timeout for this connection (in milliseconds).
111 /// # Example
112 /// ```ignore
113 /// driver.set_statement_timeout(30_000).await?; // 30 seconds
114 /// ```
115 pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
116 let cmd = Qail::session_set("statement_timeout", ms.to_string());
117 self.execute(&cmd).await.map(|_| ())
118 }
119
120 /// Reset statement timeout to default (no limit).
121 pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
122 let cmd = Qail::session_reset("statement_timeout");
123 self.execute(&cmd).await.map(|_| ())
124 }
125
126 // ==================== RLS (MULTI-TENANT) ====================
127
128 /// Set the RLS context for multi-tenant data isolation.
129 ///
130 /// Configures PostgreSQL session variables (`app.current_tenant_id`, etc.)
131 /// so that RLS policies automatically filter data by tenant.
132 ///
133 /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
134 /// that `set_config` and all subsequent queries execute on the **same
135 /// connection** — no pool race conditions possible.
136 ///
137 /// # Example
138 /// ```ignore
139 /// driver.set_rls_context(RlsContext::tenant("tenant-123")).await?;
140 /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
141 /// // orders only contains rows for tenant-123
142 /// ```
143 pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
144 let sql = rls::context_to_sql(&ctx);
145 if sql.as_bytes().contains(&0) {
146 return Err(crate::PgError::Protocol(
147 "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
148 ));
149 }
150 self.connection.execute_simple(&sql).await?;
151 self.rls_context = Some(ctx);
152 Ok(())
153 }
154
155 /// Clear the RLS context, resetting session variables to safe defaults.
156 ///
157 /// After clearing, all RLS-protected queries will return zero rows
158 /// (empty tenant scope matches nothing).
159 pub async fn clear_rls_context(&mut self) -> PgResult<()> {
160 let sql = rls::reset_sql();
161 if sql.as_bytes().contains(&0) {
162 return Err(crate::PgError::Protocol(
163 "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
164 ));
165 }
166 self.connection.execute_simple(sql).await?;
167 self.rls_context = None;
168 Ok(())
169 }
170
171 /// Get the current RLS context, if any.
172 pub fn rls_context(&self) -> Option<&rls::RlsContext> {
173 self.rls_context.as_ref()
174 }
175
176 // ==================== PIPELINE (BATCH) ====================
177
178 /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
179 /// # Example
180 /// ```ignore
181 /// let cmds: Vec<Qail> = (1..=1000)
182 /// .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
183 /// .collect();
184 /// let count = driver.pipeline_batch(&cmds).await?;
185 /// assert_eq!(count, 1000);
186 /// ```
187 pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
188 self.connection.pipeline_ast_fast(cmds).await
189 }
190
191 /// Execute multiple Qail ASTs and return full row data.
192 pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
193 let raw_results = self.connection.pipeline_ast(cmds).await?;
194
195 let results: Vec<Vec<PgRow>> = raw_results
196 .into_iter()
197 .map(|rows| {
198 rows.into_iter()
199 .map(|columns| PgRow {
200 columns,
201 column_info: None,
202 })
203 .collect()
204 })
205 .collect();
206
207 Ok(results)
208 }
209
210 /// Prepare a SQL statement for repeated execution.
211 pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
212 self.connection.prepare(sql).await
213 }
214
215 /// Execute a prepared statement pipeline in FAST mode (count only).
216 pub async fn pipeline_prepared_fast(
217 &mut self,
218 stmt: &PreparedStatement,
219 params_batch: &[Vec<Option<Vec<u8>>>],
220 ) -> PgResult<usize> {
221 self.connection
222 .pipeline_prepared_fast(stmt, params_batch)
223 .await
224 }
225
226 /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
227 /// Uses a Qail::Add to get validated table and column names from the AST,
228 /// not user-provided strings. This is the sound, AST-native approach.
229 /// # Example
230 /// ```ignore
231 /// // Create a Qail::Add to define table and columns
232 /// let cmd = Qail::add("users")
233 /// .columns(["id", "name", "email"]);
234 /// // Bulk insert rows
235 /// let rows: Vec<Vec<Value>> = vec![
236 /// vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
237 /// vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
238 /// ];
239 /// driver.copy_bulk(&cmd, &rows).await?;
240 /// ```
241 pub async fn copy_bulk(
242 &mut self,
243 cmd: &Qail,
244 rows: &[Vec<qail_core::ast::Value>],
245 ) -> PgResult<u64> {
246 use qail_core::ast::Action;
247
248 if cmd.action != Action::Add {
249 return Err(PgError::Query(
250 "copy_bulk requires Qail::Add action".to_string(),
251 ));
252 }
253
254 let table = &cmd.table;
255
256 let columns: Vec<String> = cmd
257 .columns
258 .iter()
259 .filter_map(|expr| {
260 use qail_core::ast::Expr;
261 match expr {
262 Expr::Named(name) => Some(name.clone()),
263 Expr::Aliased { name, .. } => Some(name.clone()),
264 Expr::Star => None, // Can't COPY with *
265 _ => None,
266 }
267 })
268 .collect();
269
270 if columns.is_empty() {
271 return Err(PgError::Query(
272 "copy_bulk requires columns in Qail".to_string(),
273 ));
274 }
275
276 // Use optimized COPY path: direct Value → bytes encoding, single syscall
277 self.connection.copy_in_fast(table, &columns, rows).await
278 }
279
280 /// **Fastest** bulk insert using pre-encoded COPY data.
281 /// Accepts raw COPY text format bytes. Use when caller has already
282 /// encoded rows to avoid any encoding overhead.
283 /// # Format
284 /// Data should be tab-separated rows with newlines (COPY text format):
285 /// `1\thello\t3.14\n2\tworld\t2.71\n`
286 /// # Example
287 /// ```ignore
288 /// let cmd = Qail::add("users").columns(["id", "name"]);
289 /// let data = b"1\tAlice\n2\tBob\n";
290 /// driver.copy_bulk_bytes(&cmd, data).await?;
291 /// ```
292 pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
293 use qail_core::ast::Action;
294
295 if cmd.action != Action::Add {
296 return Err(PgError::Query(
297 "copy_bulk_bytes requires Qail::Add action".to_string(),
298 ));
299 }
300
301 let table = &cmd.table;
302 let columns: Vec<String> = cmd
303 .columns
304 .iter()
305 .filter_map(|expr| {
306 use qail_core::ast::Expr;
307 match expr {
308 Expr::Named(name) => Some(name.clone()),
309 Expr::Aliased { name, .. } => Some(name.clone()),
310 _ => None,
311 }
312 })
313 .collect();
314
315 if columns.is_empty() {
316 return Err(PgError::Query(
317 "copy_bulk_bytes requires columns in Qail".to_string(),
318 ));
319 }
320
321 // Direct to raw COPY - zero encoding!
322 self.connection.copy_in_raw(table, &columns, data).await
323 }
324
325 /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
326 /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
327 /// # Example
328 /// ```ignore
329 /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
330 /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
331 /// ```
332 pub async fn copy_export_table(
333 &mut self,
334 table: &str,
335 columns: &[String],
336 ) -> PgResult<Vec<u8>> {
337 let quote_ident = |ident: &str| -> String {
338 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
339 };
340 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
341 let sql = format!(
342 "COPY {} ({}) TO STDOUT",
343 quote_ident(table),
344 cols.join(", ")
345 );
346
347 self.connection.copy_out_raw(&sql).await
348 }
349
350 /// Stream table export using COPY TO STDOUT with bounded memory usage.
351 ///
352 /// Chunks are forwarded directly from PostgreSQL to `on_chunk`.
353 pub async fn copy_export_table_stream<F, Fut>(
354 &mut self,
355 table: &str,
356 columns: &[String],
357 on_chunk: F,
358 ) -> PgResult<()>
359 where
360 F: FnMut(Vec<u8>) -> Fut,
361 Fut: std::future::Future<Output = PgResult<()>>,
362 {
363 let quote_ident = |ident: &str| -> String {
364 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
365 };
366 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
367 let sql = format!(
368 "COPY {} ({}) TO STDOUT",
369 quote_ident(table),
370 cols.join(", ")
371 );
372 self.connection.copy_out_raw_stream(&sql, on_chunk).await
373 }
374
375 /// Stream an AST-native `Qail::Export` command as raw COPY chunks.
376 pub async fn copy_export_cmd_stream<F, Fut>(&mut self, cmd: &Qail, on_chunk: F) -> PgResult<()>
377 where
378 F: FnMut(Vec<u8>) -> Fut,
379 Fut: std::future::Future<Output = PgResult<()>>,
380 {
381 self.connection.copy_export_stream_raw(cmd, on_chunk).await
382 }
383
384 /// Stream an AST-native `Qail::Export` command as parsed text rows.
385 pub async fn copy_export_cmd_stream_rows<F>(&mut self, cmd: &Qail, on_row: F) -> PgResult<()>
386 where
387 F: FnMut(Vec<String>) -> PgResult<()>,
388 {
389 self.connection.copy_export_stream_rows(cmd, on_row).await
390 }
391
392 /// Stream large result sets using PostgreSQL cursors.
393 /// This method uses DECLARE CURSOR internally to stream rows in batches,
394 /// avoiding loading the entire result set into memory.
395 /// # Example
396 /// ```ignore
397 /// let cmd = Qail::get("large_table");
398 /// let batches = driver.stream_cmd(&cmd, 100).await?;
399 /// for batch in batches {
400 /// for row in batch {
401 /// // process row
402 /// }
403 /// }
404 /// ```
405 pub async fn stream_cmd(&mut self, cmd: &Qail, batch_size: usize) -> PgResult<Vec<Vec<PgRow>>> {
406 use std::sync::atomic::{AtomicU64, Ordering};
407 static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
408
409 let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
410
411 // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
412 use crate::protocol::AstEncoder;
413 let mut sql_buf = bytes::BytesMut::with_capacity(256);
414 let mut params: Vec<Option<Vec<u8>>> = Vec::new();
415 AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params)
416 .map_err(|e| PgError::Encode(e.to_string()))?;
417 let sql = String::from_utf8_lossy(&sql_buf).to_string();
418
419 // Must be in a transaction for cursors
420 self.connection.begin_transaction().await?;
421
422 // Declare cursor
423 // Declare cursor with bind params — Extended Query Protocol handles $1, $2 etc.
424 self.connection
425 .declare_cursor(&cursor_name, &sql, ¶ms)
426 .await?;
427
428 // Fetch all batches
429 let mut all_batches = Vec::new();
430 while let Some(rows) = self
431 .connection
432 .fetch_cursor(&cursor_name, batch_size)
433 .await?
434 {
435 let pg_rows: Vec<PgRow> = rows
436 .into_iter()
437 .map(|cols| PgRow {
438 columns: cols,
439 column_info: None,
440 })
441 .collect();
442 all_batches.push(pg_rows);
443 }
444
445 self.connection.close_cursor(&cursor_name).await?;
446 self.connection.commit().await?;
447
448 Ok(all_batches)
449 }
450}