Skip to main content

qail_pg/driver/
mod.rs

1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! Auto-detects the best I/O backend:
4//! - Linux 5.1+: io_uring (fastest)
5//! - Linux < 5.1 / macOS / Windows: tokio
6//!
7//! Connection methods are split across modules for easier maintenance:
8//! - `connection.rs` - Core struct and connect methods
9//! - `io.rs` - send, recv, recv_msg_type_fast
10//! - `query.rs` - query, query_cached, execute_simple
11//! - `transaction.rs` - begin_transaction, commit, rollback
12//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor  
13//! - `copy.rs` - COPY protocol for bulk operations
14//! - `pipeline.rs` - High-performance pipelining (275k q/s)
15//! - `cancel.rs` - Query cancellation
16//! - `io_backend.rs` - Runtime I/O backend detection
17
18mod cancel;
19mod connection;
20mod copy;
21mod cursor;
22mod io;
23pub mod io_backend;
24mod pipeline;
25mod pool;
26mod prepared;
27mod query;
28pub mod rls;
29mod row;
30mod stream;
31mod transaction;
32
33pub use connection::PgConnection;
34pub use connection::TlsConfig;
35pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
36pub use cancel::CancelToken;
37pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
38pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
39pub use prepared::PreparedStatement;
40pub use rls::RlsContext;
41pub use row::QailRow;
42
43use qail_core::ast::Qail;
44use std::collections::HashMap;
45use std::sync::Arc;
46
47#[derive(Debug, Clone)]
48pub struct ColumnInfo {
49    pub name_to_index: HashMap<String, usize>,
50    pub oids: Vec<u32>,
51    pub formats: Vec<i16>,
52}
53
54impl ColumnInfo {
55    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
56        let mut name_to_index = HashMap::with_capacity(fields.len());
57        let mut oids = Vec::with_capacity(fields.len());
58        let mut formats = Vec::with_capacity(fields.len());
59
60        for (i, field) in fields.iter().enumerate() {
61            name_to_index.insert(field.name.clone(), i);
62            oids.push(field.type_oid);
63            formats.push(field.format);
64        }
65
66        Self {
67            name_to_index,
68            oids,
69            formats,
70        }
71    }
72}
73
74/// PostgreSQL row with column data and metadata.
75pub struct PgRow {
76    pub columns: Vec<Option<Vec<u8>>>,
77    pub column_info: Option<Arc<ColumnInfo>>,
78}
79
80/// Error type for PostgreSQL driver operations.
81#[derive(Debug)]
82pub enum PgError {
83    Connection(String),
84    Protocol(String),
85    Auth(String),
86    Query(String),
87    NoRows,
88    /// I/O error
89    Io(std::io::Error),
90    /// Encoding error (parameter limit, etc.)
91    Encode(String),
92}
93
94impl std::fmt::Display for PgError {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            PgError::Connection(e) => write!(f, "Connection error: {}", e),
98            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
99            PgError::Auth(e) => write!(f, "Auth error: {}", e),
100            PgError::Query(e) => write!(f, "Query error: {}", e),
101            PgError::NoRows => write!(f, "No rows returned"),
102            PgError::Io(e) => write!(f, "I/O error: {}", e),
103            PgError::Encode(e) => write!(f, "Encode error: {}", e),
104        }
105    }
106}
107
108impl std::error::Error for PgError {}
109
110impl From<std::io::Error> for PgError {
111    fn from(e: std::io::Error) -> Self {
112        PgError::Io(e)
113    }
114}
115
116/// Result type for PostgreSQL operations.
117pub type PgResult<T> = Result<T, PgError>;
118
119/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
120pub struct PgDriver {
121    #[allow(dead_code)]
122    connection: PgConnection,
123    /// Current RLS context, if set. Used for multi-tenant data isolation.
124    rls_context: Option<RlsContext>,
125}
126
127impl PgDriver {
128    /// Create a new driver with an existing connection.
129    pub fn new(connection: PgConnection) -> Self {
130        Self { connection, rls_context: None }
131    }
132
133    /// Builder pattern for ergonomic connection configuration.
134    /// # Example
135    /// ```ignore
136    /// let driver = PgDriver::builder()
137    ///     .host("localhost")
138    ///     .port(5432)
139    ///     .user("admin")
140    ///     .database("mydb")
141    ///     .password("secret")  // Optional
142    ///     .connect()
143    ///     .await?;
144    /// ```
145    pub fn builder() -> PgDriverBuilder {
146        PgDriverBuilder::new()
147    }
148
149    /// Connect to PostgreSQL and create a driver (trust mode, no password).
150    pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
151        let connection = PgConnection::connect(host, port, user, database).await?;
152        Ok(Self::new(connection))
153    }
154
155    /// Connect to PostgreSQL with password authentication (SCRAM-SHA-256).
156    pub async fn connect_with_password(
157        host: &str,
158        port: u16,
159        user: &str,
160        database: &str,
161        password: &str,
162    ) -> PgResult<Self> {
163        let connection =
164            PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
165        Ok(Self::new(connection))
166    }
167
168    /// Connect using DATABASE_URL environment variable.
169    /// 
170    /// Parses the URL format: `postgresql://user:password@host:port/database`
171    /// or `postgres://user:password@host:port/database`
172    /// 
173    /// # Example
174    /// ```ignore
175    /// // Set DATABASE_URL=postgresql://user:pass@localhost:5432/mydb
176    /// let driver = PgDriver::connect_env().await?;
177    /// ```
178    pub async fn connect_env() -> PgResult<Self> {
179        let url = std::env::var("DATABASE_URL")
180            .map_err(|_| PgError::Connection("DATABASE_URL environment variable not set".to_string()))?;
181        Self::connect_url(&url).await
182    }
183
184    /// Connect using a PostgreSQL connection URL.
185    /// 
186    /// Parses the URL format: `postgresql://user:password@host:port/database`
187    /// or `postgres://user:password@host:port/database`
188    /// 
189    /// # Example
190    /// ```ignore
191    /// let driver = PgDriver::connect_url("postgresql://user:pass@localhost:5432/mydb").await?;
192    /// ```
193    pub async fn connect_url(url: &str) -> PgResult<Self> {
194        let (host, port, user, database, password) = Self::parse_database_url(url)?;
195        
196        if let Some(pwd) = password {
197            Self::connect_with_password(&host, port, &user, &database, &pwd).await
198        } else {
199            Self::connect(&host, port, &user, &database).await
200        }
201    }
202
203    /// Parse a PostgreSQL connection URL into components.
204    /// 
205    /// Format: `postgresql://user:password@host:port/database`
206    /// or `postgres://user:password@host:port/database`
207    /// 
208    /// URL percent-encoding is automatically decoded for user and password.
209    fn parse_database_url(url: &str) -> PgResult<(String, u16, String, String, Option<String>)> {
210        // Remove scheme (postgresql:// or postgres://)
211        let after_scheme = url.split("://").nth(1)
212            .ok_or_else(|| PgError::Connection("Invalid DATABASE_URL: missing scheme".to_string()))?;
213        
214        // Split into auth@host parts
215        let (auth_part, host_db_part) = if let Some(at_pos) = after_scheme.rfind('@') {
216            (Some(&after_scheme[..at_pos]), &after_scheme[at_pos + 1..])
217        } else {
218            (None, after_scheme)
219        };
220        
221        // Parse auth (user:password)
222        let (user, password) = if let Some(auth) = auth_part {
223            let parts: Vec<&str> = auth.splitn(2, ':').collect();
224            if parts.len() == 2 {
225                // URL-decode both user and password
226                (
227                    Self::percent_decode(parts[0]),
228                    Some(Self::percent_decode(parts[1])),
229                )
230            } else {
231                (Self::percent_decode(parts[0]), None)
232            }
233        } else {
234            return Err(PgError::Connection("Invalid DATABASE_URL: missing user".to_string()));
235        };
236        
237        // Parse host:port/database
238        let (host_port, database) = if let Some(slash_pos) = host_db_part.find('/') {
239            (&host_db_part[..slash_pos], host_db_part[slash_pos + 1..].to_string())
240        } else {
241            return Err(PgError::Connection("Invalid DATABASE_URL: missing database name".to_string()));
242        };
243        
244        // Parse host:port
245        let (host, port) = if let Some(colon_pos) = host_port.rfind(':') {
246            let port_str = &host_port[colon_pos + 1..];
247            let port = port_str.parse::<u16>()
248                .map_err(|_| PgError::Connection(format!("Invalid port: {}", port_str)))?;
249            (host_port[..colon_pos].to_string(), port)
250        } else {
251            (host_port.to_string(), 5432) // Default PostgreSQL port
252        };
253        
254        Ok((host, port, user, database, password))
255    }
256    
257    /// Decode URL percent-encoded string.
258    /// Handles common encodings: %20 (space), %2B (+), %3D (=), %40 (@), %2F (/), etc.
259    fn percent_decode(s: &str) -> String {
260        let mut result = String::with_capacity(s.len());
261        let mut chars = s.chars().peekable();
262        
263        while let Some(c) = chars.next() {
264            if c == '%' {
265                // Try to parse next two chars as hex
266                let hex: String = chars.by_ref().take(2).collect();
267                if hex.len() == 2
268                    && let Ok(byte) = u8::from_str_radix(&hex, 16)
269                {
270                    result.push(byte as char);
271                    continue;
272                }
273                // If parsing failed, keep original
274                result.push('%');
275                result.push_str(&hex);
276            } else if c == '+' {
277                // '+' often represents space in query strings (form encoding)
278                // But in path components, keep as-is. PostgreSQL URLs use path encoding.
279                result.push('+');
280            } else {
281                result.push(c);
282            }
283        }
284        
285        result
286    }
287
288    /// Connect to PostgreSQL with a connection timeout.
289    /// If the connection cannot be established within the timeout, returns an error.
290    /// # Example
291    /// ```ignore
292    /// use std::time::Duration;
293    /// let driver = PgDriver::connect_with_timeout(
294    ///     "localhost", 5432, "user", "db", "password",
295    ///     Duration::from_secs(5)
296    /// ).await?;
297    /// ```
298    pub async fn connect_with_timeout(
299        host: &str,
300        port: u16,
301        user: &str,
302        database: &str,
303        password: &str,
304        timeout: std::time::Duration,
305    ) -> PgResult<Self> {
306        tokio::time::timeout(
307            timeout,
308            Self::connect_with_password(host, port, user, database, password),
309        )
310        .await
311        .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
312    }
313    /// Clear the prepared statement cache.
314    /// Frees memory by removing all cached statements.
315    /// Note: Statements remain on the PostgreSQL server until connection closes.
316    pub fn clear_cache(&mut self) {
317        self.connection.stmt_cache.clear();
318        self.connection.prepared_statements.clear();
319    }
320
321    /// Get cache statistics.
322    /// Returns (current_size, max_capacity).
323    pub fn cache_stats(&self) -> (usize, usize) {
324        (self.connection.stmt_cache.len(), self.connection.stmt_cache.cap().get())
325    }
326
327    /// Execute a QAIL command and fetch all rows (CACHED + ZERO-ALLOC).
328    /// **Default method** - uses prepared statement caching for best performance.
329    /// On first call: sends Parse + Bind + Execute + Sync
330    /// On subsequent calls with same SQL: sends only Bind + Execute (SKIPS Parse!)
331    /// Uses LRU cache with max 1000 statements (auto-evicts oldest).
332    pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
333        // Delegate to fetch_all_cached for cached-by-default behavior
334        self.fetch_all_cached(cmd).await
335    }
336
337    /// Execute a QAIL command and fetch all rows as a typed struct.
338    /// Requires the target type to implement `QailRow` trait.
339    /// 
340    /// # Example
341    /// ```ignore
342    /// let users: Vec<User> = driver.fetch_typed::<User>(&query).await?;
343    /// ```
344    pub async fn fetch_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
345        let rows = self.fetch_all(cmd).await?;
346        Ok(rows.iter().map(T::from_row).collect())
347    }
348
349    /// Execute a QAIL command and fetch a single row as a typed struct.
350    /// Returns None if no rows are returned.
351    pub async fn fetch_one_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Option<T>> {
352        let rows = self.fetch_all(cmd).await?;
353        Ok(rows.first().map(T::from_row))
354    }
355
356    /// Execute a QAIL command and fetch all rows (UNCACHED).
357    /// Sends Parse + Bind + Execute on every call.
358    /// Use for one-off queries or when caching is not desired.
359    pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
360        use crate::protocol::AstEncoder;
361
362        let wire_bytes = AstEncoder::encode_cmd_reuse(
363            cmd,
364            &mut self.connection.sql_buf,
365            &mut self.connection.params_buf,
366        );
367
368        self.connection.send_bytes(&wire_bytes).await?;
369
370        let mut rows: Vec<PgRow> = Vec::new();
371        let mut column_info: Option<Arc<ColumnInfo>> = None;
372
373        let mut error: Option<PgError> = None;
374
375        loop {
376            let msg = self.connection.recv().await?;
377            match msg {
378                crate::protocol::BackendMessage::ParseComplete
379                | crate::protocol::BackendMessage::BindComplete => {}
380                crate::protocol::BackendMessage::RowDescription(fields) => {
381                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
382                }
383                crate::protocol::BackendMessage::DataRow(data) => {
384                    if error.is_none() {
385                        rows.push(PgRow {
386                            columns: data,
387                            column_info: column_info.clone(),
388                        });
389                    }
390                }
391                crate::protocol::BackendMessage::CommandComplete(_) => {}
392                crate::protocol::BackendMessage::ReadyForQuery(_) => {
393                    if let Some(err) = error {
394                        return Err(err);
395                    }
396                    return Ok(rows);
397                }
398                crate::protocol::BackendMessage::ErrorResponse(err) => {
399                    if error.is_none() {
400                        error = Some(PgError::Query(err.message));
401                    }
402                }
403                _ => {}
404            }
405        }
406    }
407
408    /// Execute a QAIL command and fetch all rows (FAST VERSION).
409    /// Uses optimized recv_with_data_fast for faster response parsing.
410    /// Skips column metadata collection for maximum speed.
411    pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
412        use crate::protocol::AstEncoder;
413
414        let wire_bytes = AstEncoder::encode_cmd_reuse(
415            cmd,
416            &mut self.connection.sql_buf,
417            &mut self.connection.params_buf,
418        );
419
420        self.connection.send_bytes(&wire_bytes).await?;
421
422        // Collect results using FAST receiver
423        let mut rows: Vec<PgRow> = Vec::new();
424        let mut error: Option<PgError> = None;
425
426        loop {
427            let res = self.connection.recv_with_data_fast().await;
428            match res {
429                Ok((msg_type, data)) => {
430                    match msg_type {
431                        b'D' => {
432                             // DataRow
433                            if error.is_none() && let Some(columns) = data {
434                                rows.push(PgRow {
435                                    columns,
436                                    column_info: None, // Skip metadata for speed
437                                });
438                            }
439                        }
440                        b'Z' => {
441                            // ReadyForQuery
442                            if let Some(err) = error {
443                                return Err(err);
444                            }
445                            return Ok(rows);
446                        }
447                        _ => {} // 1, 2, C, T - skip Parse/Bind/CommandComplete/RowDescription
448                    }
449                }
450                Err(e) => {
451                   // recv_with_data_fast returns Err on ErrorResponse automatically.
452                   // We need to capture it and continue draining.
453                   // BUT recv_with_data_fast doesn't return the error *message type* if it fails.
454                   // It returns PgError::Query(msg).
455                   // So we capture the error, but we must continue RECVing until ReadyForQuery.
456                   // However, recv_with_data_fast will KEEP returning Err(Query) if the buffer has E?
457                   // No, recv_with_data_fast consumes the E message before returning Err.
458                   
459                   if error.is_none() {
460                       error = Some(e);
461                   }
462                   // Continue loop to drain until ReadyForQuery... 
463                   // BUT wait, does recv_with_data_fast handle the *rest* of the stream?
464                   // If we call it again, it will read the NEXT message.
465                   // So we just continue.
466                }
467            }
468        }
469    }
470
471    /// Execute a QAIL command and fetch one row.
472    pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
473        let rows = self.fetch_all(cmd).await?;
474        rows.into_iter().next().ok_or(PgError::NoRows)
475    }
476
477    /// Execute a QAIL command with PREPARED STATEMENT CACHING (ZERO-ALLOC).
478    /// Like fetch_all(), but caches the prepared statement on the server.
479    /// On first call: sends Parse + Bind + Execute + Sync
480    /// On subsequent calls: sends only Bind + Execute + Sync (SKIPS Parse!)
481    pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
482        use crate::protocol::AstEncoder;
483        use std::collections::hash_map::DefaultHasher;
484        use std::hash::{Hash, Hasher};
485
486        self.connection.sql_buf.clear();
487        self.connection.params_buf.clear();
488        
489        // Encode SQL to reusable buffer
490        match cmd.action {
491            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
492                crate::protocol::ast_encoder::dml::encode_select(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
493            }
494            qail_core::ast::Action::Add => {
495                crate::protocol::ast_encoder::dml::encode_insert(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
496            }
497            qail_core::ast::Action::Set => {
498                crate::protocol::ast_encoder::dml::encode_update(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
499            }
500            qail_core::ast::Action::Del => {
501                crate::protocol::ast_encoder::dml::encode_delete(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
502            }
503            _ => {
504                // Fallback for unsupported actions
505                let (sql, params) = AstEncoder::encode_cmd_sql(cmd);
506                let raw_rows = self.connection.query_cached(&sql, &params).await?;
507                return Ok(raw_rows.into_iter().map(|data| PgRow { columns: data, column_info: None }).collect());
508            }
509        }
510
511        let mut hasher = DefaultHasher::new();
512        self.connection.sql_buf.hash(&mut hasher);
513        let sql_hash = hasher.finish();
514
515        let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
516            name.clone()
517        } else {
518            let name = format!("qail_{:x}", sql_hash);
519            
520            use crate::protocol::PgEncoder;
521            use tokio::io::AsyncWriteExt;
522            
523            let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
524            let parse_msg = PgEncoder::encode_parse(&name, sql_str, &[]);
525            self.connection.stream.write_all(&parse_msg).await?;
526            
527            self.connection.stmt_cache.put(sql_hash, name.clone());
528            self.connection.prepared_statements.insert(name.clone(), sql_str.to_string());
529            
530            name
531        };
532
533        // Send Bind + Execute + Sync (always)
534        use crate::protocol::PgEncoder;
535        use tokio::io::AsyncWriteExt;
536        
537        let mut buf = bytes::BytesMut::with_capacity(128);
538        PgEncoder::encode_bind_to(&mut buf, &stmt_name, &self.connection.params_buf)
539            .map_err(|e| PgError::Encode(e.to_string()))?;
540        PgEncoder::encode_execute_to(&mut buf);
541        PgEncoder::encode_sync_to(&mut buf);
542        self.connection.stream.write_all(&buf).await?;
543
544        let mut rows: Vec<PgRow> = Vec::new();
545        let mut error: Option<PgError> = None;
546
547        loop {
548            let msg = self.connection.recv().await?;
549            match msg {
550                crate::protocol::BackendMessage::ParseComplete
551                | crate::protocol::BackendMessage::BindComplete => {}
552                crate::protocol::BackendMessage::RowDescription(_) => {}
553                crate::protocol::BackendMessage::DataRow(data) => {
554                    if error.is_none() {
555                        rows.push(PgRow {
556                            columns: data,
557                            column_info: None,
558                        });
559                    }
560                }
561                crate::protocol::BackendMessage::CommandComplete(_) => {}
562                crate::protocol::BackendMessage::ReadyForQuery(_) => {
563                    if let Some(err) = error {
564                        return Err(err);
565                    }
566                    return Ok(rows);
567                }
568                crate::protocol::BackendMessage::ErrorResponse(err) => {
569                    if error.is_none() {
570                        error = Some(PgError::Query(err.message));
571                        // Invalidate cache to prevent "prepared statement does not exist"
572                        // on next retry if the error happened during Parse/Bind.
573                        self.connection.stmt_cache.clear();
574                        self.connection.prepared_statements.clear();
575                    }
576                }
577                _ => {}
578            }
579        }
580    }
581
582    /// Execute a QAIL command (for mutations) - ZERO-ALLOC.
583    pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
584        use crate::protocol::AstEncoder;
585
586        let wire_bytes = AstEncoder::encode_cmd_reuse(
587            cmd,
588            &mut self.connection.sql_buf,
589            &mut self.connection.params_buf,
590        );
591
592        self.connection.send_bytes(&wire_bytes).await?;
593
594        let mut affected = 0u64;
595        let mut error: Option<PgError> = None;
596
597        loop {
598            let msg = self.connection.recv().await?;
599            match msg {
600                crate::protocol::BackendMessage::ParseComplete
601                | crate::protocol::BackendMessage::BindComplete => {}
602                crate::protocol::BackendMessage::RowDescription(_) => {}
603                crate::protocol::BackendMessage::DataRow(_) => {}
604                crate::protocol::BackendMessage::CommandComplete(tag) => {
605                    if error.is_none() && let Some(n) = tag.split_whitespace().last() {
606                        affected = n.parse().unwrap_or(0);
607                    }
608                }
609                crate::protocol::BackendMessage::ReadyForQuery(_) => {
610                    if let Some(err) = error {
611                        return Err(err);
612                    }
613                    return Ok(affected);
614                }
615                crate::protocol::BackendMessage::ErrorResponse(err) => {
616                    if error.is_none() {
617                        error = Some(PgError::Query(err.message));
618                    }
619                }
620                _ => {}
621            }
622        }
623    }
624
625    // ==================== TRANSACTION CONTROL ====================
626
627    /// Begin a transaction (AST-native).
628    pub async fn begin(&mut self) -> PgResult<()> {
629        self.connection.begin_transaction().await
630    }
631
632    /// Commit the current transaction (AST-native).
633    pub async fn commit(&mut self) -> PgResult<()> {
634        self.connection.commit().await
635    }
636
637    /// Rollback the current transaction (AST-native).
638    pub async fn rollback(&mut self) -> PgResult<()> {
639        self.connection.rollback().await
640    }
641
642    /// Create a named savepoint within the current transaction.
643    /// Savepoints allow partial rollback within a transaction.
644    /// Use `rollback_to()` to return to this savepoint.
645    /// # Example
646    /// ```ignore
647    /// driver.begin().await?;
648    /// driver.execute(&insert1).await?;
649    /// driver.savepoint("sp1").await?;
650    /// driver.execute(&insert2).await?;
651    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
652    /// driver.commit().await?;
653    /// ```
654    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
655        self.connection.savepoint(name).await
656    }
657
658    /// Rollback to a previously created savepoint.
659    /// Discards all changes since the named savepoint was created,
660    /// but keeps the transaction open.
661    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
662        self.connection.rollback_to(name).await
663    }
664
665    /// Release a savepoint (free resources, if no longer needed).
666    /// After release, the savepoint cannot be rolled back to.
667    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
668        self.connection.release_savepoint(name).await
669    }
670
671    // ==================== BATCH TRANSACTIONS ====================
672
673    /// Execute multiple commands in a single atomic transaction.
674    /// All commands succeed or all are rolled back.
675    /// # Example
676    /// ```ignore
677    /// let cmds = vec![
678    ///     Qail::add("users").columns(["name"]).values(["Alice"]),
679    ///     Qail::add("users").columns(["name"]).values(["Bob"]),
680    /// ];
681    /// let results = driver.execute_batch(&cmds).await?;
682    /// // results = [1, 1] (rows affected)
683    /// ```
684    pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
685        self.begin().await?;
686        let mut results = Vec::with_capacity(cmds.len());
687        for cmd in cmds {
688            match self.execute(cmd).await {
689                Ok(n) => results.push(n),
690                Err(e) => {
691                    self.rollback().await?;
692                    return Err(e);
693                }
694            }
695        }
696        self.commit().await?;
697        Ok(results)
698    }
699
700    // ==================== STATEMENT TIMEOUT ====================
701
702    /// Set statement timeout for this connection (in milliseconds).
703    /// # Example
704    /// ```ignore
705    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
706    /// ```
707    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
708        self.execute_raw(&format!("SET statement_timeout = {}", ms))
709            .await
710    }
711
712    /// Reset statement timeout to default (no limit).
713    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
714        self.execute_raw("RESET statement_timeout").await
715    }
716
717    // ==================== RLS (MULTI-TENANT) ====================
718
719    /// Set the RLS context for multi-tenant data isolation.
720    ///
721    /// Configures PostgreSQL session variables (`app.current_operator_id`, etc.)
722    /// so that RLS policies automatically filter data by tenant.
723    ///
724    /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
725    /// that `set_config` and all subsequent queries execute on the **same
726    /// connection** — no pool race conditions possible.
727    ///
728    /// # Example
729    /// ```ignore
730    /// driver.set_rls_context(RlsContext::operator("op-123")).await?;
731    /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
732    /// // orders only contains rows where operator_id = 'op-123'
733    /// ```
734    pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
735        let sql = rls::context_to_sql(&ctx);
736        self.execute_raw(&sql).await?;
737        self.rls_context = Some(ctx);
738        Ok(())
739    }
740
741    /// Clear the RLS context, resetting session variables to safe defaults.
742    ///
743    /// After clearing, all RLS-protected queries will return zero rows
744    /// (empty operator_id matches nothing).
745    pub async fn clear_rls_context(&mut self) -> PgResult<()> {
746        self.execute_raw(rls::reset_sql()).await?;
747        self.rls_context = None;
748        Ok(())
749    }
750
751    /// Get the current RLS context, if any.
752    pub fn rls_context(&self) -> Option<&rls::RlsContext> {
753        self.rls_context.as_ref()
754    }
755
756    // ==================== PIPELINE (BATCH) ====================
757
758    /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
759    /// # Example
760    /// ```ignore
761    /// let cmds: Vec<Qail> = (1..=1000)
762    ///     .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
763    ///     .collect();
764    /// let count = driver.pipeline_batch(&cmds).await?;
765    /// assert_eq!(count, 1000);
766    /// ```
767    pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
768        self.connection.pipeline_ast_fast(cmds).await
769    }
770
771    /// Execute multiple Qail ASTs and return full row data.
772    pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
773        let raw_results = self.connection.pipeline_ast(cmds).await?;
774
775        let results: Vec<Vec<PgRow>> = raw_results
776            .into_iter()
777            .map(|rows| {
778                rows.into_iter()
779                    .map(|columns| PgRow {
780                        columns,
781                        column_info: None,
782                    })
783                    .collect()
784            })
785            .collect();
786
787        Ok(results)
788    }
789
790    /// Prepare a SQL statement for repeated execution.
791    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
792        self.connection.prepare(sql).await
793    }
794
795    /// Execute a prepared statement pipeline in FAST mode (count only).
796    pub async fn pipeline_prepared_fast(
797        &mut self,
798        stmt: &PreparedStatement,
799        params_batch: &[Vec<Option<Vec<u8>>>],
800    ) -> PgResult<usize> {
801        self.connection
802            .pipeline_prepared_fast(stmt, params_batch)
803            .await
804    }
805
806    // ==================== LEGACY/BOOTSTRAP ====================
807
808    /// Execute a raw SQL string.
809    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
810    /// Use for bootstrap DDL only (e.g., migration table creation).
811    /// For transactions, use `begin()`, `commit()`, `rollback()`.
812    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
813        // Reject literal NULL bytes - they corrupt PostgreSQL connection state
814        if sql.as_bytes().contains(&0) {
815            return Err(crate::PgError::Protocol(
816                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
817            ));
818        }
819        self.connection.execute_simple(sql).await
820    }
821
822    /// Execute a raw SQL query and return rows.
823    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
824    /// Use for bootstrap/admin queries only.
825    pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
826        if sql.as_bytes().contains(&0) {
827            return Err(crate::PgError::Protocol(
828                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
829            ));
830        }
831        
832        use tokio::io::AsyncWriteExt;
833        use crate::protocol::PgEncoder;
834        
835        // Use simple query protocol (no prepared statements)
836        let msg = PgEncoder::encode_query_string(sql);
837        self.connection.stream.write_all(&msg).await?;
838        
839        let mut rows: Vec<PgRow> = Vec::new();
840        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = None;
841        
842
843        let mut error: Option<PgError> = None;
844
845        loop {
846            let msg = self.connection.recv().await?;
847            match msg {
848                crate::protocol::BackendMessage::RowDescription(fields) => {
849                    column_info = Some(std::sync::Arc::new(ColumnInfo::from_fields(&fields)));
850                }
851                crate::protocol::BackendMessage::DataRow(data) => {
852                    if error.is_none() {
853                        rows.push(PgRow {
854                            columns: data,
855                            column_info: column_info.clone(),
856                        });
857                    }
858                }
859                crate::protocol::BackendMessage::CommandComplete(_) => {}
860                crate::protocol::BackendMessage::ReadyForQuery(_) => {
861                    if let Some(err) = error {
862                        return Err(err);
863                    }
864                    return Ok(rows);
865                }
866                crate::protocol::BackendMessage::ErrorResponse(err) => {
867                    if error.is_none() {
868                        error = Some(PgError::Query(err.message));
869                    }
870                }
871                _ => {}
872            }
873        }
874    }
875
876    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
877    /// Uses a Qail::Add to get validated table and column names from the AST,
878    /// not user-provided strings. This is the sound, AST-native approach.
879    /// # Example
880    /// ```ignore
881    /// // Create a Qail::Add to define table and columns
882    /// let cmd = Qail::add("users")
883    ///     .columns(["id", "name", "email"]);
884    /// // Bulk insert rows
885    /// let rows: Vec<Vec<Value>> = vec![
886    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
887    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
888    /// ];
889    /// driver.copy_bulk(&cmd, &rows).await?;
890    /// ```
891    pub async fn copy_bulk(
892        &mut self,
893        cmd: &Qail,
894        rows: &[Vec<qail_core::ast::Value>],
895    ) -> PgResult<u64> {
896        use qail_core::ast::Action;
897
898
899        if cmd.action != Action::Add {
900            return Err(PgError::Query(
901                "copy_bulk requires Qail::Add action".to_string(),
902            ));
903        }
904
905        let table = &cmd.table;
906
907        let columns: Vec<String> = cmd
908            .columns
909            .iter()
910            .filter_map(|expr| {
911                use qail_core::ast::Expr;
912                match expr {
913                    Expr::Named(name) => Some(name.clone()),
914                    Expr::Aliased { name, .. } => Some(name.clone()),
915                    Expr::Star => None, // Can't COPY with *
916                    _ => None,
917                }
918            })
919            .collect();
920
921        if columns.is_empty() {
922            return Err(PgError::Query(
923                "copy_bulk requires columns in Qail".to_string(),
924            ));
925        }
926
927        // Use optimized COPY path: direct Value → bytes encoding, single syscall
928        self.connection.copy_in_fast(table, &columns, rows).await
929    }
930
931    /// **Fastest** bulk insert using pre-encoded COPY data.
932    /// Accepts raw COPY text format bytes. Use when caller has already
933    /// encoded rows to avoid any encoding overhead.
934    /// # Format
935    /// Data should be tab-separated rows with newlines (COPY text format):
936    /// `1\thello\t3.14\n2\tworld\t2.71\n`
937    /// # Example
938    /// ```ignore
939    /// let cmd = Qail::add("users").columns(["id", "name"]);
940    /// let data = b"1\tAlice\n2\tBob\n";
941    /// driver.copy_bulk_bytes(&cmd, data).await?;
942    /// ```
943    pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
944        use qail_core::ast::Action;
945
946        if cmd.action != Action::Add {
947            return Err(PgError::Query(
948                "copy_bulk_bytes requires Qail::Add action".to_string(),
949            ));
950        }
951
952        let table = &cmd.table;
953        let columns: Vec<String> = cmd
954            .columns
955            .iter()
956            .filter_map(|expr| {
957                use qail_core::ast::Expr;
958                match expr {
959                    Expr::Named(name) => Some(name.clone()),
960                    Expr::Aliased { name, .. } => Some(name.clone()),
961                    _ => None,
962                }
963            })
964            .collect();
965
966        if columns.is_empty() {
967            return Err(PgError::Query(
968                "copy_bulk_bytes requires columns in Qail".to_string(),
969            ));
970        }
971
972        // Direct to raw COPY - zero encoding!
973        self.connection.copy_in_raw(table, &columns, data).await
974    }
975
976    /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
977    /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
978    /// # Example
979    /// ```ignore
980    /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
981    /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
982    /// ```
983    pub async fn copy_export_table(
984        &mut self,
985        table: &str,
986        columns: &[String],
987    ) -> PgResult<Vec<u8>> {
988        let cols = columns.join(", ");
989        let sql = format!("COPY {} ({}) TO STDOUT", table, cols);
990        
991        self.connection.copy_out_raw(&sql).await
992    }
993
994    /// Stream large result sets using PostgreSQL cursors.
995    /// This method uses DECLARE CURSOR internally to stream rows in batches,
996    /// avoiding loading the entire result set into memory.
997    /// # Example
998    /// ```ignore
999    /// let cmd = Qail::get("large_table");
1000    /// let batches = driver.stream_cmd(&cmd, 100).await?;
1001    /// for batch in batches {
1002    ///     for row in batch {
1003    ///         // process row
1004    ///     }
1005    /// }
1006    /// ```
1007    pub async fn stream_cmd(
1008        &mut self,
1009        cmd: &Qail,
1010        batch_size: usize,
1011    ) -> PgResult<Vec<Vec<PgRow>>> {
1012        use std::sync::atomic::{AtomicU64, Ordering};
1013        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
1014
1015        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
1016
1017        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
1018        use crate::protocol::AstEncoder;
1019        let mut sql_buf = bytes::BytesMut::with_capacity(256);
1020        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
1021        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params);
1022        let sql = String::from_utf8_lossy(&sql_buf).to_string();
1023
1024        // Must be in a transaction for cursors
1025        self.connection.begin_transaction().await?;
1026
1027        // Declare cursor
1028        self.connection.declare_cursor(&cursor_name, &sql).await?;
1029
1030        // Fetch all batches
1031        let mut all_batches = Vec::new();
1032        while let Some(rows) = self
1033            .connection
1034            .fetch_cursor(&cursor_name, batch_size)
1035            .await?
1036        {
1037            let pg_rows: Vec<PgRow> = rows
1038                .into_iter()
1039                .map(|cols| PgRow {
1040                    columns: cols,
1041                    column_info: None,
1042                })
1043                .collect();
1044            all_batches.push(pg_rows);
1045        }
1046
1047        self.connection.close_cursor(&cursor_name).await?;
1048        self.connection.commit().await?;
1049
1050        Ok(all_batches)
1051    }
1052}
1053
1054// ============================================================================
1055// Connection Builder
1056// ============================================================================
1057
1058/// Builder for creating PgDriver connections with named parameters.
1059/// # Example
1060/// ```ignore
1061/// let driver = PgDriver::builder()
1062///     .host("localhost")
1063///     .port(5432)
1064///     .user("admin")
1065///     .database("mydb")
1066///     .password("secret")
1067///     .connect()
1068///     .await?;
1069/// ```
1070#[derive(Default)]
1071pub struct PgDriverBuilder {
1072    host: Option<String>,
1073    port: Option<u16>,
1074    user: Option<String>,
1075    database: Option<String>,
1076    password: Option<String>,
1077    timeout: Option<std::time::Duration>,
1078}
1079
1080impl PgDriverBuilder {
1081    /// Create a new builder with default values.
1082    pub fn new() -> Self {
1083        Self::default()
1084    }
1085
1086    /// Set the host (default: "127.0.0.1").
1087    pub fn host(mut self, host: impl Into<String>) -> Self {
1088        self.host = Some(host.into());
1089        self
1090    }
1091
1092    /// Set the port (default: 5432).
1093    pub fn port(mut self, port: u16) -> Self {
1094        self.port = Some(port);
1095        self
1096    }
1097
1098    /// Set the username (required).
1099    pub fn user(mut self, user: impl Into<String>) -> Self {
1100        self.user = Some(user.into());
1101        self
1102    }
1103
1104    /// Set the database name (required).
1105    pub fn database(mut self, database: impl Into<String>) -> Self {
1106        self.database = Some(database.into());
1107        self
1108    }
1109
1110    /// Set the password (optional, for SCRAM-SHA-256 auth).
1111    pub fn password(mut self, password: impl Into<String>) -> Self {
1112        self.password = Some(password.into());
1113        self
1114    }
1115
1116    /// Set connection timeout (optional).
1117    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
1118        self.timeout = Some(timeout);
1119        self
1120    }
1121
1122    /// Connect to PostgreSQL using the configured parameters.
1123    pub async fn connect(self) -> PgResult<PgDriver> {
1124        let host = self.host.as_deref().unwrap_or("127.0.0.1");
1125        let port = self.port.unwrap_or(5432);
1126        let user = self.user.as_deref().ok_or_else(|| {
1127            PgError::Connection("User is required".to_string())
1128        })?;
1129        let database = self.database.as_deref().ok_or_else(|| {
1130            PgError::Connection("Database is required".to_string())
1131        })?;
1132
1133        match (self.password.as_deref(), self.timeout) {
1134            (Some(password), Some(timeout)) => {
1135                PgDriver::connect_with_timeout(host, port, user, database, password, timeout).await
1136            }
1137            (Some(password), None) => {
1138                PgDriver::connect_with_password(host, port, user, database, password).await
1139            }
1140            (None, Some(timeout)) => {
1141                tokio::time::timeout(
1142                    timeout,
1143                    PgDriver::connect(host, port, user, database),
1144                )
1145                .await
1146                .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
1147            }
1148            (None, None) => {
1149                PgDriver::connect(host, port, user, database).await
1150            }
1151        }
1152    }
1153}