Skip to main content

qail_pg/driver/
mod.rs

1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! Auto-detects the best I/O backend:
4//! - Linux 5.1+: io_uring (fastest)
5//! - Linux < 5.1 / macOS / Windows: tokio
6//!
7//! Connection methods are split across modules for easier maintenance:
8//! - `connection.rs` - Core struct and connect methods
9//! - `io.rs` - send, recv, recv_msg_type_fast
10//! - `query.rs` - query, query_cached, execute_simple
11//! - `transaction.rs` - begin_transaction, commit, rollback
12//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor  
13//! - `copy.rs` - COPY protocol for bulk operations
14//! - `pipeline.rs` - High-performance pipelining (275k q/s)
15//! - `cancel.rs` - Query cancellation
16//! - `io_backend.rs` - Runtime I/O backend detection
17
18mod cancel;
19mod connection;
20mod copy;
21mod cursor;
22mod io;
23pub mod io_backend;
24mod pipeline;
25mod pool;
26mod prepared;
27mod query;
28pub mod rls;
29mod row;
30mod stream;
31mod transaction;
32
33pub use connection::PgConnection;
34pub use connection::TlsConfig;
35pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
36pub use cancel::CancelToken;
37pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
38pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
39pub use prepared::PreparedStatement;
40pub use rls::RlsContext;
41pub use row::QailRow;
42
43use qail_core::ast::Qail;
44use std::collections::HashMap;
45use std::sync::Arc;
46
47#[derive(Debug, Clone)]
48pub struct ColumnInfo {
49    pub name_to_index: HashMap<String, usize>,
50    pub oids: Vec<u32>,
51    pub formats: Vec<i16>,
52}
53
54impl ColumnInfo {
55    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
56        let mut name_to_index = HashMap::with_capacity(fields.len());
57        let mut oids = Vec::with_capacity(fields.len());
58        let mut formats = Vec::with_capacity(fields.len());
59
60        for (i, field) in fields.iter().enumerate() {
61            name_to_index.insert(field.name.clone(), i);
62            oids.push(field.type_oid);
63            formats.push(field.format);
64        }
65
66        Self {
67            name_to_index,
68            oids,
69            formats,
70        }
71    }
72}
73
74/// PostgreSQL row with column data and metadata.
75pub struct PgRow {
76    pub columns: Vec<Option<Vec<u8>>>,
77    pub column_info: Option<Arc<ColumnInfo>>,
78}
79
80/// Error type for PostgreSQL driver operations.
81#[derive(Debug)]
82pub enum PgError {
83    Connection(String),
84    Protocol(String),
85    Auth(String),
86    Query(String),
87    NoRows,
88    /// I/O error
89    Io(std::io::Error),
90    /// Encoding error (parameter limit, etc.)
91    Encode(String),
92}
93
94impl std::fmt::Display for PgError {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            PgError::Connection(e) => write!(f, "Connection error: {}", e),
98            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
99            PgError::Auth(e) => write!(f, "Auth error: {}", e),
100            PgError::Query(e) => write!(f, "Query error: {}", e),
101            PgError::NoRows => write!(f, "No rows returned"),
102            PgError::Io(e) => write!(f, "I/O error: {}", e),
103            PgError::Encode(e) => write!(f, "Encode error: {}", e),
104        }
105    }
106}
107
108impl std::error::Error for PgError {}
109
110impl From<std::io::Error> for PgError {
111    fn from(e: std::io::Error) -> Self {
112        PgError::Io(e)
113    }
114}
115
116/// Result type for PostgreSQL operations.
117pub type PgResult<T> = Result<T, PgError>;
118
119/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
120pub struct PgDriver {
121    #[allow(dead_code)]
122    connection: PgConnection,
123    /// Current RLS context, if set. Used for multi-tenant data isolation.
124    rls_context: Option<RlsContext>,
125}
126
127impl PgDriver {
128    /// Create a new driver with an existing connection.
129    pub fn new(connection: PgConnection) -> Self {
130        Self { connection, rls_context: None }
131    }
132
133    /// Builder pattern for ergonomic connection configuration.
134    /// # Example
135    /// ```ignore
136    /// let driver = PgDriver::builder()
137    ///     .host("localhost")
138    ///     .port(5432)
139    ///     .user("admin")
140    ///     .database("mydb")
141    ///     .password("secret")  // Optional
142    ///     .connect()
143    ///     .await?;
144    /// ```
145    pub fn builder() -> PgDriverBuilder {
146        PgDriverBuilder::new()
147    }
148
149    /// Connect to PostgreSQL and create a driver (trust mode, no password).
150    pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
151        let connection = PgConnection::connect(host, port, user, database).await?;
152        Ok(Self::new(connection))
153    }
154
155    /// Connect to PostgreSQL with password authentication (SCRAM-SHA-256).
156    pub async fn connect_with_password(
157        host: &str,
158        port: u16,
159        user: &str,
160        database: &str,
161        password: &str,
162    ) -> PgResult<Self> {
163        let connection =
164            PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
165        Ok(Self::new(connection))
166    }
167
168    /// Connect using DATABASE_URL environment variable.
169    /// 
170    /// Parses the URL format: `postgresql://user:password@host:port/database`
171    /// or `postgres://user:password@host:port/database`
172    /// 
173    /// # Example
174    /// ```ignore
175    /// // Set DATABASE_URL=postgresql://user:pass@localhost:5432/mydb
176    /// let driver = PgDriver::connect_env().await?;
177    /// ```
178    pub async fn connect_env() -> PgResult<Self> {
179        let url = std::env::var("DATABASE_URL")
180            .map_err(|_| PgError::Connection("DATABASE_URL environment variable not set".to_string()))?;
181        Self::connect_url(&url).await
182    }
183
184    /// Connect using a PostgreSQL connection URL.
185    /// 
186    /// Parses the URL format: `postgresql://user:password@host:port/database`
187    /// or `postgres://user:password@host:port/database`
188    /// 
189    /// # Example
190    /// ```ignore
191    /// let driver = PgDriver::connect_url("postgresql://user:pass@localhost:5432/mydb").await?;
192    /// ```
193    pub async fn connect_url(url: &str) -> PgResult<Self> {
194        let (host, port, user, database, password) = Self::parse_database_url(url)?;
195        
196        if let Some(pwd) = password {
197            Self::connect_with_password(&host, port, &user, &database, &pwd).await
198        } else {
199            Self::connect(&host, port, &user, &database).await
200        }
201    }
202
203    /// Parse a PostgreSQL connection URL into components.
204    /// 
205    /// Format: `postgresql://user:password@host:port/database`
206    /// or `postgres://user:password@host:port/database`
207    /// 
208    /// URL percent-encoding is automatically decoded for user and password.
209    fn parse_database_url(url: &str) -> PgResult<(String, u16, String, String, Option<String>)> {
210        // Remove scheme (postgresql:// or postgres://)
211        let after_scheme = url.split("://").nth(1)
212            .ok_or_else(|| PgError::Connection("Invalid DATABASE_URL: missing scheme".to_string()))?;
213        
214        // Split into auth@host parts
215        let (auth_part, host_db_part) = if let Some(at_pos) = after_scheme.rfind('@') {
216            (Some(&after_scheme[..at_pos]), &after_scheme[at_pos + 1..])
217        } else {
218            (None, after_scheme)
219        };
220        
221        // Parse auth (user:password)
222        let (user, password) = if let Some(auth) = auth_part {
223            let parts: Vec<&str> = auth.splitn(2, ':').collect();
224            if parts.len() == 2 {
225                // URL-decode both user and password
226                (
227                    Self::percent_decode(parts[0]),
228                    Some(Self::percent_decode(parts[1])),
229                )
230            } else {
231                (Self::percent_decode(parts[0]), None)
232            }
233        } else {
234            return Err(PgError::Connection("Invalid DATABASE_URL: missing user".to_string()));
235        };
236        
237        // Parse host:port/database
238        let (host_port, database) = if let Some(slash_pos) = host_db_part.find('/') {
239            (&host_db_part[..slash_pos], host_db_part[slash_pos + 1..].to_string())
240        } else {
241            return Err(PgError::Connection("Invalid DATABASE_URL: missing database name".to_string()));
242        };
243        
244        // Parse host:port
245        let (host, port) = if let Some(colon_pos) = host_port.rfind(':') {
246            let port_str = &host_port[colon_pos + 1..];
247            let port = port_str.parse::<u16>()
248                .map_err(|_| PgError::Connection(format!("Invalid port: {}", port_str)))?;
249            (host_port[..colon_pos].to_string(), port)
250        } else {
251            (host_port.to_string(), 5432) // Default PostgreSQL port
252        };
253        
254        Ok((host, port, user, database, password))
255    }
256    
257    /// Decode URL percent-encoded string.
258    /// Handles common encodings: %20 (space), %2B (+), %3D (=), %40 (@), %2F (/), etc.
259    fn percent_decode(s: &str) -> String {
260        let mut result = String::with_capacity(s.len());
261        let mut chars = s.chars().peekable();
262        
263        while let Some(c) = chars.next() {
264            if c == '%' {
265                // Try to parse next two chars as hex
266                let hex: String = chars.by_ref().take(2).collect();
267                if hex.len() == 2
268                    && let Ok(byte) = u8::from_str_radix(&hex, 16)
269                {
270                    result.push(byte as char);
271                    continue;
272                }
273                // If parsing failed, keep original
274                result.push('%');
275                result.push_str(&hex);
276            } else if c == '+' {
277                // '+' often represents space in query strings (form encoding)
278                // But in path components, keep as-is. PostgreSQL URLs use path encoding.
279                result.push('+');
280            } else {
281                result.push(c);
282            }
283        }
284        
285        result
286    }
287
288    /// Connect to PostgreSQL with a connection timeout.
289    /// If the connection cannot be established within the timeout, returns an error.
290    /// # Example
291    /// ```ignore
292    /// use std::time::Duration;
293    /// let driver = PgDriver::connect_with_timeout(
294    ///     "localhost", 5432, "user", "db", "password",
295    ///     Duration::from_secs(5)
296    /// ).await?;
297    /// ```
298    pub async fn connect_with_timeout(
299        host: &str,
300        port: u16,
301        user: &str,
302        database: &str,
303        password: &str,
304        timeout: std::time::Duration,
305    ) -> PgResult<Self> {
306        tokio::time::timeout(
307            timeout,
308            Self::connect_with_password(host, port, user, database, password),
309        )
310        .await
311        .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
312    }
313    /// Clear the prepared statement cache.
314    /// Frees memory by removing all cached statements.
315    /// Note: Statements remain on the PostgreSQL server until connection closes.
316    pub fn clear_cache(&mut self) {
317        self.connection.stmt_cache.clear();
318        self.connection.prepared_statements.clear();
319    }
320
321    /// Get cache statistics.
322    /// Returns (current_size, max_capacity).
323    pub fn cache_stats(&self) -> (usize, usize) {
324        (self.connection.stmt_cache.len(), self.connection.stmt_cache.cap().get())
325    }
326
327    /// Execute a QAIL command and fetch all rows (CACHED + ZERO-ALLOC).
328    /// **Default method** - uses prepared statement caching for best performance.
329    /// On first call: sends Parse + Bind + Execute + Sync
330    /// On subsequent calls with same SQL: sends only Bind + Execute (SKIPS Parse!)
331    /// Uses LRU cache with max 1000 statements (auto-evicts oldest).
332    pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
333        // Delegate to fetch_all_cached for cached-by-default behavior
334        self.fetch_all_cached(cmd).await
335    }
336
337    /// Execute a QAIL command and fetch all rows as a typed struct.
338    /// Requires the target type to implement `QailRow` trait.
339    /// 
340    /// # Example
341    /// ```ignore
342    /// let users: Vec<User> = driver.fetch_typed::<User>(&query).await?;
343    /// ```
344    pub async fn fetch_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
345        let rows = self.fetch_all(cmd).await?;
346        Ok(rows.iter().map(T::from_row).collect())
347    }
348
349    /// Execute a QAIL command and fetch a single row as a typed struct.
350    /// Returns None if no rows are returned.
351    pub async fn fetch_one_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Option<T>> {
352        let rows = self.fetch_all(cmd).await?;
353        Ok(rows.first().map(T::from_row))
354    }
355
356    /// Execute a QAIL command and fetch all rows (UNCACHED).
357    /// Sends Parse + Bind + Execute on every call.
358    /// Use for one-off queries or when caching is not desired.
359    pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
360        use crate::protocol::AstEncoder;
361
362        let wire_bytes = AstEncoder::encode_cmd_reuse(
363            cmd,
364            &mut self.connection.sql_buf,
365            &mut self.connection.params_buf,
366        );
367
368        self.connection.send_bytes(&wire_bytes).await?;
369
370        let mut rows: Vec<PgRow> = Vec::new();
371        let mut column_info: Option<Arc<ColumnInfo>> = None;
372
373        let mut error: Option<PgError> = None;
374
375        loop {
376            let msg = self.connection.recv().await?;
377            match msg {
378                crate::protocol::BackendMessage::ParseComplete
379                | crate::protocol::BackendMessage::BindComplete => {}
380                crate::protocol::BackendMessage::RowDescription(fields) => {
381                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
382                }
383                crate::protocol::BackendMessage::DataRow(data) => {
384                    if error.is_none() {
385                        rows.push(PgRow {
386                            columns: data,
387                            column_info: column_info.clone(),
388                        });
389                    }
390                }
391                crate::protocol::BackendMessage::CommandComplete(_) => {}
392                crate::protocol::BackendMessage::ReadyForQuery(_) => {
393                    if let Some(err) = error {
394                        return Err(err);
395                    }
396                    return Ok(rows);
397                }
398                crate::protocol::BackendMessage::ErrorResponse(err) => {
399                    if error.is_none() {
400                        error = Some(PgError::Query(err.message));
401                    }
402                }
403                _ => {}
404            }
405        }
406    }
407
408    /// Execute a QAIL command and fetch all rows (FAST VERSION).
409    /// Uses optimized recv_with_data_fast for faster response parsing.
410    /// Skips column metadata collection for maximum speed.
411    pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
412        use crate::protocol::AstEncoder;
413
414        let wire_bytes = AstEncoder::encode_cmd_reuse(
415            cmd,
416            &mut self.connection.sql_buf,
417            &mut self.connection.params_buf,
418        );
419
420        self.connection.send_bytes(&wire_bytes).await?;
421
422        // Collect results using FAST receiver
423        let mut rows: Vec<PgRow> = Vec::new();
424        let mut error: Option<PgError> = None;
425
426        loop {
427            let res = self.connection.recv_with_data_fast().await;
428            match res {
429                Ok((msg_type, data)) => {
430                    match msg_type {
431                        b'D' => {
432                             // DataRow
433                            if error.is_none() && let Some(columns) = data {
434                                rows.push(PgRow {
435                                    columns,
436                                    column_info: None, // Skip metadata for speed
437                                });
438                            }
439                        }
440                        b'Z' => {
441                            // ReadyForQuery
442                            if let Some(err) = error {
443                                return Err(err);
444                            }
445                            return Ok(rows);
446                        }
447                        _ => {} // 1, 2, C, T - skip Parse/Bind/CommandComplete/RowDescription
448                    }
449                }
450                Err(e) => {
451                   // recv_with_data_fast returns Err on ErrorResponse automatically.
452                   // We need to capture it and continue draining.
453                   // BUT recv_with_data_fast doesn't return the error *message type* if it fails.
454                   // It returns PgError::Query(msg).
455                   // So we capture the error, but we must continue RECVing until ReadyForQuery.
456                   // However, recv_with_data_fast will KEEP returning Err(Query) if the buffer has E?
457                   // No, recv_with_data_fast consumes the E message before returning Err.
458                   
459                   if error.is_none() {
460                       error = Some(e);
461                   }
462                   // Continue loop to drain until ReadyForQuery... 
463                   // BUT wait, does recv_with_data_fast handle the *rest* of the stream?
464                   // If we call it again, it will read the NEXT message.
465                   // So we just continue.
466                }
467            }
468        }
469    }
470
471    /// Execute a QAIL command and fetch one row.
472    pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
473        let rows = self.fetch_all(cmd).await?;
474        rows.into_iter().next().ok_or(PgError::NoRows)
475    }
476
477    /// Execute a QAIL command with PREPARED STATEMENT CACHING.
478    /// Like fetch_all(), but caches the prepared statement on the server.
479    /// On first call: sends Parse + Describe + Bind + Execute + Sync
480    /// On subsequent calls: sends only Bind + Execute + Sync (SKIPS Parse!)
481    /// Column metadata (RowDescription) is cached alongside the statement
482    /// so that by-name column access works on every call.
483    pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
484        use crate::protocol::AstEncoder;
485        use std::collections::hash_map::DefaultHasher;
486        use std::hash::{Hash, Hasher};
487
488        self.connection.sql_buf.clear();
489        self.connection.params_buf.clear();
490        
491        // Encode SQL to reusable buffer
492        match cmd.action {
493            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
494                crate::protocol::ast_encoder::dml::encode_select(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
495            }
496            qail_core::ast::Action::Add => {
497                crate::protocol::ast_encoder::dml::encode_insert(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
498            }
499            qail_core::ast::Action::Set => {
500                crate::protocol::ast_encoder::dml::encode_update(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
501            }
502            qail_core::ast::Action::Del => {
503                crate::protocol::ast_encoder::dml::encode_delete(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
504            }
505            _ => {
506                // Fallback for unsupported actions
507                let (sql, params) = AstEncoder::encode_cmd_sql(cmd);
508                let raw_rows = self.connection.query_cached(&sql, &params).await?;
509                return Ok(raw_rows.into_iter().map(|data| PgRow { columns: data, column_info: None }).collect());
510            }
511        }
512
513        let mut hasher = DefaultHasher::new();
514        self.connection.sql_buf.hash(&mut hasher);
515        let sql_hash = hasher.finish();
516
517        let is_cache_miss = !self.connection.stmt_cache.contains(&sql_hash);
518
519        let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
520            name.clone()
521        } else {
522            let name = format!("qail_{:x}", sql_hash);
523            
524            use crate::protocol::PgEncoder;
525            use tokio::io::AsyncWriteExt;
526            
527            let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
528            
529            // Send Parse + Describe(Statement) to get RowDescription on first call
530            let parse_msg = PgEncoder::encode_parse(&name, sql_str, &[]);
531            let describe_msg = PgEncoder::encode_describe(false, &name);
532            self.connection.stream.write_all(&parse_msg).await?;
533            self.connection.stream.write_all(&describe_msg).await?;
534            
535            self.connection.stmt_cache.put(sql_hash, name.clone());
536            self.connection.prepared_statements.insert(name.clone(), sql_str.to_string());
537            
538            name
539        };
540
541        // Send Bind + Execute + Sync (always)
542        use crate::protocol::PgEncoder;
543        use tokio::io::AsyncWriteExt;
544        
545        let mut buf = bytes::BytesMut::with_capacity(128);
546        PgEncoder::encode_bind_to(&mut buf, &stmt_name, &self.connection.params_buf)
547            .map_err(|e| PgError::Encode(e.to_string()))?;
548        PgEncoder::encode_execute_to(&mut buf);
549        PgEncoder::encode_sync_to(&mut buf);
550        self.connection.stream.write_all(&buf).await?;
551
552        // On cache hit, use the previously cached ColumnInfo
553        let cached_column_info = self.connection.column_info_cache.get(&sql_hash).cloned();
554
555        let mut rows: Vec<PgRow> = Vec::new();
556        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
557        let mut error: Option<PgError> = None;
558
559        loop {
560            let msg = self.connection.recv().await?;
561            match msg {
562                crate::protocol::BackendMessage::ParseComplete
563                | crate::protocol::BackendMessage::BindComplete => {}
564                crate::protocol::BackendMessage::ParameterDescription(_) => {
565                    // Sent after Describe(Statement) — ignore
566                }
567                crate::protocol::BackendMessage::RowDescription(fields) => {
568                    // Received after Describe(Statement) on cache miss
569                    let info = Arc::new(ColumnInfo::from_fields(&fields));
570                    if is_cache_miss {
571                        self.connection.column_info_cache.insert(sql_hash, info.clone());
572                    }
573                    column_info = Some(info);
574                }
575                crate::protocol::BackendMessage::DataRow(data) => {
576                    if error.is_none() {
577                        rows.push(PgRow {
578                            columns: data,
579                            column_info: column_info.clone(),
580                        });
581                    }
582                }
583                crate::protocol::BackendMessage::CommandComplete(_) => {}
584                crate::protocol::BackendMessage::NoData => {
585                    // Sent by Describe for statements that return no data (e.g. pure UPDATE without RETURNING)
586                }
587                crate::protocol::BackendMessage::ReadyForQuery(_) => {
588                    if let Some(err) = error {
589                        return Err(err);
590                    }
591                    return Ok(rows);
592                }
593                crate::protocol::BackendMessage::ErrorResponse(err) => {
594                    if error.is_none() {
595                        error = Some(PgError::Query(err.message));
596                        // Invalidate cache to prevent "prepared statement does not exist"
597                        // on next retry if the error happened during Parse/Bind.
598                        self.connection.stmt_cache.clear();
599                        self.connection.prepared_statements.clear();
600                        self.connection.column_info_cache.clear();
601                    }
602                }
603                _ => {}
604            }
605        }
606    }
607
608    /// Execute a QAIL command (for mutations) - ZERO-ALLOC.
609    pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
610        use crate::protocol::AstEncoder;
611
612        let wire_bytes = AstEncoder::encode_cmd_reuse(
613            cmd,
614            &mut self.connection.sql_buf,
615            &mut self.connection.params_buf,
616        );
617
618        self.connection.send_bytes(&wire_bytes).await?;
619
620        let mut affected = 0u64;
621        let mut error: Option<PgError> = None;
622
623        loop {
624            let msg = self.connection.recv().await?;
625            match msg {
626                crate::protocol::BackendMessage::ParseComplete
627                | crate::protocol::BackendMessage::BindComplete => {}
628                crate::protocol::BackendMessage::RowDescription(_) => {}
629                crate::protocol::BackendMessage::DataRow(_) => {}
630                crate::protocol::BackendMessage::CommandComplete(tag) => {
631                    if error.is_none() && let Some(n) = tag.split_whitespace().last() {
632                        affected = n.parse().unwrap_or(0);
633                    }
634                }
635                crate::protocol::BackendMessage::ReadyForQuery(_) => {
636                    if let Some(err) = error {
637                        return Err(err);
638                    }
639                    return Ok(affected);
640                }
641                crate::protocol::BackendMessage::ErrorResponse(err) => {
642                    if error.is_none() {
643                        error = Some(PgError::Query(err.message));
644                    }
645                }
646                _ => {}
647            }
648        }
649    }
650
651    // ==================== TRANSACTION CONTROL ====================
652
653    /// Begin a transaction (AST-native).
654    pub async fn begin(&mut self) -> PgResult<()> {
655        self.connection.begin_transaction().await
656    }
657
658    /// Commit the current transaction (AST-native).
659    pub async fn commit(&mut self) -> PgResult<()> {
660        self.connection.commit().await
661    }
662
663    /// Rollback the current transaction (AST-native).
664    pub async fn rollback(&mut self) -> PgResult<()> {
665        self.connection.rollback().await
666    }
667
668    /// Create a named savepoint within the current transaction.
669    /// Savepoints allow partial rollback within a transaction.
670    /// Use `rollback_to()` to return to this savepoint.
671    /// # Example
672    /// ```ignore
673    /// driver.begin().await?;
674    /// driver.execute(&insert1).await?;
675    /// driver.savepoint("sp1").await?;
676    /// driver.execute(&insert2).await?;
677    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
678    /// driver.commit().await?;
679    /// ```
680    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
681        self.connection.savepoint(name).await
682    }
683
684    /// Rollback to a previously created savepoint.
685    /// Discards all changes since the named savepoint was created,
686    /// but keeps the transaction open.
687    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
688        self.connection.rollback_to(name).await
689    }
690
691    /// Release a savepoint (free resources, if no longer needed).
692    /// After release, the savepoint cannot be rolled back to.
693    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
694        self.connection.release_savepoint(name).await
695    }
696
697    // ==================== BATCH TRANSACTIONS ====================
698
699    /// Execute multiple commands in a single atomic transaction.
700    /// All commands succeed or all are rolled back.
701    /// # Example
702    /// ```ignore
703    /// let cmds = vec![
704    ///     Qail::add("users").columns(["name"]).values(["Alice"]),
705    ///     Qail::add("users").columns(["name"]).values(["Bob"]),
706    /// ];
707    /// let results = driver.execute_batch(&cmds).await?;
708    /// // results = [1, 1] (rows affected)
709    /// ```
710    pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
711        self.begin().await?;
712        let mut results = Vec::with_capacity(cmds.len());
713        for cmd in cmds {
714            match self.execute(cmd).await {
715                Ok(n) => results.push(n),
716                Err(e) => {
717                    self.rollback().await?;
718                    return Err(e);
719                }
720            }
721        }
722        self.commit().await?;
723        Ok(results)
724    }
725
726    // ==================== STATEMENT TIMEOUT ====================
727
728    /// Set statement timeout for this connection (in milliseconds).
729    /// # Example
730    /// ```ignore
731    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
732    /// ```
733    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
734        self.execute_raw(&format!("SET statement_timeout = {}", ms))
735            .await
736    }
737
738    /// Reset statement timeout to default (no limit).
739    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
740        self.execute_raw("RESET statement_timeout").await
741    }
742
743    // ==================== RLS (MULTI-TENANT) ====================
744
745    /// Set the RLS context for multi-tenant data isolation.
746    ///
747    /// Configures PostgreSQL session variables (`app.current_operator_id`, etc.)
748    /// so that RLS policies automatically filter data by tenant.
749    ///
750    /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
751    /// that `set_config` and all subsequent queries execute on the **same
752    /// connection** — no pool race conditions possible.
753    ///
754    /// # Example
755    /// ```ignore
756    /// driver.set_rls_context(RlsContext::operator("op-123")).await?;
757    /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
758    /// // orders only contains rows where operator_id = 'op-123'
759    /// ```
760    pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
761        let sql = rls::context_to_sql(&ctx);
762        self.execute_raw(&sql).await?;
763        self.rls_context = Some(ctx);
764        Ok(())
765    }
766
767    /// Clear the RLS context, resetting session variables to safe defaults.
768    ///
769    /// After clearing, all RLS-protected queries will return zero rows
770    /// (empty operator_id matches nothing).
771    pub async fn clear_rls_context(&mut self) -> PgResult<()> {
772        self.execute_raw(rls::reset_sql()).await?;
773        self.rls_context = None;
774        Ok(())
775    }
776
777    /// Get the current RLS context, if any.
778    pub fn rls_context(&self) -> Option<&rls::RlsContext> {
779        self.rls_context.as_ref()
780    }
781
782    // ==================== PIPELINE (BATCH) ====================
783
784    /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
785    /// # Example
786    /// ```ignore
787    /// let cmds: Vec<Qail> = (1..=1000)
788    ///     .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
789    ///     .collect();
790    /// let count = driver.pipeline_batch(&cmds).await?;
791    /// assert_eq!(count, 1000);
792    /// ```
793    pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
794        self.connection.pipeline_ast_fast(cmds).await
795    }
796
797    /// Execute multiple Qail ASTs and return full row data.
798    pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
799        let raw_results = self.connection.pipeline_ast(cmds).await?;
800
801        let results: Vec<Vec<PgRow>> = raw_results
802            .into_iter()
803            .map(|rows| {
804                rows.into_iter()
805                    .map(|columns| PgRow {
806                        columns,
807                        column_info: None,
808                    })
809                    .collect()
810            })
811            .collect();
812
813        Ok(results)
814    }
815
816    /// Prepare a SQL statement for repeated execution.
817    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
818        self.connection.prepare(sql).await
819    }
820
821    /// Execute a prepared statement pipeline in FAST mode (count only).
822    pub async fn pipeline_prepared_fast(
823        &mut self,
824        stmt: &PreparedStatement,
825        params_batch: &[Vec<Option<Vec<u8>>>],
826    ) -> PgResult<usize> {
827        self.connection
828            .pipeline_prepared_fast(stmt, params_batch)
829            .await
830    }
831
832    // ==================== LEGACY/BOOTSTRAP ====================
833
834    /// Execute a raw SQL string.
835    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
836    /// Use for bootstrap DDL only (e.g., migration table creation).
837    /// For transactions, use `begin()`, `commit()`, `rollback()`.
838    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
839        // Reject literal NULL bytes - they corrupt PostgreSQL connection state
840        if sql.as_bytes().contains(&0) {
841            return Err(crate::PgError::Protocol(
842                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
843            ));
844        }
845        self.connection.execute_simple(sql).await
846    }
847
848    /// Execute a raw SQL query and return rows.
849    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
850    /// Use for bootstrap/admin queries only.
851    pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
852        if sql.as_bytes().contains(&0) {
853            return Err(crate::PgError::Protocol(
854                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
855            ));
856        }
857        
858        use tokio::io::AsyncWriteExt;
859        use crate::protocol::PgEncoder;
860        
861        // Use simple query protocol (no prepared statements)
862        let msg = PgEncoder::encode_query_string(sql);
863        self.connection.stream.write_all(&msg).await?;
864        
865        let mut rows: Vec<PgRow> = Vec::new();
866        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = None;
867        
868
869        let mut error: Option<PgError> = None;
870
871        loop {
872            let msg = self.connection.recv().await?;
873            match msg {
874                crate::protocol::BackendMessage::RowDescription(fields) => {
875                    column_info = Some(std::sync::Arc::new(ColumnInfo::from_fields(&fields)));
876                }
877                crate::protocol::BackendMessage::DataRow(data) => {
878                    if error.is_none() {
879                        rows.push(PgRow {
880                            columns: data,
881                            column_info: column_info.clone(),
882                        });
883                    }
884                }
885                crate::protocol::BackendMessage::CommandComplete(_) => {}
886                crate::protocol::BackendMessage::ReadyForQuery(_) => {
887                    if let Some(err) = error {
888                        return Err(err);
889                    }
890                    return Ok(rows);
891                }
892                crate::protocol::BackendMessage::ErrorResponse(err) => {
893                    if error.is_none() {
894                        error = Some(PgError::Query(err.message));
895                    }
896                }
897                _ => {}
898            }
899        }
900    }
901
902    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
903    /// Uses a Qail::Add to get validated table and column names from the AST,
904    /// not user-provided strings. This is the sound, AST-native approach.
905    /// # Example
906    /// ```ignore
907    /// // Create a Qail::Add to define table and columns
908    /// let cmd = Qail::add("users")
909    ///     .columns(["id", "name", "email"]);
910    /// // Bulk insert rows
911    /// let rows: Vec<Vec<Value>> = vec![
912    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
913    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
914    /// ];
915    /// driver.copy_bulk(&cmd, &rows).await?;
916    /// ```
917    pub async fn copy_bulk(
918        &mut self,
919        cmd: &Qail,
920        rows: &[Vec<qail_core::ast::Value>],
921    ) -> PgResult<u64> {
922        use qail_core::ast::Action;
923
924
925        if cmd.action != Action::Add {
926            return Err(PgError::Query(
927                "copy_bulk requires Qail::Add action".to_string(),
928            ));
929        }
930
931        let table = &cmd.table;
932
933        let columns: Vec<String> = cmd
934            .columns
935            .iter()
936            .filter_map(|expr| {
937                use qail_core::ast::Expr;
938                match expr {
939                    Expr::Named(name) => Some(name.clone()),
940                    Expr::Aliased { name, .. } => Some(name.clone()),
941                    Expr::Star => None, // Can't COPY with *
942                    _ => None,
943                }
944            })
945            .collect();
946
947        if columns.is_empty() {
948            return Err(PgError::Query(
949                "copy_bulk requires columns in Qail".to_string(),
950            ));
951        }
952
953        // Use optimized COPY path: direct Value → bytes encoding, single syscall
954        self.connection.copy_in_fast(table, &columns, rows).await
955    }
956
957    /// **Fastest** bulk insert using pre-encoded COPY data.
958    /// Accepts raw COPY text format bytes. Use when caller has already
959    /// encoded rows to avoid any encoding overhead.
960    /// # Format
961    /// Data should be tab-separated rows with newlines (COPY text format):
962    /// `1\thello\t3.14\n2\tworld\t2.71\n`
963    /// # Example
964    /// ```ignore
965    /// let cmd = Qail::add("users").columns(["id", "name"]);
966    /// let data = b"1\tAlice\n2\tBob\n";
967    /// driver.copy_bulk_bytes(&cmd, data).await?;
968    /// ```
969    pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
970        use qail_core::ast::Action;
971
972        if cmd.action != Action::Add {
973            return Err(PgError::Query(
974                "copy_bulk_bytes requires Qail::Add action".to_string(),
975            ));
976        }
977
978        let table = &cmd.table;
979        let columns: Vec<String> = cmd
980            .columns
981            .iter()
982            .filter_map(|expr| {
983                use qail_core::ast::Expr;
984                match expr {
985                    Expr::Named(name) => Some(name.clone()),
986                    Expr::Aliased { name, .. } => Some(name.clone()),
987                    _ => None,
988                }
989            })
990            .collect();
991
992        if columns.is_empty() {
993            return Err(PgError::Query(
994                "copy_bulk_bytes requires columns in Qail".to_string(),
995            ));
996        }
997
998        // Direct to raw COPY - zero encoding!
999        self.connection.copy_in_raw(table, &columns, data).await
1000    }
1001
1002    /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
1003    /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
1004    /// # Example
1005    /// ```ignore
1006    /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
1007    /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
1008    /// ```
1009    pub async fn copy_export_table(
1010        &mut self,
1011        table: &str,
1012        columns: &[String],
1013    ) -> PgResult<Vec<u8>> {
1014        let cols = columns.join(", ");
1015        let sql = format!("COPY {} ({}) TO STDOUT", table, cols);
1016        
1017        self.connection.copy_out_raw(&sql).await
1018    }
1019
1020    /// Stream large result sets using PostgreSQL cursors.
1021    /// This method uses DECLARE CURSOR internally to stream rows in batches,
1022    /// avoiding loading the entire result set into memory.
1023    /// # Example
1024    /// ```ignore
1025    /// let cmd = Qail::get("large_table");
1026    /// let batches = driver.stream_cmd(&cmd, 100).await?;
1027    /// for batch in batches {
1028    ///     for row in batch {
1029    ///         // process row
1030    ///     }
1031    /// }
1032    /// ```
1033    pub async fn stream_cmd(
1034        &mut self,
1035        cmd: &Qail,
1036        batch_size: usize,
1037    ) -> PgResult<Vec<Vec<PgRow>>> {
1038        use std::sync::atomic::{AtomicU64, Ordering};
1039        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
1040
1041        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
1042
1043        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
1044        use crate::protocol::AstEncoder;
1045        let mut sql_buf = bytes::BytesMut::with_capacity(256);
1046        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
1047        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params);
1048        let sql = String::from_utf8_lossy(&sql_buf).to_string();
1049
1050        // Must be in a transaction for cursors
1051        self.connection.begin_transaction().await?;
1052
1053        // Declare cursor
1054        self.connection.declare_cursor(&cursor_name, &sql).await?;
1055
1056        // Fetch all batches
1057        let mut all_batches = Vec::new();
1058        while let Some(rows) = self
1059            .connection
1060            .fetch_cursor(&cursor_name, batch_size)
1061            .await?
1062        {
1063            let pg_rows: Vec<PgRow> = rows
1064                .into_iter()
1065                .map(|cols| PgRow {
1066                    columns: cols,
1067                    column_info: None,
1068                })
1069                .collect();
1070            all_batches.push(pg_rows);
1071        }
1072
1073        self.connection.close_cursor(&cursor_name).await?;
1074        self.connection.commit().await?;
1075
1076        Ok(all_batches)
1077    }
1078}
1079
1080// ============================================================================
1081// Connection Builder
1082// ============================================================================
1083
1084/// Builder for creating PgDriver connections with named parameters.
1085/// # Example
1086/// ```ignore
1087/// let driver = PgDriver::builder()
1088///     .host("localhost")
1089///     .port(5432)
1090///     .user("admin")
1091///     .database("mydb")
1092///     .password("secret")
1093///     .connect()
1094///     .await?;
1095/// ```
1096#[derive(Default)]
1097pub struct PgDriverBuilder {
1098    host: Option<String>,
1099    port: Option<u16>,
1100    user: Option<String>,
1101    database: Option<String>,
1102    password: Option<String>,
1103    timeout: Option<std::time::Duration>,
1104}
1105
1106impl PgDriverBuilder {
1107    /// Create a new builder with default values.
1108    pub fn new() -> Self {
1109        Self::default()
1110    }
1111
1112    /// Set the host (default: "127.0.0.1").
1113    pub fn host(mut self, host: impl Into<String>) -> Self {
1114        self.host = Some(host.into());
1115        self
1116    }
1117
1118    /// Set the port (default: 5432).
1119    pub fn port(mut self, port: u16) -> Self {
1120        self.port = Some(port);
1121        self
1122    }
1123
1124    /// Set the username (required).
1125    pub fn user(mut self, user: impl Into<String>) -> Self {
1126        self.user = Some(user.into());
1127        self
1128    }
1129
1130    /// Set the database name (required).
1131    pub fn database(mut self, database: impl Into<String>) -> Self {
1132        self.database = Some(database.into());
1133        self
1134    }
1135
1136    /// Set the password (optional, for SCRAM-SHA-256 auth).
1137    pub fn password(mut self, password: impl Into<String>) -> Self {
1138        self.password = Some(password.into());
1139        self
1140    }
1141
1142    /// Set connection timeout (optional).
1143    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
1144        self.timeout = Some(timeout);
1145        self
1146    }
1147
1148    /// Connect to PostgreSQL using the configured parameters.
1149    pub async fn connect(self) -> PgResult<PgDriver> {
1150        let host = self.host.as_deref().unwrap_or("127.0.0.1");
1151        let port = self.port.unwrap_or(5432);
1152        let user = self.user.as_deref().ok_or_else(|| {
1153            PgError::Connection("User is required".to_string())
1154        })?;
1155        let database = self.database.as_deref().ok_or_else(|| {
1156            PgError::Connection("Database is required".to_string())
1157        })?;
1158
1159        match (self.password.as_deref(), self.timeout) {
1160            (Some(password), Some(timeout)) => {
1161                PgDriver::connect_with_timeout(host, port, user, database, password, timeout).await
1162            }
1163            (Some(password), None) => {
1164                PgDriver::connect_with_password(host, port, user, database, password).await
1165            }
1166            (None, Some(timeout)) => {
1167                tokio::time::timeout(
1168                    timeout,
1169                    PgDriver::connect(host, port, user, database),
1170                )
1171                .await
1172                .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
1173            }
1174            (None, None) => {
1175                PgDriver::connect(host, port, user, database).await
1176            }
1177        }
1178    }
1179}