Skip to main content

qail_pg/driver/
mod.rs

1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! Auto-detects the best I/O backend:
4//! - Linux 5.1+: io_uring (fastest)
5//! - Linux < 5.1 / macOS / Windows: tokio
6//!
7//! Connection methods are split across modules for easier maintenance:
8//! - `connection.rs` - Core struct and connect methods
9//! - `io.rs` - send, recv, recv_msg_type_fast
10//! - `query.rs` - query, query_cached, execute_simple
11//! - `transaction.rs` - begin_transaction, commit, rollback
12//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor  
13//! - `copy.rs` - COPY protocol for bulk operations
14//! - `pipeline.rs` - High-performance pipelining (275k q/s)
15//! - `cancel.rs` - Query cancellation
16//! - `io_backend.rs` - Runtime I/O backend detection
17
18mod cancel;
19mod connection;
20mod copy;
21mod cursor;
22mod io;
23pub mod io_backend;
24mod pipeline;
25mod pool;
26mod prepared;
27mod query;
28pub mod rls;
29mod row;
30mod stream;
31mod transaction;
32
33pub use connection::PgConnection;
34pub use connection::TlsConfig;
35pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
36pub use cancel::CancelToken;
37pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
38pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
39pub use prepared::PreparedStatement;
40pub use rls::RlsContext;
41pub use row::QailRow;
42
43use qail_core::ast::Qail;
44use std::collections::HashMap;
45use std::sync::Arc;
46
47#[derive(Debug, Clone)]
48pub struct ColumnInfo {
49    pub name_to_index: HashMap<String, usize>,
50    pub oids: Vec<u32>,
51    pub formats: Vec<i16>,
52}
53
54impl ColumnInfo {
55    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
56        let mut name_to_index = HashMap::with_capacity(fields.len());
57        let mut oids = Vec::with_capacity(fields.len());
58        let mut formats = Vec::with_capacity(fields.len());
59
60        for (i, field) in fields.iter().enumerate() {
61            name_to_index.insert(field.name.clone(), i);
62            oids.push(field.type_oid);
63            formats.push(field.format);
64        }
65
66        Self {
67            name_to_index,
68            oids,
69            formats,
70        }
71    }
72}
73
74/// PostgreSQL row with column data and metadata.
75pub struct PgRow {
76    pub columns: Vec<Option<Vec<u8>>>,
77    pub column_info: Option<Arc<ColumnInfo>>,
78}
79
80/// Error type for PostgreSQL driver operations.
81#[derive(Debug)]
82pub enum PgError {
83    Connection(String),
84    Protocol(String),
85    Auth(String),
86    Query(String),
87    NoRows,
88    /// I/O error
89    Io(std::io::Error),
90    /// Encoding error (parameter limit, etc.)
91    Encode(String),
92}
93
94impl std::fmt::Display for PgError {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            PgError::Connection(e) => write!(f, "Connection error: {}", e),
98            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
99            PgError::Auth(e) => write!(f, "Auth error: {}", e),
100            PgError::Query(e) => write!(f, "Query error: {}", e),
101            PgError::NoRows => write!(f, "No rows returned"),
102            PgError::Io(e) => write!(f, "I/O error: {}", e),
103            PgError::Encode(e) => write!(f, "Encode error: {}", e),
104        }
105    }
106}
107
108impl std::error::Error for PgError {}
109
110impl From<std::io::Error> for PgError {
111    fn from(e: std::io::Error) -> Self {
112        PgError::Io(e)
113    }
114}
115
116/// Result type for PostgreSQL operations.
117pub type PgResult<T> = Result<T, PgError>;
118
119/// Result of a query that returns rows (SELECT/GET).
120#[derive(Debug, Clone)]
121pub struct QueryResult {
122    /// Column names from RowDescription.
123    pub columns: Vec<String>,
124    /// Rows of text-decoded values (None = NULL).
125    pub rows: Vec<Vec<Option<String>>>,
126}
127
128/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
129pub struct PgDriver {
130    #[allow(dead_code)]
131    connection: PgConnection,
132    /// Current RLS context, if set. Used for multi-tenant data isolation.
133    rls_context: Option<RlsContext>,
134}
135
136impl PgDriver {
137    /// Create a new driver with an existing connection.
138    pub fn new(connection: PgConnection) -> Self {
139        Self { connection, rls_context: None }
140    }
141
142    /// Builder pattern for ergonomic connection configuration.
143    /// # Example
144    /// ```ignore
145    /// let driver = PgDriver::builder()
146    ///     .host("localhost")
147    ///     .port(5432)
148    ///     .user("admin")
149    ///     .database("mydb")
150    ///     .password("secret")  // Optional
151    ///     .connect()
152    ///     .await?;
153    /// ```
154    pub fn builder() -> PgDriverBuilder {
155        PgDriverBuilder::new()
156    }
157
158    /// Connect to PostgreSQL and create a driver (trust mode, no password).
159    pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
160        let connection = PgConnection::connect(host, port, user, database).await?;
161        Ok(Self::new(connection))
162    }
163
164    /// Connect to PostgreSQL with password authentication (SCRAM-SHA-256).
165    pub async fn connect_with_password(
166        host: &str,
167        port: u16,
168        user: &str,
169        database: &str,
170        password: &str,
171    ) -> PgResult<Self> {
172        let connection =
173            PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
174        Ok(Self::new(connection))
175    }
176
177    /// Connect using DATABASE_URL environment variable.
178    /// 
179    /// Parses the URL format: `postgresql://user:password@host:port/database`
180    /// or `postgres://user:password@host:port/database`
181    /// 
182    /// # Example
183    /// ```ignore
184    /// // Set DATABASE_URL=postgresql://user:pass@localhost:5432/mydb
185    /// let driver = PgDriver::connect_env().await?;
186    /// ```
187    pub async fn connect_env() -> PgResult<Self> {
188        let url = std::env::var("DATABASE_URL")
189            .map_err(|_| PgError::Connection("DATABASE_URL environment variable not set".to_string()))?;
190        Self::connect_url(&url).await
191    }
192
193    /// Connect using a PostgreSQL connection URL.
194    /// 
195    /// Parses the URL format: `postgresql://user:password@host:port/database`
196    /// or `postgres://user:password@host:port/database`
197    /// 
198    /// # Example
199    /// ```ignore
200    /// let driver = PgDriver::connect_url("postgresql://user:pass@localhost:5432/mydb").await?;
201    /// ```
202    pub async fn connect_url(url: &str) -> PgResult<Self> {
203        let (host, port, user, database, password) = Self::parse_database_url(url)?;
204        
205        if let Some(pwd) = password {
206            Self::connect_with_password(&host, port, &user, &database, &pwd).await
207        } else {
208            Self::connect(&host, port, &user, &database).await
209        }
210    }
211
212    /// Parse a PostgreSQL connection URL into components.
213    /// 
214    /// Format: `postgresql://user:password@host:port/database`
215    /// or `postgres://user:password@host:port/database`
216    /// 
217    /// URL percent-encoding is automatically decoded for user and password.
218    fn parse_database_url(url: &str) -> PgResult<(String, u16, String, String, Option<String>)> {
219        // Remove scheme (postgresql:// or postgres://)
220        let after_scheme = url.split("://").nth(1)
221            .ok_or_else(|| PgError::Connection("Invalid DATABASE_URL: missing scheme".to_string()))?;
222        
223        // Split into auth@host parts
224        let (auth_part, host_db_part) = if let Some(at_pos) = after_scheme.rfind('@') {
225            (Some(&after_scheme[..at_pos]), &after_scheme[at_pos + 1..])
226        } else {
227            (None, after_scheme)
228        };
229        
230        // Parse auth (user:password)
231        let (user, password) = if let Some(auth) = auth_part {
232            let parts: Vec<&str> = auth.splitn(2, ':').collect();
233            if parts.len() == 2 {
234                // URL-decode both user and password
235                (
236                    Self::percent_decode(parts[0]),
237                    Some(Self::percent_decode(parts[1])),
238                )
239            } else {
240                (Self::percent_decode(parts[0]), None)
241            }
242        } else {
243            return Err(PgError::Connection("Invalid DATABASE_URL: missing user".to_string()));
244        };
245        
246        // Parse host:port/database
247        let (host_port, database) = if let Some(slash_pos) = host_db_part.find('/') {
248            (&host_db_part[..slash_pos], host_db_part[slash_pos + 1..].to_string())
249        } else {
250            return Err(PgError::Connection("Invalid DATABASE_URL: missing database name".to_string()));
251        };
252        
253        // Parse host:port
254        let (host, port) = if let Some(colon_pos) = host_port.rfind(':') {
255            let port_str = &host_port[colon_pos + 1..];
256            let port = port_str.parse::<u16>()
257                .map_err(|_| PgError::Connection(format!("Invalid port: {}", port_str)))?;
258            (host_port[..colon_pos].to_string(), port)
259        } else {
260            (host_port.to_string(), 5432) // Default PostgreSQL port
261        };
262        
263        Ok((host, port, user, database, password))
264    }
265    
266    /// Decode URL percent-encoded string.
267    /// Handles common encodings: %20 (space), %2B (+), %3D (=), %40 (@), %2F (/), etc.
268    fn percent_decode(s: &str) -> String {
269        let mut result = String::with_capacity(s.len());
270        let mut chars = s.chars().peekable();
271        
272        while let Some(c) = chars.next() {
273            if c == '%' {
274                // Try to parse next two chars as hex
275                let hex: String = chars.by_ref().take(2).collect();
276                if hex.len() == 2
277                    && let Ok(byte) = u8::from_str_radix(&hex, 16)
278                {
279                    result.push(byte as char);
280                    continue;
281                }
282                // If parsing failed, keep original
283                result.push('%');
284                result.push_str(&hex);
285            } else if c == '+' {
286                // '+' often represents space in query strings (form encoding)
287                // But in path components, keep as-is. PostgreSQL URLs use path encoding.
288                result.push('+');
289            } else {
290                result.push(c);
291            }
292        }
293        
294        result
295    }
296
297    /// Connect to PostgreSQL with a connection timeout.
298    /// If the connection cannot be established within the timeout, returns an error.
299    /// # Example
300    /// ```ignore
301    /// use std::time::Duration;
302    /// let driver = PgDriver::connect_with_timeout(
303    ///     "localhost", 5432, "user", "db", "password",
304    ///     Duration::from_secs(5)
305    /// ).await?;
306    /// ```
307    pub async fn connect_with_timeout(
308        host: &str,
309        port: u16,
310        user: &str,
311        database: &str,
312        password: &str,
313        timeout: std::time::Duration,
314    ) -> PgResult<Self> {
315        tokio::time::timeout(
316            timeout,
317            Self::connect_with_password(host, port, user, database, password),
318        )
319        .await
320        .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
321    }
322    /// Clear the prepared statement cache.
323    /// Frees memory by removing all cached statements.
324    /// Note: Statements remain on the PostgreSQL server until connection closes.
325    pub fn clear_cache(&mut self) {
326        self.connection.stmt_cache.clear();
327        self.connection.prepared_statements.clear();
328    }
329
330    /// Get cache statistics.
331    /// Returns (current_size, max_capacity).
332    pub fn cache_stats(&self) -> (usize, usize) {
333        (self.connection.stmt_cache.len(), self.connection.stmt_cache.cap().get())
334    }
335
336    /// Execute a QAIL command and fetch all rows (CACHED + ZERO-ALLOC).
337    /// **Default method** - uses prepared statement caching for best performance.
338    /// On first call: sends Parse + Bind + Execute + Sync
339    /// On subsequent calls with same SQL: sends only Bind + Execute (SKIPS Parse!)
340    /// Uses LRU cache with max 1000 statements (auto-evicts oldest).
341    pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
342        // Delegate to fetch_all_cached for cached-by-default behavior
343        self.fetch_all_cached(cmd).await
344    }
345
346    /// Execute a QAIL command and fetch all rows as a typed struct.
347    /// Requires the target type to implement `QailRow` trait.
348    /// 
349    /// # Example
350    /// ```ignore
351    /// let users: Vec<User> = driver.fetch_typed::<User>(&query).await?;
352    /// ```
353    pub async fn fetch_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
354        let rows = self.fetch_all(cmd).await?;
355        Ok(rows.iter().map(T::from_row).collect())
356    }
357
358    /// Execute a QAIL command and fetch a single row as a typed struct.
359    /// Returns None if no rows are returned.
360    pub async fn fetch_one_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Option<T>> {
361        let rows = self.fetch_all(cmd).await?;
362        Ok(rows.first().map(T::from_row))
363    }
364
365    /// Execute a QAIL command and fetch all rows (UNCACHED).
366    /// Sends Parse + Bind + Execute on every call.
367    /// Use for one-off queries or when caching is not desired.
368    pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
369        use crate::protocol::AstEncoder;
370
371        let wire_bytes = AstEncoder::encode_cmd_reuse(
372            cmd,
373            &mut self.connection.sql_buf,
374            &mut self.connection.params_buf,
375        );
376
377        self.connection.send_bytes(&wire_bytes).await?;
378
379        let mut rows: Vec<PgRow> = Vec::new();
380        let mut column_info: Option<Arc<ColumnInfo>> = None;
381
382        let mut error: Option<PgError> = None;
383
384        loop {
385            let msg = self.connection.recv().await?;
386            match msg {
387                crate::protocol::BackendMessage::ParseComplete
388                | crate::protocol::BackendMessage::BindComplete => {}
389                crate::protocol::BackendMessage::RowDescription(fields) => {
390                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
391                }
392                crate::protocol::BackendMessage::DataRow(data) => {
393                    if error.is_none() {
394                        rows.push(PgRow {
395                            columns: data,
396                            column_info: column_info.clone(),
397                        });
398                    }
399                }
400                crate::protocol::BackendMessage::CommandComplete(_) => {}
401                crate::protocol::BackendMessage::ReadyForQuery(_) => {
402                    if let Some(err) = error {
403                        return Err(err);
404                    }
405                    return Ok(rows);
406                }
407                crate::protocol::BackendMessage::ErrorResponse(err) => {
408                    if error.is_none() {
409                        error = Some(PgError::Query(err.message));
410                    }
411                }
412                _ => {}
413            }
414        }
415    }
416
417    /// Execute a QAIL command and fetch all rows (FAST VERSION).
418    /// Uses optimized recv_with_data_fast for faster response parsing.
419    /// Skips column metadata collection for maximum speed.
420    pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
421        use crate::protocol::AstEncoder;
422
423        let wire_bytes = AstEncoder::encode_cmd_reuse(
424            cmd,
425            &mut self.connection.sql_buf,
426            &mut self.connection.params_buf,
427        );
428
429        self.connection.send_bytes(&wire_bytes).await?;
430
431        // Collect results using FAST receiver
432        let mut rows: Vec<PgRow> = Vec::new();
433        let mut error: Option<PgError> = None;
434
435        loop {
436            let res = self.connection.recv_with_data_fast().await;
437            match res {
438                Ok((msg_type, data)) => {
439                    match msg_type {
440                        b'D' => {
441                             // DataRow
442                            if error.is_none() && let Some(columns) = data {
443                                rows.push(PgRow {
444                                    columns,
445                                    column_info: None, // Skip metadata for speed
446                                });
447                            }
448                        }
449                        b'Z' => {
450                            // ReadyForQuery
451                            if let Some(err) = error {
452                                return Err(err);
453                            }
454                            return Ok(rows);
455                        }
456                        _ => {} // 1, 2, C, T - skip Parse/Bind/CommandComplete/RowDescription
457                    }
458                }
459                Err(e) => {
460                   // recv_with_data_fast returns Err on ErrorResponse automatically.
461                   // We need to capture it and continue draining.
462                   // BUT recv_with_data_fast doesn't return the error *message type* if it fails.
463                   // It returns PgError::Query(msg).
464                   // So we capture the error, but we must continue RECVing until ReadyForQuery.
465                   // However, recv_with_data_fast will KEEP returning Err(Query) if the buffer has E?
466                   // No, recv_with_data_fast consumes the E message before returning Err.
467                   
468                   if error.is_none() {
469                       error = Some(e);
470                   }
471                   // Continue loop to drain until ReadyForQuery... 
472                   // BUT wait, does recv_with_data_fast handle the *rest* of the stream?
473                   // If we call it again, it will read the NEXT message.
474                   // So we just continue.
475                }
476            }
477        }
478    }
479
480    /// Execute a QAIL command and fetch one row.
481    pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
482        let rows = self.fetch_all(cmd).await?;
483        rows.into_iter().next().ok_or(PgError::NoRows)
484    }
485
486    /// Execute a QAIL command with PREPARED STATEMENT CACHING.
487    /// Like fetch_all(), but caches the prepared statement on the server.
488    /// On first call: sends Parse + Describe + Bind + Execute + Sync
489    /// On subsequent calls: sends only Bind + Execute + Sync (SKIPS Parse!)
490    /// Column metadata (RowDescription) is cached alongside the statement
491    /// so that by-name column access works on every call.
492    pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
493        use crate::protocol::AstEncoder;
494        use std::collections::hash_map::DefaultHasher;
495        use std::hash::{Hash, Hasher};
496
497        self.connection.sql_buf.clear();
498        self.connection.params_buf.clear();
499        
500        // Encode SQL to reusable buffer
501        match cmd.action {
502            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
503                crate::protocol::ast_encoder::dml::encode_select(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
504            }
505            qail_core::ast::Action::Add => {
506                crate::protocol::ast_encoder::dml::encode_insert(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
507            }
508            qail_core::ast::Action::Set => {
509                crate::protocol::ast_encoder::dml::encode_update(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
510            }
511            qail_core::ast::Action::Del => {
512                crate::protocol::ast_encoder::dml::encode_delete(cmd, &mut self.connection.sql_buf, &mut self.connection.params_buf).ok();
513            }
514            _ => {
515                // Fallback for unsupported actions
516                let (sql, params) = AstEncoder::encode_cmd_sql(cmd);
517                let raw_rows = self.connection.query_cached(&sql, &params).await?;
518                return Ok(raw_rows.into_iter().map(|data| PgRow { columns: data, column_info: None }).collect());
519            }
520        }
521
522        let mut hasher = DefaultHasher::new();
523        self.connection.sql_buf.hash(&mut hasher);
524        let sql_hash = hasher.finish();
525
526        let is_cache_miss = !self.connection.stmt_cache.contains(&sql_hash);
527
528        let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
529            name.clone()
530        } else {
531            let name = format!("qail_{:x}", sql_hash);
532            
533            use crate::protocol::PgEncoder;
534            use tokio::io::AsyncWriteExt;
535            
536            let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
537            
538            // Send Parse + Describe(Statement) to get RowDescription on first call
539            let parse_msg = PgEncoder::encode_parse(&name, sql_str, &[]);
540            let describe_msg = PgEncoder::encode_describe(false, &name);
541            self.connection.stream.write_all(&parse_msg).await?;
542            self.connection.stream.write_all(&describe_msg).await?;
543            
544            self.connection.stmt_cache.put(sql_hash, name.clone());
545            self.connection.prepared_statements.insert(name.clone(), sql_str.to_string());
546            
547            name
548        };
549
550        // Send Bind + Execute + Sync (always)
551        use crate::protocol::PgEncoder;
552        use tokio::io::AsyncWriteExt;
553        
554        let mut buf = bytes::BytesMut::with_capacity(128);
555        PgEncoder::encode_bind_to(&mut buf, &stmt_name, &self.connection.params_buf)
556            .map_err(|e| PgError::Encode(e.to_string()))?;
557        PgEncoder::encode_execute_to(&mut buf);
558        PgEncoder::encode_sync_to(&mut buf);
559        self.connection.stream.write_all(&buf).await?;
560
561        // On cache hit, use the previously cached ColumnInfo
562        let cached_column_info = self.connection.column_info_cache.get(&sql_hash).cloned();
563
564        let mut rows: Vec<PgRow> = Vec::new();
565        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
566        let mut error: Option<PgError> = None;
567
568        loop {
569            let msg = self.connection.recv().await?;
570            match msg {
571                crate::protocol::BackendMessage::ParseComplete
572                | crate::protocol::BackendMessage::BindComplete => {}
573                crate::protocol::BackendMessage::ParameterDescription(_) => {
574                    // Sent after Describe(Statement) — ignore
575                }
576                crate::protocol::BackendMessage::RowDescription(fields) => {
577                    // Received after Describe(Statement) on cache miss
578                    let info = Arc::new(ColumnInfo::from_fields(&fields));
579                    if is_cache_miss {
580                        self.connection.column_info_cache.insert(sql_hash, info.clone());
581                    }
582                    column_info = Some(info);
583                }
584                crate::protocol::BackendMessage::DataRow(data) => {
585                    if error.is_none() {
586                        rows.push(PgRow {
587                            columns: data,
588                            column_info: column_info.clone(),
589                        });
590                    }
591                }
592                crate::protocol::BackendMessage::CommandComplete(_) => {}
593                crate::protocol::BackendMessage::NoData => {
594                    // Sent by Describe for statements that return no data (e.g. pure UPDATE without RETURNING)
595                }
596                crate::protocol::BackendMessage::ReadyForQuery(_) => {
597                    if let Some(err) = error {
598                        return Err(err);
599                    }
600                    return Ok(rows);
601                }
602                crate::protocol::BackendMessage::ErrorResponse(err) => {
603                    if error.is_none() {
604                        error = Some(PgError::Query(err.message));
605                        // Invalidate cache to prevent "prepared statement does not exist"
606                        // on next retry if the error happened during Parse/Bind.
607                        self.connection.stmt_cache.clear();
608                        self.connection.prepared_statements.clear();
609                        self.connection.column_info_cache.clear();
610                    }
611                }
612                _ => {}
613            }
614        }
615    }
616
617    /// Execute a QAIL command (for mutations) - ZERO-ALLOC.
618    pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
619        use crate::protocol::AstEncoder;
620
621        let wire_bytes = AstEncoder::encode_cmd_reuse(
622            cmd,
623            &mut self.connection.sql_buf,
624            &mut self.connection.params_buf,
625        );
626
627        self.connection.send_bytes(&wire_bytes).await?;
628
629        let mut affected = 0u64;
630        let mut error: Option<PgError> = None;
631
632        loop {
633            let msg = self.connection.recv().await?;
634            match msg {
635                crate::protocol::BackendMessage::ParseComplete
636                | crate::protocol::BackendMessage::BindComplete => {}
637                crate::protocol::BackendMessage::RowDescription(_) => {}
638                crate::protocol::BackendMessage::DataRow(_) => {}
639                crate::protocol::BackendMessage::CommandComplete(tag) => {
640                    if error.is_none() && let Some(n) = tag.split_whitespace().last() {
641                        affected = n.parse().unwrap_or(0);
642                    }
643                }
644                crate::protocol::BackendMessage::ReadyForQuery(_) => {
645                    if let Some(err) = error {
646                        return Err(err);
647                    }
648                    return Ok(affected);
649                }
650                crate::protocol::BackendMessage::ErrorResponse(err) => {
651                    if error.is_none() {
652                        error = Some(PgError::Query(err.message));
653                    }
654                }
655                _ => {}
656            }
657        }
658    }
659
660    /// Query a QAIL command and return rows (for SELECT/GET queries).
661    /// Like `execute()` but collects RowDescription + DataRow messages
662    /// instead of discarding them.
663    pub async fn query_ast(&mut self, cmd: &Qail) -> PgResult<QueryResult> {
664        use crate::protocol::AstEncoder;
665
666        let wire_bytes = AstEncoder::encode_cmd_reuse(
667            cmd,
668            &mut self.connection.sql_buf,
669            &mut self.connection.params_buf,
670        );
671
672        self.connection.send_bytes(&wire_bytes).await?;
673
674        let mut columns: Vec<String> = Vec::new();
675        let mut rows: Vec<Vec<Option<String>>> = Vec::new();
676        let mut error: Option<PgError> = None;
677
678        loop {
679            let msg = self.connection.recv().await?;
680            match msg {
681                crate::protocol::BackendMessage::ParseComplete
682                | crate::protocol::BackendMessage::BindComplete => {}
683                crate::protocol::BackendMessage::RowDescription(fields) => {
684                    columns = fields.into_iter().map(|f| f.name).collect();
685                }
686                crate::protocol::BackendMessage::DataRow(data) => {
687                    if error.is_none() {
688                        let row: Vec<Option<String>> = data
689                            .into_iter()
690                            .map(|col| col.map(|bytes| String::from_utf8_lossy(&bytes).to_string()))
691                            .collect();
692                        rows.push(row);
693                    }
694                }
695                crate::protocol::BackendMessage::CommandComplete(_) => {}
696                crate::protocol::BackendMessage::NoData => {}
697                crate::protocol::BackendMessage::ReadyForQuery(_) => {
698                    if let Some(err) = error {
699                        return Err(err);
700                    }
701                    return Ok(QueryResult { columns, rows });
702                }
703                crate::protocol::BackendMessage::ErrorResponse(err) => {
704                    if error.is_none() {
705                        error = Some(PgError::Query(err.message));
706                    }
707                }
708                _ => {}
709            }
710        }
711    }
712
713    // ==================== TRANSACTION CONTROL ====================
714
715    /// Begin a transaction (AST-native).
716    pub async fn begin(&mut self) -> PgResult<()> {
717        self.connection.begin_transaction().await
718    }
719
720    /// Commit the current transaction (AST-native).
721    pub async fn commit(&mut self) -> PgResult<()> {
722        self.connection.commit().await
723    }
724
725    /// Rollback the current transaction (AST-native).
726    pub async fn rollback(&mut self) -> PgResult<()> {
727        self.connection.rollback().await
728    }
729
730    /// Create a named savepoint within the current transaction.
731    /// Savepoints allow partial rollback within a transaction.
732    /// Use `rollback_to()` to return to this savepoint.
733    /// # Example
734    /// ```ignore
735    /// driver.begin().await?;
736    /// driver.execute(&insert1).await?;
737    /// driver.savepoint("sp1").await?;
738    /// driver.execute(&insert2).await?;
739    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
740    /// driver.commit().await?;
741    /// ```
742    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
743        self.connection.savepoint(name).await
744    }
745
746    /// Rollback to a previously created savepoint.
747    /// Discards all changes since the named savepoint was created,
748    /// but keeps the transaction open.
749    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
750        self.connection.rollback_to(name).await
751    }
752
753    /// Release a savepoint (free resources, if no longer needed).
754    /// After release, the savepoint cannot be rolled back to.
755    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
756        self.connection.release_savepoint(name).await
757    }
758
759    // ==================== BATCH TRANSACTIONS ====================
760
761    /// Execute multiple commands in a single atomic transaction.
762    /// All commands succeed or all are rolled back.
763    /// # Example
764    /// ```ignore
765    /// let cmds = vec![
766    ///     Qail::add("users").columns(["name"]).values(["Alice"]),
767    ///     Qail::add("users").columns(["name"]).values(["Bob"]),
768    /// ];
769    /// let results = driver.execute_batch(&cmds).await?;
770    /// // results = [1, 1] (rows affected)
771    /// ```
772    pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
773        self.begin().await?;
774        let mut results = Vec::with_capacity(cmds.len());
775        for cmd in cmds {
776            match self.execute(cmd).await {
777                Ok(n) => results.push(n),
778                Err(e) => {
779                    self.rollback().await?;
780                    return Err(e);
781                }
782            }
783        }
784        self.commit().await?;
785        Ok(results)
786    }
787
788    // ==================== STATEMENT TIMEOUT ====================
789
790    /// Set statement timeout for this connection (in milliseconds).
791    /// # Example
792    /// ```ignore
793    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
794    /// ```
795    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
796        self.execute_raw(&format!("SET statement_timeout = {}", ms))
797            .await
798    }
799
800    /// Reset statement timeout to default (no limit).
801    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
802        self.execute_raw("RESET statement_timeout").await
803    }
804
805    // ==================== RLS (MULTI-TENANT) ====================
806
807    /// Set the RLS context for multi-tenant data isolation.
808    ///
809    /// Configures PostgreSQL session variables (`app.current_operator_id`, etc.)
810    /// so that RLS policies automatically filter data by tenant.
811    ///
812    /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
813    /// that `set_config` and all subsequent queries execute on the **same
814    /// connection** — no pool race conditions possible.
815    ///
816    /// # Example
817    /// ```ignore
818    /// driver.set_rls_context(RlsContext::operator("op-123")).await?;
819    /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
820    /// // orders only contains rows where operator_id = 'op-123'
821    /// ```
822    pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
823        let sql = rls::context_to_sql(&ctx);
824        self.execute_raw(&sql).await?;
825        self.rls_context = Some(ctx);
826        Ok(())
827    }
828
829    /// Clear the RLS context, resetting session variables to safe defaults.
830    ///
831    /// After clearing, all RLS-protected queries will return zero rows
832    /// (empty operator_id matches nothing).
833    pub async fn clear_rls_context(&mut self) -> PgResult<()> {
834        self.execute_raw(rls::reset_sql()).await?;
835        self.rls_context = None;
836        Ok(())
837    }
838
839    /// Get the current RLS context, if any.
840    pub fn rls_context(&self) -> Option<&rls::RlsContext> {
841        self.rls_context.as_ref()
842    }
843
844    // ==================== PIPELINE (BATCH) ====================
845
846    /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
847    /// # Example
848    /// ```ignore
849    /// let cmds: Vec<Qail> = (1..=1000)
850    ///     .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
851    ///     .collect();
852    /// let count = driver.pipeline_batch(&cmds).await?;
853    /// assert_eq!(count, 1000);
854    /// ```
855    pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
856        self.connection.pipeline_ast_fast(cmds).await
857    }
858
859    /// Execute multiple Qail ASTs and return full row data.
860    pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
861        let raw_results = self.connection.pipeline_ast(cmds).await?;
862
863        let results: Vec<Vec<PgRow>> = raw_results
864            .into_iter()
865            .map(|rows| {
866                rows.into_iter()
867                    .map(|columns| PgRow {
868                        columns,
869                        column_info: None,
870                    })
871                    .collect()
872            })
873            .collect();
874
875        Ok(results)
876    }
877
878    /// Prepare a SQL statement for repeated execution.
879    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
880        self.connection.prepare(sql).await
881    }
882
883    /// Execute a prepared statement pipeline in FAST mode (count only).
884    pub async fn pipeline_prepared_fast(
885        &mut self,
886        stmt: &PreparedStatement,
887        params_batch: &[Vec<Option<Vec<u8>>>],
888    ) -> PgResult<usize> {
889        self.connection
890            .pipeline_prepared_fast(stmt, params_batch)
891            .await
892    }
893
894    // ==================== LEGACY/BOOTSTRAP ====================
895
896    /// Execute a raw SQL string.
897    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
898    /// Use for bootstrap DDL only (e.g., migration table creation).
899    /// For transactions, use `begin()`, `commit()`, `rollback()`.
900    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
901        // Reject literal NULL bytes - they corrupt PostgreSQL connection state
902        if sql.as_bytes().contains(&0) {
903            return Err(crate::PgError::Protocol(
904                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
905            ));
906        }
907        self.connection.execute_simple(sql).await
908    }
909
910    /// Execute a raw SQL query and return rows.
911    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
912    /// Use for bootstrap/admin queries only.
913    pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
914        if sql.as_bytes().contains(&0) {
915            return Err(crate::PgError::Protocol(
916                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
917            ));
918        }
919        
920        use tokio::io::AsyncWriteExt;
921        use crate::protocol::PgEncoder;
922        
923        // Use simple query protocol (no prepared statements)
924        let msg = PgEncoder::encode_query_string(sql);
925        self.connection.stream.write_all(&msg).await?;
926        
927        let mut rows: Vec<PgRow> = Vec::new();
928        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = None;
929        
930
931        let mut error: Option<PgError> = None;
932
933        loop {
934            let msg = self.connection.recv().await?;
935            match msg {
936                crate::protocol::BackendMessage::RowDescription(fields) => {
937                    column_info = Some(std::sync::Arc::new(ColumnInfo::from_fields(&fields)));
938                }
939                crate::protocol::BackendMessage::DataRow(data) => {
940                    if error.is_none() {
941                        rows.push(PgRow {
942                            columns: data,
943                            column_info: column_info.clone(),
944                        });
945                    }
946                }
947                crate::protocol::BackendMessage::CommandComplete(_) => {}
948                crate::protocol::BackendMessage::ReadyForQuery(_) => {
949                    if let Some(err) = error {
950                        return Err(err);
951                    }
952                    return Ok(rows);
953                }
954                crate::protocol::BackendMessage::ErrorResponse(err) => {
955                    if error.is_none() {
956                        error = Some(PgError::Query(err.message));
957                    }
958                }
959                _ => {}
960            }
961        }
962    }
963
964    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
965    /// Uses a Qail::Add to get validated table and column names from the AST,
966    /// not user-provided strings. This is the sound, AST-native approach.
967    /// # Example
968    /// ```ignore
969    /// // Create a Qail::Add to define table and columns
970    /// let cmd = Qail::add("users")
971    ///     .columns(["id", "name", "email"]);
972    /// // Bulk insert rows
973    /// let rows: Vec<Vec<Value>> = vec![
974    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
975    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
976    /// ];
977    /// driver.copy_bulk(&cmd, &rows).await?;
978    /// ```
979    pub async fn copy_bulk(
980        &mut self,
981        cmd: &Qail,
982        rows: &[Vec<qail_core::ast::Value>],
983    ) -> PgResult<u64> {
984        use qail_core::ast::Action;
985
986
987        if cmd.action != Action::Add {
988            return Err(PgError::Query(
989                "copy_bulk requires Qail::Add action".to_string(),
990            ));
991        }
992
993        let table = &cmd.table;
994
995        let columns: Vec<String> = cmd
996            .columns
997            .iter()
998            .filter_map(|expr| {
999                use qail_core::ast::Expr;
1000                match expr {
1001                    Expr::Named(name) => Some(name.clone()),
1002                    Expr::Aliased { name, .. } => Some(name.clone()),
1003                    Expr::Star => None, // Can't COPY with *
1004                    _ => None,
1005                }
1006            })
1007            .collect();
1008
1009        if columns.is_empty() {
1010            return Err(PgError::Query(
1011                "copy_bulk requires columns in Qail".to_string(),
1012            ));
1013        }
1014
1015        // Use optimized COPY path: direct Value → bytes encoding, single syscall
1016        self.connection.copy_in_fast(table, &columns, rows).await
1017    }
1018
1019    /// **Fastest** bulk insert using pre-encoded COPY data.
1020    /// Accepts raw COPY text format bytes. Use when caller has already
1021    /// encoded rows to avoid any encoding overhead.
1022    /// # Format
1023    /// Data should be tab-separated rows with newlines (COPY text format):
1024    /// `1\thello\t3.14\n2\tworld\t2.71\n`
1025    /// # Example
1026    /// ```ignore
1027    /// let cmd = Qail::add("users").columns(["id", "name"]);
1028    /// let data = b"1\tAlice\n2\tBob\n";
1029    /// driver.copy_bulk_bytes(&cmd, data).await?;
1030    /// ```
1031    pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
1032        use qail_core::ast::Action;
1033
1034        if cmd.action != Action::Add {
1035            return Err(PgError::Query(
1036                "copy_bulk_bytes requires Qail::Add action".to_string(),
1037            ));
1038        }
1039
1040        let table = &cmd.table;
1041        let columns: Vec<String> = cmd
1042            .columns
1043            .iter()
1044            .filter_map(|expr| {
1045                use qail_core::ast::Expr;
1046                match expr {
1047                    Expr::Named(name) => Some(name.clone()),
1048                    Expr::Aliased { name, .. } => Some(name.clone()),
1049                    _ => None,
1050                }
1051            })
1052            .collect();
1053
1054        if columns.is_empty() {
1055            return Err(PgError::Query(
1056                "copy_bulk_bytes requires columns in Qail".to_string(),
1057            ));
1058        }
1059
1060        // Direct to raw COPY - zero encoding!
1061        self.connection.copy_in_raw(table, &columns, data).await
1062    }
1063
1064    /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
1065    /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
1066    /// # Example
1067    /// ```ignore
1068    /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
1069    /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
1070    /// ```
1071    pub async fn copy_export_table(
1072        &mut self,
1073        table: &str,
1074        columns: &[String],
1075    ) -> PgResult<Vec<u8>> {
1076        let cols = columns.join(", ");
1077        let sql = format!("COPY {} ({}) TO STDOUT", table, cols);
1078        
1079        self.connection.copy_out_raw(&sql).await
1080    }
1081
1082    /// Stream large result sets using PostgreSQL cursors.
1083    /// This method uses DECLARE CURSOR internally to stream rows in batches,
1084    /// avoiding loading the entire result set into memory.
1085    /// # Example
1086    /// ```ignore
1087    /// let cmd = Qail::get("large_table");
1088    /// let batches = driver.stream_cmd(&cmd, 100).await?;
1089    /// for batch in batches {
1090    ///     for row in batch {
1091    ///         // process row
1092    ///     }
1093    /// }
1094    /// ```
1095    pub async fn stream_cmd(
1096        &mut self,
1097        cmd: &Qail,
1098        batch_size: usize,
1099    ) -> PgResult<Vec<Vec<PgRow>>> {
1100        use std::sync::atomic::{AtomicU64, Ordering};
1101        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
1102
1103        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
1104
1105        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
1106        use crate::protocol::AstEncoder;
1107        let mut sql_buf = bytes::BytesMut::with_capacity(256);
1108        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
1109        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params);
1110        let sql = String::from_utf8_lossy(&sql_buf).to_string();
1111
1112        // Must be in a transaction for cursors
1113        self.connection.begin_transaction().await?;
1114
1115        // Declare cursor
1116        self.connection.declare_cursor(&cursor_name, &sql).await?;
1117
1118        // Fetch all batches
1119        let mut all_batches = Vec::new();
1120        while let Some(rows) = self
1121            .connection
1122            .fetch_cursor(&cursor_name, batch_size)
1123            .await?
1124        {
1125            let pg_rows: Vec<PgRow> = rows
1126                .into_iter()
1127                .map(|cols| PgRow {
1128                    columns: cols,
1129                    column_info: None,
1130                })
1131                .collect();
1132            all_batches.push(pg_rows);
1133        }
1134
1135        self.connection.close_cursor(&cursor_name).await?;
1136        self.connection.commit().await?;
1137
1138        Ok(all_batches)
1139    }
1140}
1141
1142// ============================================================================
1143// Connection Builder
1144// ============================================================================
1145
1146/// Builder for creating PgDriver connections with named parameters.
1147/// # Example
1148/// ```ignore
1149/// let driver = PgDriver::builder()
1150///     .host("localhost")
1151///     .port(5432)
1152///     .user("admin")
1153///     .database("mydb")
1154///     .password("secret")
1155///     .connect()
1156///     .await?;
1157/// ```
1158#[derive(Default)]
1159pub struct PgDriverBuilder {
1160    host: Option<String>,
1161    port: Option<u16>,
1162    user: Option<String>,
1163    database: Option<String>,
1164    password: Option<String>,
1165    timeout: Option<std::time::Duration>,
1166}
1167
1168impl PgDriverBuilder {
1169    /// Create a new builder with default values.
1170    pub fn new() -> Self {
1171        Self::default()
1172    }
1173
1174    /// Set the host (default: "127.0.0.1").
1175    pub fn host(mut self, host: impl Into<String>) -> Self {
1176        self.host = Some(host.into());
1177        self
1178    }
1179
1180    /// Set the port (default: 5432).
1181    pub fn port(mut self, port: u16) -> Self {
1182        self.port = Some(port);
1183        self
1184    }
1185
1186    /// Set the username (required).
1187    pub fn user(mut self, user: impl Into<String>) -> Self {
1188        self.user = Some(user.into());
1189        self
1190    }
1191
1192    /// Set the database name (required).
1193    pub fn database(mut self, database: impl Into<String>) -> Self {
1194        self.database = Some(database.into());
1195        self
1196    }
1197
1198    /// Set the password (optional, for SCRAM-SHA-256 auth).
1199    pub fn password(mut self, password: impl Into<String>) -> Self {
1200        self.password = Some(password.into());
1201        self
1202    }
1203
1204    /// Set connection timeout (optional).
1205    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
1206        self.timeout = Some(timeout);
1207        self
1208    }
1209
1210    /// Connect to PostgreSQL using the configured parameters.
1211    pub async fn connect(self) -> PgResult<PgDriver> {
1212        let host = self.host.as_deref().unwrap_or("127.0.0.1");
1213        let port = self.port.unwrap_or(5432);
1214        let user = self.user.as_deref().ok_or_else(|| {
1215            PgError::Connection("User is required".to_string())
1216        })?;
1217        let database = self.database.as_deref().ok_or_else(|| {
1218            PgError::Connection("Database is required".to_string())
1219        })?;
1220
1221        match (self.password.as_deref(), self.timeout) {
1222            (Some(password), Some(timeout)) => {
1223                PgDriver::connect_with_timeout(host, port, user, database, password, timeout).await
1224            }
1225            (Some(password), None) => {
1226                PgDriver::connect_with_password(host, port, user, database, password).await
1227            }
1228            (None, Some(timeout)) => {
1229                tokio::time::timeout(
1230                    timeout,
1231                    PgDriver::connect(host, port, user, database),
1232                )
1233                .await
1234                .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
1235            }
1236            (None, None) => {
1237                PgDriver::connect(host, port, user, database).await
1238            }
1239        }
1240    }
1241}