Skip to main content

qail_pg/driver/
mod.rs

1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! Auto-detects the best I/O backend:
4//! - Linux 5.1+: io_uring (fastest)
5//! - Linux < 5.1 / macOS / Windows: tokio
6//!
7//! Connection methods are split across modules for easier maintenance:
8//! - `connection.rs` - Core struct and connect methods
9//! - `io.rs` - send, recv, recv_msg_type_fast
10//! - `query.rs` - query, query_cached, execute_simple
11//! - `transaction.rs` - begin_transaction, commit, rollback
12//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor  
13//! - `copy.rs` - COPY protocol for bulk operations
14//! - `pipeline.rs` - High-performance pipelining (275k q/s)
15//! - `cancel.rs` - Query cancellation
16//! - `notification.rs` - LISTEN/NOTIFY support
17//! - `io_backend.rs` - Runtime I/O backend detection
18
19pub mod branch_sql;
20mod cancel;
21mod connection;
22mod copy;
23mod cursor;
24pub mod explain;
25#[cfg(all(feature = "enterprise-gssapi", target_os = "linux"))]
26pub mod gss;
27mod io;
28pub mod io_backend;
29pub mod notification;
30mod pipeline;
31mod pool;
32mod prepared;
33mod query;
34pub mod rls;
35mod row;
36mod stream;
37mod transaction;
38
39pub use cancel::CancelToken;
40pub use connection::PgConnection;
41pub use connection::TlsConfig;
42pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
43pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
44pub use notification::Notification;
45pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
46pub use prepared::PreparedStatement;
47pub use rls::RlsContext;
48pub use row::QailRow;
49
50use qail_core::ast::Qail;
51use std::collections::HashMap;
52use std::sync::Arc;
53
54/// Metadata about the columns returned by a query.
55///
56/// Maps column names to positional indices and stores OID / format
57/// information so that [`PgRow`] values can be decoded correctly.
58#[derive(Debug, Clone)]
59pub struct ColumnInfo {
60    /// Lookup table from column name to zero-based index.
61    pub name_to_index: HashMap<String, usize>,
62    /// PostgreSQL type OIDs, one per column.
63    pub oids: Vec<u32>,
64    /// Wire format codes (0 = text, 1 = binary), one per column.
65    pub formats: Vec<i16>,
66}
67
68impl ColumnInfo {
69    /// Build column metadata from the `RowDescription` field list
70    /// returned by the backend after a query.
71    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
72        let mut name_to_index = HashMap::with_capacity(fields.len());
73        let mut oids = Vec::with_capacity(fields.len());
74        let mut formats = Vec::with_capacity(fields.len());
75
76        for (i, field) in fields.iter().enumerate() {
77            name_to_index.insert(field.name.clone(), i);
78            oids.push(field.type_oid);
79            formats.push(field.format);
80        }
81
82        Self {
83            name_to_index,
84            oids,
85            formats,
86        }
87    }
88}
89
90/// PostgreSQL row with column data and metadata.
91pub struct PgRow {
92    /// Raw column values — `None` represents SQL `NULL`.
93    pub columns: Vec<Option<Vec<u8>>>,
94    /// Shared column metadata for decoding values by name or type.
95    pub column_info: Option<Arc<ColumnInfo>>,
96}
97
98/// Error type for PostgreSQL driver operations.
99#[derive(Debug)]
100pub enum PgError {
101    /// TCP / TLS connection failure with the PostgreSQL server.
102    Connection(String),
103    /// Wire-protocol framing or decoding error.
104    Protocol(String),
105    /// Authentication failure (bad password, unsupported mechanism, etc.).
106    Auth(String),
107    /// Query execution error returned by the backend (e.g. constraint violation).
108    Query(String),
109    /// Structured server error with SQLSTATE and optional detail/hint fields.
110    QueryServer(PgServerError),
111    /// The query returned zero rows when at least one was expected.
112    NoRows,
113    /// I/O error (preserves inner error for chaining)
114    Io(std::io::Error),
115    /// Encoding error (parameter limit, etc.)
116    Encode(String),
117    /// Operation timed out (connection, acquire, query)
118    Timeout(String),
119    /// Pool exhausted — all connections are in use
120    PoolExhausted {
121        /// Maximum pool size that was reached.
122        max: usize,
123    },
124    /// Pool is closed and no longer accepting requests
125    PoolClosed,
126}
127
128/// Structured PostgreSQL server error fields.
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct PgServerError {
131    /// Severity level (e.g. `ERROR`, `FATAL`, `WARNING`).
132    pub severity: String,
133    /// SQLSTATE error code (e.g. `23505`).
134    pub code: String,
135    /// Human-readable message.
136    pub message: String,
137    /// Optional detailed description.
138    pub detail: Option<String>,
139    /// Optional hint from server.
140    pub hint: Option<String>,
141}
142
143impl From<crate::protocol::ErrorFields> for PgServerError {
144    fn from(value: crate::protocol::ErrorFields) -> Self {
145        Self {
146            severity: value.severity,
147            code: value.code,
148            message: value.message,
149            detail: value.detail,
150            hint: value.hint,
151        }
152    }
153}
154
155impl std::fmt::Display for PgError {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        match self {
158            PgError::Connection(e) => write!(f, "Connection error: {}", e),
159            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
160            PgError::Auth(e) => write!(f, "Auth error: {}", e),
161            PgError::Query(e) => write!(f, "Query error: {}", e),
162            PgError::QueryServer(e) => write!(f, "Query error [{}]: {}", e.code, e.message),
163            PgError::NoRows => write!(f, "No rows returned"),
164            PgError::Io(e) => write!(f, "I/O error: {}", e),
165            PgError::Encode(e) => write!(f, "Encode error: {}", e),
166            PgError::Timeout(ctx) => write!(f, "Timeout: {}", ctx),
167            PgError::PoolExhausted { max } => write!(f, "Pool exhausted ({} max connections)", max),
168            PgError::PoolClosed => write!(f, "Connection pool is closed"),
169        }
170    }
171}
172
173impl std::error::Error for PgError {
174    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
175        match self {
176            PgError::Io(e) => Some(e),
177            _ => None,
178        }
179    }
180}
181
182impl From<std::io::Error> for PgError {
183    fn from(e: std::io::Error) -> Self {
184        PgError::Io(e)
185    }
186}
187
188impl From<crate::protocol::EncodeError> for PgError {
189    fn from(e: crate::protocol::EncodeError) -> Self {
190        PgError::Encode(e.to_string())
191    }
192}
193
194impl PgError {
195    /// Return structured server error fields when available.
196    pub fn server_error(&self) -> Option<&PgServerError> {
197        match self {
198            PgError::QueryServer(err) => Some(err),
199            _ => None,
200        }
201    }
202
203    /// Return SQLSTATE code when available.
204    pub fn sqlstate(&self) -> Option<&str> {
205        self.server_error().map(|e| e.code.as_str())
206    }
207
208    /// True when a cached prepared statement can be self-healed by clearing
209    /// local statement state and retrying once.
210    pub fn is_prepared_statement_retryable(&self) -> bool {
211        let Some(err) = self.server_error() else {
212            return false;
213        };
214
215        let code = err.code.as_str();
216        let message = err.message.to_ascii_lowercase();
217
218        // invalid_sql_statement_name
219        if code.eq_ignore_ascii_case("26000")
220            && message.contains("prepared statement")
221            && message.contains("does not exist")
222        {
223            return true;
224        }
225
226        // feature_not_supported + message heuristic used by PostgreSQL replans.
227        if code.eq_ignore_ascii_case("0A000") && message.contains("cached plan must be replanned") {
228            return true;
229        }
230
231        // Defensive message-only fallback for proxy/failover rewrites.
232        message.contains("cached plan must be replanned")
233    }
234
235    /// True when the error is a transient server condition that may succeed
236    /// on retry. Covers serialization failures, deadlocks, standby
237    /// unavailability, connection exceptions, and prepared-statement staleness.
238    ///
239    /// Callers should pair this with a bounded retry loop and backoff.
240    pub fn is_transient_server_error(&self) -> bool {
241        // Non-server errors that are inherently transient.
242        match self {
243            PgError::Timeout(_) => return true,
244            PgError::Io(io) => {
245                return matches!(
246                    io.kind(),
247                    std::io::ErrorKind::TimedOut
248                        | std::io::ErrorKind::ConnectionRefused
249                        | std::io::ErrorKind::ConnectionReset
250                        | std::io::ErrorKind::BrokenPipe
251                        | std::io::ErrorKind::Interrupted
252                );
253            }
254            PgError::Connection(_) => return true,
255            _ => {}
256        }
257
258        // Prepared-statement staleness is a subset of transient errors.
259        if self.is_prepared_statement_retryable() {
260            return true;
261        }
262
263        let Some(code) = self.sqlstate() else {
264            return false;
265        };
266
267        matches!(
268            code,
269            // serialization_failure — MVCC conflict, safe to retry
270            "40001"
271            // deadlock_detected — PG auto-aborts one participant
272            | "40P01"
273            // cannot_connect_now — hot-standby recovery in progress
274            | "57P03"
275            // admin_shutdown / crash_shutdown — server restarting
276            | "57P01"
277            | "57P02"
278        ) || code.starts_with("08") // connection_exception class
279    }
280}
281
282/// Result type for PostgreSQL operations.
283pub type PgResult<T> = Result<T, PgError>;
284
/// Rows returned by a row-producing query (SELECT/GET), decoded as text.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names taken from `RowDescription`.
    pub columns: Vec<String>,
    /// One entry per row; each cell is a text value or `None` for SQL `NULL`.
    pub rows: Vec<Vec<Option<String>>>,
}
293
294/// PostgreSQL result-column wire format.
295///
296/// - `Text` (0): server sends textual column values.
297/// - `Binary` (1): server sends binary column values.
298#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
299pub enum ResultFormat {
300    /// Text format (`0`)
301    #[default]
302    Text,
303    /// Binary format (`1`)
304    Binary,
305}
306
307impl ResultFormat {
308    #[inline]
309    pub(crate) fn as_wire_code(self) -> i16 {
310        match self {
311            ResultFormat::Text => crate::protocol::PgEncoder::FORMAT_TEXT,
312            ResultFormat::Binary => crate::protocol::PgEncoder::FORMAT_BINARY,
313        }
314    }
315}
316
/// Policy for SCRAM channel binding during SASL negotiation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScramChannelBindingMode {
    /// Never use `SCRAM-SHA-256-PLUS`, even when it is available.
    Disable,
    /// Prefer `SCRAM-SHA-256-PLUS`, falling back to plain SCRAM if needed.
    #[default]
    Prefer,
    /// Require `SCRAM-SHA-256-PLUS` and fail otherwise.
    Require,
}

impl ScramChannelBindingMode {
    /// Parse common config string values.
    ///
    /// Surrounding whitespace is ignored and matching is ASCII
    /// case-insensitive. Accepted spellings:
    /// - `disable`, `off`, `false`, `no` → [`Self::Disable`]
    /// - `prefer`, `on`, `true`, `yes` → [`Self::Prefer`]
    /// - `require`, `required` → [`Self::Require`]
    ///
    /// Any other input yields `None`.
    pub fn parse(value: &str) -> Option<Self> {
        let token = value.trim();
        let is_one_of =
            |names: &[&str]| names.iter().any(|name| token.eq_ignore_ascii_case(name));

        if is_one_of(&["disable", "off", "false", "no"]) {
            Some(Self::Disable)
        } else if is_one_of(&["prefer", "on", "true", "yes"]) {
            Some(Self::Prefer)
        } else if is_one_of(&["require", "required"]) {
            Some(Self::Require)
        } else {
            None
        }
    }
}
340
/// Enterprise authentication mechanisms that the PostgreSQL server can initiate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnterpriseAuthMechanism {
    /// Kerberos V5 (`AuthenticationKerberosV5`, auth code `2`).
    KerberosV5,
    /// GSSAPI (`AuthenticationGSS`, auth code `7`).
    GssApi,
    /// SSPI (`AuthenticationSSPI`, auth code `9`, primarily Windows servers).
    Sspi,
}
351
352/// Callback used to generate GSS/SSPI response tokens.
353///
354/// The callback receives:
355/// - negotiated enterprise auth mechanism
356/// - optional server challenge bytes (`None` for initial token)
357///
358/// It must return the client response token bytes to send in `GSSResponse`.
359pub type GssTokenProvider = fn(EnterpriseAuthMechanism, Option<&[u8]>) -> Result<Vec<u8>, String>;
360
361/// Structured token request for stateful Kerberos/GSS/SSPI providers.
362#[derive(Debug, Clone, Copy)]
363pub struct GssTokenRequest<'a> {
364    /// Stable per-handshake identifier so providers can keep per-connection state.
365    pub session_id: u64,
366    /// Negotiated enterprise auth mechanism.
367    pub mechanism: EnterpriseAuthMechanism,
368    /// Server challenge token (`None` for initial token).
369    pub server_token: Option<&'a [u8]>,
370}
371
372/// Stateful callback for Kerberos/GSS/SSPI response generation.
373///
374/// Use this when the underlying auth stack needs per-handshake context between
375/// `AuthenticationGSS` and `AuthenticationGSSContinue` messages.
376pub type GssTokenProviderEx =
377    Arc<dyn for<'a> Fn(GssTokenRequest<'a>) -> Result<Vec<u8>, String> + Send + Sync>;
378
379/// Password-auth mechanism policy.
380///
381/// Defaults allow all PostgreSQL password mechanisms for compatibility.
382#[derive(Debug, Clone, Copy, PartialEq, Eq)]
383pub struct AuthSettings {
384    /// Allow server-requested cleartext password auth.
385    pub allow_cleartext_password: bool,
386    /// Allow server-requested MD5 password auth.
387    pub allow_md5_password: bool,
388    /// Allow server-requested SCRAM auth.
389    pub allow_scram_sha_256: bool,
390    /// Allow server-requested Kerberos V5 auth flow.
391    pub allow_kerberos_v5: bool,
392    /// Allow server-requested GSSAPI auth flow.
393    pub allow_gssapi: bool,
394    /// Allow server-requested SSPI auth flow.
395    pub allow_sspi: bool,
396    /// SCRAM channel-binding requirement.
397    pub channel_binding: ScramChannelBindingMode,
398}
399
400impl Default for AuthSettings {
401    fn default() -> Self {
402        Self {
403            allow_cleartext_password: true,
404            allow_md5_password: true,
405            allow_scram_sha_256: true,
406            allow_kerberos_v5: false,
407            allow_gssapi: false,
408            allow_sspi: false,
409            channel_binding: ScramChannelBindingMode::Prefer,
410        }
411    }
412}
413
414impl AuthSettings {
415    /// Restrictive mode: SCRAM-only password auth.
416    pub fn scram_only() -> Self {
417        Self {
418            allow_cleartext_password: false,
419            allow_md5_password: false,
420            allow_scram_sha_256: true,
421            allow_kerberos_v5: false,
422            allow_gssapi: false,
423            allow_sspi: false,
424            channel_binding: ScramChannelBindingMode::Prefer,
425        }
426    }
427
428    /// Restrictive mode: enterprise Kerberos/GSS only (no password auth).
429    pub fn gssapi_only() -> Self {
430        Self {
431            allow_cleartext_password: false,
432            allow_md5_password: false,
433            allow_scram_sha_256: false,
434            allow_kerberos_v5: true,
435            allow_gssapi: true,
436            allow_sspi: true,
437            channel_binding: ScramChannelBindingMode::Prefer,
438        }
439    }
440
441    pub(crate) fn has_any_password_method(self) -> bool {
442        self.allow_cleartext_password || self.allow_md5_password || self.allow_scram_sha_256
443    }
444}
445
/// TLS policy applied when establishing a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TlsMode {
    /// Do not attempt TLS; this is the default.
    #[default]
    Disable,
    /// Try TLS first; fall back to plaintext only when the server has no TLS support.
    Prefer,
    /// Require TLS and fail if it is unavailable.
    Require,
}

impl TlsMode {
    /// Parse a libpq-style `sslmode` value.
    ///
    /// Surrounding whitespace is ignored and matching is ASCII
    /// case-insensitive. `allow` is treated like `prefer`; `verify-ca` and
    /// `verify-full` map to [`Self::Require`] (this parser does not
    /// distinguish verification levels). Unknown values yield `None`.
    pub fn parse_sslmode(value: &str) -> Option<Self> {
        let mode = value.trim().to_ascii_lowercase();

        if mode == "disable" {
            Some(Self::Disable)
        } else if mode == "allow" || mode == "prefer" {
            Some(Self::Prefer)
        } else if ["require", "verify-ca", "verify-full"].contains(&mode.as_str()) {
            Some(Self::Require)
        } else {
            None
        }
    }
}
469
/// GSSAPI encryption mode for transport-level encryption via Kerberos.
///
/// Decides whether the driver attempts GSSAPI session encryption
/// (GSSENCRequest) before falling back to TLS or plaintext.
///
/// See: PostgreSQL protocol §54.2.11 — GSSAPI Session Encryption.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum GssEncMode {
    /// Never attempt GSSAPI encryption; this is the default.
    #[default]
    Disable,
    /// Try GSSAPI encryption first; fall back to TLS or plaintext.
    Prefer,
    /// Require GSSAPI encryption — fail if the server rejects GSSENCRequest.
    Require,
}

impl GssEncMode {
    /// Parse a libpq-style `gssencmode` value.
    ///
    /// Surrounding whitespace is ignored and matching is ASCII
    /// case-insensitive. Only `disable`, `prefer` and `require` are
    /// recognised; anything else yields `None`.
    pub fn parse_gssencmode(value: &str) -> Option<Self> {
        let known_modes = [
            ("disable", Self::Disable),
            ("prefer", Self::Prefer),
            ("require", Self::Require),
        ];
        let token = value.trim();

        known_modes
            .iter()
            .find(|(name, _)| token.eq_ignore_ascii_case(name))
            .map(|&(_, mode)| mode)
    }
}
498
499/// Advanced connection options for enterprise deployments.
500#[derive(Clone, Default)]
501pub struct ConnectOptions {
502    /// TLS mode for the primary connection.
503    pub tls_mode: TlsMode,
504    /// GSSAPI session encryption mode.
505    pub gss_enc_mode: GssEncMode,
506    /// Optional custom CA bundle (PEM) for TLS server validation.
507    pub tls_ca_cert_pem: Option<Vec<u8>>,
508    /// Optional mTLS client certificate/key config.
509    pub mtls: Option<TlsConfig>,
510    /// Optional callback for Kerberos/GSS/SSPI token generation.
511    pub gss_token_provider: Option<GssTokenProvider>,
512    /// Optional stateful Kerberos/GSS/SSPI token provider.
513    pub gss_token_provider_ex: Option<GssTokenProviderEx>,
514    /// Password-auth policy.
515    pub auth: AuthSettings,
516}
517
518impl std::fmt::Debug for ConnectOptions {
519    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520        f.debug_struct("ConnectOptions")
521            .field("tls_mode", &self.tls_mode)
522            .field("gss_enc_mode", &self.gss_enc_mode)
523            .field(
524                "tls_ca_cert_pem",
525                &self.tls_ca_cert_pem.as_ref().map(std::vec::Vec::len),
526            )
527            .field("mtls", &self.mtls.as_ref().map(|_| "<configured>"))
528            .field(
529                "gss_token_provider",
530                &self.gss_token_provider.as_ref().map(|_| "<configured>"),
531            )
532            .field(
533                "gss_token_provider_ex",
534                &self.gss_token_provider_ex.as_ref().map(|_| "<configured>"),
535            )
536            .field("auth", &self.auth)
537            .finish()
538    }
539}
540
541/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
542pub struct PgDriver {
543    #[allow(dead_code)]
544    connection: PgConnection,
545    /// Current RLS context, if set. Used for multi-tenant data isolation.
546    rls_context: Option<RlsContext>,
547}
548
549impl PgDriver {
550    /// Create a new driver with an existing connection.
551    pub fn new(connection: PgConnection) -> Self {
552        Self {
553            connection,
554            rls_context: None,
555        }
556    }
557
558    /// Builder pattern for ergonomic connection configuration.
559    /// # Example
560    /// ```ignore
561    /// let driver = PgDriver::builder()
562    ///     .host("localhost")
563    ///     .port(5432)
564    ///     .user("admin")
565    ///     .database("mydb")
566    ///     .password("secret")  // Optional
567    ///     .connect()
568    ///     .await?;
569    /// ```
570    pub fn builder() -> PgDriverBuilder {
571        PgDriverBuilder::new()
572    }
573
574    /// Connect to PostgreSQL and create a driver (trust mode, no password).
575    ///
576    /// # Arguments
577    ///
578    /// * `host` — PostgreSQL server hostname or IP.
579    /// * `port` — TCP port (typically 5432).
580    /// * `user` — PostgreSQL role name.
581    /// * `database` — Target database name.
582    pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
583        let connection = PgConnection::connect(host, port, user, database).await?;
584        Ok(Self::new(connection))
585    }
586
587    /// Connect to PostgreSQL with password authentication.
588    /// Supports server-requested auth flow: cleartext, MD5, or SCRAM-SHA-256.
589    pub async fn connect_with_password(
590        host: &str,
591        port: u16,
592        user: &str,
593        database: &str,
594        password: &str,
595    ) -> PgResult<Self> {
596        let connection =
597            PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
598        Ok(Self::new(connection))
599    }
600
601    /// Connect with explicit security options.
602    pub async fn connect_with_options(
603        host: &str,
604        port: u16,
605        user: &str,
606        database: &str,
607        password: Option<&str>,
608        options: ConnectOptions,
609    ) -> PgResult<Self> {
610        let connection =
611            PgConnection::connect_with_options(host, port, user, database, password, options)
612                .await?;
613        Ok(Self::new(connection))
614    }
615
616    /// Connect using DATABASE_URL environment variable.
617    ///
618    /// Parses the URL format: `postgresql://user:password@host:port/database`
619    /// or `postgres://user:password@host:port/database`
620    ///
621    /// # Example
622    /// ```ignore
623    /// // Set DATABASE_URL=postgresql://user:pass@localhost:5432/mydb
624    /// let driver = PgDriver::connect_env().await?;
625    /// ```
626    pub async fn connect_env() -> PgResult<Self> {
627        let url = std::env::var("DATABASE_URL").map_err(|_| {
628            PgError::Connection("DATABASE_URL environment variable not set".to_string())
629        })?;
630        Self::connect_url(&url).await
631    }
632
633    /// Connect using a PostgreSQL connection URL.
634    ///
635    /// Parses the URL format: `postgresql://user:password@host:port/database?params`
636    /// or `postgres://user:password@host:port/database?params`
637    ///
638    /// Supports all enterprise query params (sslmode, auth_mode, gss_provider,
639    /// channel_binding, etc.) — same set as `PoolConfig::from_qail_config`.
640    ///
641    /// # Example
642    /// ```ignore
643    /// let driver = PgDriver::connect_url("postgresql://user:pass@localhost:5432/mydb?sslmode=require").await?;
644    /// ```
645    pub async fn connect_url(url: &str) -> PgResult<Self> {
646        let (host, port, user, database, password) = Self::parse_database_url(url)?;
647
648        // Parse enterprise query params using the shared helper from pool.rs.
649        let mut pool_cfg = pool::PoolConfig::new(&host, port, &user, &database);
650        if let Some(pw) = &password {
651            pool_cfg = pool_cfg.password(pw);
652        }
653        if let Some(query) = url.split('?').nth(1) {
654            pool::apply_url_query_params(&mut pool_cfg, query, &host)?;
655        }
656
657        let opts = ConnectOptions {
658            tls_mode: pool_cfg.tls_mode,
659            gss_enc_mode: pool_cfg.gss_enc_mode,
660            tls_ca_cert_pem: pool_cfg.tls_ca_cert_pem,
661            mtls: pool_cfg.mtls,
662            gss_token_provider: pool_cfg.gss_token_provider,
663            gss_token_provider_ex: pool_cfg.gss_token_provider_ex,
664            auth: pool_cfg.auth_settings,
665        };
666
667        Self::connect_with_options(&host, port, &user, &database, password.as_deref(), opts).await
668    }
669
670    /// Parse a PostgreSQL connection URL into components.
671    ///
672    /// Format: `postgresql://user:password@host:port/database`
673    /// or `postgres://user:password@host:port/database`
674    ///
675    /// URL percent-encoding is automatically decoded for user and password.
676    fn parse_database_url(url: &str) -> PgResult<(String, u16, String, String, Option<String>)> {
677        // Remove scheme (postgresql:// or postgres://)
678        let after_scheme = url.split("://").nth(1).ok_or_else(|| {
679            PgError::Connection("Invalid DATABASE_URL: missing scheme".to_string())
680        })?;
681
682        // Split into auth@host parts
683        let (auth_part, host_db_part) = if let Some(at_pos) = after_scheme.rfind('@') {
684            (Some(&after_scheme[..at_pos]), &after_scheme[at_pos + 1..])
685        } else {
686            (None, after_scheme)
687        };
688
689        // Parse auth (user:password)
690        let (user, password) = if let Some(auth) = auth_part {
691            let parts: Vec<&str> = auth.splitn(2, ':').collect();
692            if parts.len() == 2 {
693                // URL-decode both user and password
694                (
695                    Self::percent_decode(parts[0]),
696                    Some(Self::percent_decode(parts[1])),
697                )
698            } else {
699                (Self::percent_decode(parts[0]), None)
700            }
701        } else {
702            return Err(PgError::Connection(
703                "Invalid DATABASE_URL: missing user".to_string(),
704            ));
705        };
706
707        // Parse host:port/database (strip query string if present)
708        let (host_port, database) = if let Some(slash_pos) = host_db_part.find('/') {
709            let raw_db = &host_db_part[slash_pos + 1..];
710            // Strip ?query params — they're handled separately by connect_url
711            let db = raw_db.split('?').next().unwrap_or(raw_db).to_string();
712            (&host_db_part[..slash_pos], db)
713        } else {
714            return Err(PgError::Connection(
715                "Invalid DATABASE_URL: missing database name".to_string(),
716            ));
717        };
718
719        // Parse host:port
720        let (host, port) = if let Some(colon_pos) = host_port.rfind(':') {
721            let port_str = &host_port[colon_pos + 1..];
722            let port = port_str
723                .parse::<u16>()
724                .map_err(|_| PgError::Connection(format!("Invalid port: {}", port_str)))?;
725            (host_port[..colon_pos].to_string(), port)
726        } else {
727            (host_port.to_string(), 5432) // Default PostgreSQL port
728        };
729
730        Ok((host, port, user, database, password))
731    }
732
733    /// Decode URL percent-encoded string.
734    /// Handles common encodings: %20 (space), %2B (+), %3D (=), %40 (@), %2F (/), etc.
735    fn percent_decode(s: &str) -> String {
736        let mut result = String::with_capacity(s.len());
737        let mut chars = s.chars().peekable();
738
739        while let Some(c) = chars.next() {
740            if c == '%' {
741                // Try to parse next two chars as hex
742                let hex: String = chars.by_ref().take(2).collect();
743                if hex.len() == 2
744                    && let Ok(byte) = u8::from_str_radix(&hex, 16)
745                {
746                    result.push(byte as char);
747                    continue;
748                }
749                // If parsing failed, keep original
750                result.push('%');
751                result.push_str(&hex);
752            } else if c == '+' {
753                // '+' often represents space in query strings (form encoding)
754                // But in path components, keep as-is. PostgreSQL URLs use path encoding.
755                result.push('+');
756            } else {
757                result.push(c);
758            }
759        }
760
761        result
762    }
763
764    /// Connect to PostgreSQL with a connection timeout.
765    /// If the connection cannot be established within the timeout, returns an error.
766    /// # Example
767    /// ```ignore
768    /// use std::time::Duration;
769    /// let driver = PgDriver::connect_with_timeout(
770    ///     "localhost", 5432, "user", "db", "password",
771    ///     Duration::from_secs(5)
772    /// ).await?;
773    /// ```
774    pub async fn connect_with_timeout(
775        host: &str,
776        port: u16,
777        user: &str,
778        database: &str,
779        password: &str,
780        timeout: std::time::Duration,
781    ) -> PgResult<Self> {
782        tokio::time::timeout(
783            timeout,
784            Self::connect_with_password(host, port, user, database, password),
785        )
786        .await
787        .map_err(|_| PgError::Timeout(format!("connection after {:?}", timeout)))?
788    }
789    /// Clear the prepared statement cache.
790    /// Frees memory by removing all cached statements.
791    /// Note: Statements remain on the PostgreSQL server until connection closes.
792    pub fn clear_cache(&mut self) {
793        self.connection.clear_prepared_statement_state();
794    }
795
796    /// Get cache statistics.
797    /// Returns (current_size, max_capacity).
798    pub fn cache_stats(&self) -> (usize, usize) {
799        (
800            self.connection.stmt_cache.len(),
801            self.connection.stmt_cache.cap().get(),
802        )
803    }
804
805    /// Execute a QAIL command and fetch all rows (CACHED + ZERO-ALLOC).
806    /// **Default method** - uses prepared statement caching for best performance.
807    /// On first call: sends Parse + Bind + Execute + Sync
808    /// On subsequent calls with same SQL: sends only Bind + Execute (SKIPS Parse!)
809    /// Uses LRU cache with max 1000 statements (auto-evicts oldest).
810    pub async fn fetch_all(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
811        self.fetch_all_with_format(cmd, ResultFormat::Text).await
812    }
813
814    /// Execute a QAIL command and fetch all rows using a specific result format.
815    ///
816    /// `result_format` controls server result-column encoding:
817    /// - [`ResultFormat::Text`] for standard text decoding.
818    /// - [`ResultFormat::Binary`] for binary wire values.
819    pub async fn fetch_all_with_format(
820        &mut self,
821        cmd: &Qail,
822        result_format: ResultFormat,
823    ) -> PgResult<Vec<PgRow>> {
824        // Delegate to cached-by-default behavior.
825        self.fetch_all_cached_with_format(cmd, result_format).await
826    }
827
828    /// Execute a QAIL command and fetch all rows as a typed struct.
829    /// Requires the target type to implement `QailRow` trait.
830    ///
831    /// # Example
832    /// ```ignore
833    /// let users: Vec<User> = driver.fetch_typed::<User>(&query).await?;
834    /// ```
835    pub async fn fetch_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Vec<T>> {
836        let rows = self.fetch_all(cmd).await?;
837        Ok(rows.iter().map(T::from_row).collect())
838    }
839
840    /// Execute a QAIL command and fetch a single row as a typed struct.
841    /// Returns None if no rows are returned.
842    pub async fn fetch_one_typed<T: row::QailRow>(&mut self, cmd: &Qail) -> PgResult<Option<T>> {
843        let rows = self.fetch_all(cmd).await?;
844        Ok(rows.first().map(T::from_row))
845    }
846
847    /// Execute a QAIL command and fetch all rows (UNCACHED).
848    /// Sends Parse + Bind + Execute on every call.
849    /// Use for one-off queries or when caching is not desired.
850    ///
851    /// Optimized: encodes wire bytes into reusable write_buf (zero-alloc).
852    pub async fn fetch_all_uncached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
853        self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
854            .await
855    }
856
857    /// Execute a QAIL command and fetch all rows (UNCACHED) with explicit result format.
858    pub async fn fetch_all_uncached_with_format(
859        &mut self,
860        cmd: &Qail,
861        result_format: ResultFormat,
862    ) -> PgResult<Vec<PgRow>> {
863        use crate::protocol::AstEncoder;
864
865        AstEncoder::encode_cmd_reuse_into_with_result_format(
866            cmd,
867            &mut self.connection.sql_buf,
868            &mut self.connection.params_buf,
869            &mut self.connection.write_buf,
870            result_format.as_wire_code(),
871        )
872        .map_err(|e| PgError::Encode(e.to_string()))?;
873
874        self.connection.flush_write_buf().await?;
875
876        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
877        let mut column_info: Option<Arc<ColumnInfo>> = None;
878
879        let mut error: Option<PgError> = None;
880
881        loop {
882            let msg = self.connection.recv().await?;
883            match msg {
884                crate::protocol::BackendMessage::ParseComplete
885                | crate::protocol::BackendMessage::BindComplete => {}
886                crate::protocol::BackendMessage::RowDescription(fields) => {
887                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
888                }
889                crate::protocol::BackendMessage::DataRow(data) => {
890                    if error.is_none() {
891                        rows.push(PgRow {
892                            columns: data,
893                            column_info: column_info.clone(),
894                        });
895                    }
896                }
897                crate::protocol::BackendMessage::CommandComplete(_) => {}
898                crate::protocol::BackendMessage::ReadyForQuery(_) => {
899                    if let Some(err) = error {
900                        return Err(err);
901                    }
902                    return Ok(rows);
903                }
904                crate::protocol::BackendMessage::ErrorResponse(err) => {
905                    if error.is_none() {
906                        error = Some(PgError::QueryServer(err.into()));
907                    }
908                }
909                _ => {}
910            }
911        }
912    }
913
914    /// Execute a QAIL command and fetch all rows (FAST VERSION).
915    /// Uses optimized recv_with_data_fast for faster response parsing.
916    /// Skips column metadata collection for maximum speed.
917    pub async fn fetch_all_fast(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
918        self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
919            .await
920    }
921
922    /// Execute a QAIL command and fetch all rows (FAST VERSION) with explicit result format.
923    pub async fn fetch_all_fast_with_format(
924        &mut self,
925        cmd: &Qail,
926        result_format: ResultFormat,
927    ) -> PgResult<Vec<PgRow>> {
928        use crate::protocol::AstEncoder;
929
930        AstEncoder::encode_cmd_reuse_into_with_result_format(
931            cmd,
932            &mut self.connection.sql_buf,
933            &mut self.connection.params_buf,
934            &mut self.connection.write_buf,
935            result_format.as_wire_code(),
936        )
937        .map_err(|e| PgError::Encode(e.to_string()))?;
938
939        self.connection.flush_write_buf().await?;
940
941        // Collect results using FAST receiver
942        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
943        let mut error: Option<PgError> = None;
944
945        loop {
946            let res = self.connection.recv_with_data_fast().await;
947            match res {
948                Ok((msg_type, data)) => {
949                    match msg_type {
950                        b'D' => {
951                            // DataRow
952                            if error.is_none()
953                                && let Some(columns) = data
954                            {
955                                rows.push(PgRow {
956                                    columns,
957                                    column_info: None, // Skip metadata for speed
958                                });
959                            }
960                        }
961                        b'Z' => {
962                            // ReadyForQuery
963                            if let Some(err) = error {
964                                return Err(err);
965                            }
966                            return Ok(rows);
967                        }
968                        _ => {} // 1, 2, C, T - skip Parse/Bind/CommandComplete/RowDescription
969                    }
970                }
971                Err(e) => {
972                    // recv_with_data_fast returns Err on ErrorResponse automatically.
973                    // We need to capture it and continue draining.
974                    // BUT recv_with_data_fast doesn't return the error *message type* if it fails.
975                    // It returns PgError::Query(msg).
976                    // So we capture the error, but we must continue RECVing until ReadyForQuery.
977                    // However, recv_with_data_fast will KEEP returning Err(Query) if the buffer has E?
978                    // No, recv_with_data_fast consumes the E message before returning Err.
979
980                    if error.is_none() {
981                        error = Some(e);
982                    }
983                    // Continue loop to drain until ReadyForQuery...
984                    // BUT wait, does recv_with_data_fast handle the *rest* of the stream?
985                    // If we call it again, it will read the NEXT message.
986                    // So we just continue.
987                }
988            }
989        }
990    }
991
992    /// Execute a QAIL command and fetch one row.
993    pub async fn fetch_one(&mut self, cmd: &Qail) -> PgResult<PgRow> {
994        let rows = self.fetch_all(cmd).await?;
995        rows.into_iter().next().ok_or(PgError::NoRows)
996    }
997
998    /// Execute a QAIL command with PREPARED STATEMENT CACHING.
999    /// Like fetch_all(), but caches the prepared statement on the server.
1000    /// On first call: sends Parse + Describe + Bind + Execute + Sync
1001    /// On subsequent calls: sends only Bind + Execute + Sync (SKIPS Parse!)
1002    /// Column metadata (RowDescription) is cached alongside the statement
1003    /// so that by-name column access works on every call.
1004    ///
1005    /// Optimized: all wire messages are batched into a single write_all syscall.
1006    pub async fn fetch_all_cached(&mut self, cmd: &Qail) -> PgResult<Vec<PgRow>> {
1007        self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
1008            .await
1009    }
1010
1011    /// Execute a QAIL command with prepared statement caching and explicit result format.
1012    pub async fn fetch_all_cached_with_format(
1013        &mut self,
1014        cmd: &Qail,
1015        result_format: ResultFormat,
1016    ) -> PgResult<Vec<PgRow>> {
1017        let mut retried = false;
1018        loop {
1019            match self
1020                .fetch_all_cached_with_format_once(cmd, result_format)
1021                .await
1022            {
1023                Ok(rows) => return Ok(rows),
1024                Err(err) if !retried && err.is_prepared_statement_retryable() => {
1025                    retried = true;
1026                    self.connection.clear_prepared_statement_state();
1027                }
1028                Err(err) => return Err(err),
1029            }
1030        }
1031    }
1032
1033    async fn fetch_all_cached_with_format_once(
1034        &mut self,
1035        cmd: &Qail,
1036        result_format: ResultFormat,
1037    ) -> PgResult<Vec<PgRow>> {
1038        use crate::protocol::AstEncoder;
1039        use std::collections::hash_map::DefaultHasher;
1040        use std::hash::{Hash, Hasher};
1041
1042        self.connection.sql_buf.clear();
1043        self.connection.params_buf.clear();
1044
1045        // Encode SQL to reusable buffer
1046        match cmd.action {
1047            qail_core::ast::Action::Get | qail_core::ast::Action::With => {
1048                crate::protocol::ast_encoder::dml::encode_select(
1049                    cmd,
1050                    &mut self.connection.sql_buf,
1051                    &mut self.connection.params_buf,
1052                )?;
1053            }
1054            qail_core::ast::Action::Add => {
1055                crate::protocol::ast_encoder::dml::encode_insert(
1056                    cmd,
1057                    &mut self.connection.sql_buf,
1058                    &mut self.connection.params_buf,
1059                )?;
1060            }
1061            qail_core::ast::Action::Set => {
1062                crate::protocol::ast_encoder::dml::encode_update(
1063                    cmd,
1064                    &mut self.connection.sql_buf,
1065                    &mut self.connection.params_buf,
1066                )?;
1067            }
1068            qail_core::ast::Action::Del => {
1069                crate::protocol::ast_encoder::dml::encode_delete(
1070                    cmd,
1071                    &mut self.connection.sql_buf,
1072                    &mut self.connection.params_buf,
1073                )?;
1074            }
1075            _ => {
1076                // Fallback for unsupported actions
1077                let (sql, params) =
1078                    AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
1079                let raw_rows = self
1080                    .connection
1081                    .query_cached_with_result_format(&sql, &params, result_format.as_wire_code())
1082                    .await?;
1083                return Ok(raw_rows
1084                    .into_iter()
1085                    .map(|data| PgRow {
1086                        columns: data,
1087                        column_info: None,
1088                    })
1089                    .collect());
1090            }
1091        }
1092
1093        let mut hasher = DefaultHasher::new();
1094        self.connection.sql_buf.hash(&mut hasher);
1095        let sql_hash = hasher.finish();
1096
1097        let is_cache_miss = !self.connection.stmt_cache.contains(&sql_hash);
1098
1099        // Build ALL wire messages into write_buf (single syscall)
1100        self.connection.write_buf.clear();
1101
1102        let stmt_name = if let Some(name) = self.connection.stmt_cache.get(&sql_hash) {
1103            name
1104        } else {
1105            let name = format!("qail_{:x}", sql_hash);
1106
1107            // Evict LRU before borrowing sql_buf to avoid borrow conflict
1108            self.connection.evict_prepared_if_full();
1109
1110            let sql_str = std::str::from_utf8(&self.connection.sql_buf).unwrap_or("");
1111
1112            // Buffer Parse + Describe(Statement) for first call
1113            use crate::protocol::PgEncoder;
1114            let parse_msg = PgEncoder::encode_parse(&name, sql_str, &[]);
1115            let describe_msg = PgEncoder::encode_describe(false, &name);
1116            self.connection.write_buf.extend_from_slice(&parse_msg);
1117            self.connection.write_buf.extend_from_slice(&describe_msg);
1118
1119            self.connection.stmt_cache.put(sql_hash, name.clone());
1120            self.connection
1121                .prepared_statements
1122                .insert(name.clone(), sql_str.to_string());
1123
1124            name
1125        };
1126
1127        // Append Bind + Execute + Sync to same buffer
1128        use crate::protocol::PgEncoder;
1129        PgEncoder::encode_bind_to_with_result_format(
1130            &mut self.connection.write_buf,
1131            &stmt_name,
1132            &self.connection.params_buf,
1133            result_format.as_wire_code(),
1134        )
1135        .map_err(|e| PgError::Encode(e.to_string()))?;
1136        PgEncoder::encode_execute_to(&mut self.connection.write_buf);
1137        PgEncoder::encode_sync_to(&mut self.connection.write_buf);
1138
1139        // Single write_all syscall for all messages
1140        self.connection.flush_write_buf().await?;
1141
1142        // On cache hit, use the previously cached ColumnInfo
1143        let cached_column_info = self.connection.column_info_cache.get(&sql_hash).cloned();
1144
1145        let mut rows: Vec<PgRow> = Vec::with_capacity(32);
1146        let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
1147        let mut error: Option<PgError> = None;
1148
1149        loop {
1150            let msg = self.connection.recv().await?;
1151            match msg {
1152                crate::protocol::BackendMessage::ParseComplete
1153                | crate::protocol::BackendMessage::BindComplete => {}
1154                crate::protocol::BackendMessage::ParameterDescription(_) => {
1155                    // Sent after Describe(Statement) — ignore
1156                }
1157                crate::protocol::BackendMessage::RowDescription(fields) => {
1158                    // Received after Describe(Statement) on cache miss
1159                    let info = Arc::new(ColumnInfo::from_fields(&fields));
1160                    if is_cache_miss {
1161                        self.connection
1162                            .column_info_cache
1163                            .insert(sql_hash, info.clone());
1164                    }
1165                    column_info = Some(info);
1166                }
1167                crate::protocol::BackendMessage::DataRow(data) => {
1168                    if error.is_none() {
1169                        rows.push(PgRow {
1170                            columns: data,
1171                            column_info: column_info.clone(),
1172                        });
1173                    }
1174                }
1175                crate::protocol::BackendMessage::CommandComplete(_) => {}
1176                crate::protocol::BackendMessage::NoData => {
1177                    // Sent by Describe for statements that return no data (e.g. pure UPDATE without RETURNING)
1178                }
1179                crate::protocol::BackendMessage::ReadyForQuery(_) => {
1180                    if let Some(err) = error {
1181                        return Err(err);
1182                    }
1183                    return Ok(rows);
1184                }
1185                crate::protocol::BackendMessage::ErrorResponse(err) => {
1186                    if error.is_none() {
1187                        let query_err = PgError::QueryServer(err.into());
1188                        if query_err.is_prepared_statement_retryable() {
1189                            self.connection.clear_prepared_statement_state();
1190                        }
1191                        error = Some(query_err);
1192                    }
1193                }
1194                _ => {}
1195            }
1196        }
1197    }
1198
1199    /// Execute a QAIL command (for mutations) - ZERO-ALLOC.
1200    pub async fn execute(&mut self, cmd: &Qail) -> PgResult<u64> {
1201        use crate::protocol::AstEncoder;
1202
1203        let wire_bytes = AstEncoder::encode_cmd_reuse(
1204            cmd,
1205            &mut self.connection.sql_buf,
1206            &mut self.connection.params_buf,
1207        )
1208        .map_err(|e| PgError::Encode(e.to_string()))?;
1209
1210        self.connection.send_bytes(&wire_bytes).await?;
1211
1212        let mut affected = 0u64;
1213        let mut error: Option<PgError> = None;
1214
1215        loop {
1216            let msg = self.connection.recv().await?;
1217            match msg {
1218                crate::protocol::BackendMessage::ParseComplete
1219                | crate::protocol::BackendMessage::BindComplete => {}
1220                crate::protocol::BackendMessage::RowDescription(_) => {}
1221                crate::protocol::BackendMessage::DataRow(_) => {}
1222                crate::protocol::BackendMessage::CommandComplete(tag) => {
1223                    if error.is_none()
1224                        && let Some(n) = tag.split_whitespace().last()
1225                    {
1226                        affected = n.parse().unwrap_or(0);
1227                    }
1228                }
1229                crate::protocol::BackendMessage::ReadyForQuery(_) => {
1230                    if let Some(err) = error {
1231                        return Err(err);
1232                    }
1233                    return Ok(affected);
1234                }
1235                crate::protocol::BackendMessage::ErrorResponse(err) => {
1236                    if error.is_none() {
1237                        error = Some(PgError::QueryServer(err.into()));
1238                    }
1239                }
1240                _ => {}
1241            }
1242        }
1243    }
1244
1245    /// Query a QAIL command and return rows (for SELECT/GET queries).
1246    /// Like `execute()` but collects RowDescription + DataRow messages
1247    /// instead of discarding them.
1248    pub async fn query_ast(&mut self, cmd: &Qail) -> PgResult<QueryResult> {
1249        self.query_ast_with_format(cmd, ResultFormat::Text).await
1250    }
1251
1252    /// Query a QAIL command and return rows using an explicit result format.
1253    pub async fn query_ast_with_format(
1254        &mut self,
1255        cmd: &Qail,
1256        result_format: ResultFormat,
1257    ) -> PgResult<QueryResult> {
1258        use crate::protocol::AstEncoder;
1259
1260        let wire_bytes = AstEncoder::encode_cmd_reuse_with_result_format(
1261            cmd,
1262            &mut self.connection.sql_buf,
1263            &mut self.connection.params_buf,
1264            result_format.as_wire_code(),
1265        )
1266        .map_err(|e| PgError::Encode(e.to_string()))?;
1267
1268        self.connection.send_bytes(&wire_bytes).await?;
1269
1270        let mut columns: Vec<String> = Vec::new();
1271        let mut rows: Vec<Vec<Option<String>>> = Vec::new();
1272        let mut error: Option<PgError> = None;
1273
1274        loop {
1275            let msg = self.connection.recv().await?;
1276            match msg {
1277                crate::protocol::BackendMessage::ParseComplete
1278                | crate::protocol::BackendMessage::BindComplete => {}
1279                crate::protocol::BackendMessage::RowDescription(fields) => {
1280                    columns = fields.into_iter().map(|f| f.name).collect();
1281                }
1282                crate::protocol::BackendMessage::DataRow(data) => {
1283                    if error.is_none() {
1284                        let row: Vec<Option<String>> = data
1285                            .into_iter()
1286                            .map(|col| col.map(|bytes| String::from_utf8_lossy(&bytes).to_string()))
1287                            .collect();
1288                        rows.push(row);
1289                    }
1290                }
1291                crate::protocol::BackendMessage::CommandComplete(_) => {}
1292                crate::protocol::BackendMessage::NoData => {}
1293                crate::protocol::BackendMessage::ReadyForQuery(_) => {
1294                    if let Some(err) = error {
1295                        return Err(err);
1296                    }
1297                    return Ok(QueryResult { columns, rows });
1298                }
1299                crate::protocol::BackendMessage::ErrorResponse(err) => {
1300                    if error.is_none() {
1301                        error = Some(PgError::QueryServer(err.into()));
1302                    }
1303                }
1304                _ => {}
1305            }
1306        }
1307    }
1308
1309    // ==================== TRANSACTION CONTROL ====================
1310
1311    /// Begin a transaction (AST-native).
1312    pub async fn begin(&mut self) -> PgResult<()> {
1313        self.connection.begin_transaction().await
1314    }
1315
1316    /// Commit the current transaction (AST-native).
1317    pub async fn commit(&mut self) -> PgResult<()> {
1318        self.connection.commit().await
1319    }
1320
1321    /// Rollback the current transaction (AST-native).
1322    pub async fn rollback(&mut self) -> PgResult<()> {
1323        self.connection.rollback().await
1324    }
1325
1326    /// Create a named savepoint within the current transaction.
1327    /// Savepoints allow partial rollback within a transaction.
1328    /// Use `rollback_to()` to return to this savepoint.
1329    /// # Example
1330    /// ```ignore
1331    /// driver.begin().await?;
1332    /// driver.execute(&insert1).await?;
1333    /// driver.savepoint("sp1").await?;
1334    /// driver.execute(&insert2).await?;
1335    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
1336    /// driver.commit().await?;
1337    /// ```
1338    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
1339        self.connection.savepoint(name).await
1340    }
1341
1342    /// Rollback to a previously created savepoint.
1343    /// Discards all changes since the named savepoint was created,
1344    /// but keeps the transaction open.
1345    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
1346        self.connection.rollback_to(name).await
1347    }
1348
1349    /// Release a savepoint (free resources, if no longer needed).
1350    /// After release, the savepoint cannot be rolled back to.
1351    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
1352        self.connection.release_savepoint(name).await
1353    }
1354
1355    // ==================== BATCH TRANSACTIONS ====================
1356
1357    /// Execute multiple commands in a single atomic transaction.
1358    /// All commands succeed or all are rolled back.
1359    /// # Example
1360    /// ```ignore
1361    /// let cmds = vec![
1362    ///     Qail::add("users").columns(["name"]).values(["Alice"]),
1363    ///     Qail::add("users").columns(["name"]).values(["Bob"]),
1364    /// ];
1365    /// let results = driver.execute_batch(&cmds).await?;
1366    /// // results = [1, 1] (rows affected)
1367    /// ```
1368    pub async fn execute_batch(&mut self, cmds: &[Qail]) -> PgResult<Vec<u64>> {
1369        self.begin().await?;
1370        let mut results = Vec::with_capacity(cmds.len());
1371        for cmd in cmds {
1372            match self.execute(cmd).await {
1373                Ok(n) => results.push(n),
1374                Err(e) => {
1375                    self.rollback().await?;
1376                    return Err(e);
1377                }
1378            }
1379        }
1380        self.commit().await?;
1381        Ok(results)
1382    }
1383
1384    // ==================== STATEMENT TIMEOUT ====================
1385
1386    /// Set statement timeout for this connection (in milliseconds).
1387    /// # Example
1388    /// ```ignore
1389    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
1390    /// ```
1391    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
1392        self.execute_raw(&format!("SET statement_timeout = {}", ms))
1393            .await
1394    }
1395
1396    /// Reset statement timeout to default (no limit).
1397    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
1398        self.execute_raw("RESET statement_timeout").await
1399    }
1400
1401    // ==================== RLS (MULTI-TENANT) ====================
1402
1403    /// Set the RLS context for multi-tenant data isolation.
1404    ///
1405    /// Configures PostgreSQL session variables (`app.current_operator_id`, etc.)
1406    /// so that RLS policies automatically filter data by tenant.
1407    ///
1408    /// Since `PgDriver` takes `&mut self`, the borrow checker guarantees
1409    /// that `set_config` and all subsequent queries execute on the **same
1410    /// connection** — no pool race conditions possible.
1411    ///
1412    /// # Example
1413    /// ```ignore
1414    /// driver.set_rls_context(RlsContext::operator("op-123")).await?;
1415    /// let orders = driver.fetch_all(&Qail::get("orders")).await?;
1416    /// // orders only contains rows where operator_id = 'op-123'
1417    /// ```
1418    pub async fn set_rls_context(&mut self, ctx: rls::RlsContext) -> PgResult<()> {
1419        let sql = rls::context_to_sql(&ctx);
1420        self.execute_raw(&sql).await?;
1421        self.rls_context = Some(ctx);
1422        Ok(())
1423    }
1424
1425    /// Clear the RLS context, resetting session variables to safe defaults.
1426    ///
1427    /// After clearing, all RLS-protected queries will return zero rows
1428    /// (empty operator_id matches nothing).
1429    pub async fn clear_rls_context(&mut self) -> PgResult<()> {
1430        self.execute_raw(rls::reset_sql()).await?;
1431        self.rls_context = None;
1432        Ok(())
1433    }
1434
1435    /// Get the current RLS context, if any.
1436    pub fn rls_context(&self) -> Option<&rls::RlsContext> {
1437        self.rls_context.as_ref()
1438    }
1439
1440    // ==================== PIPELINE (BATCH) ====================
1441
1442    /// Execute multiple Qail ASTs in a single network round-trip (PIPELINING).
1443    /// # Example
1444    /// ```ignore
1445    /// let cmds: Vec<Qail> = (1..=1000)
1446    ///     .map(|i| Qail::get("harbors").columns(["id", "name"]).limit(i))
1447    ///     .collect();
1448    /// let count = driver.pipeline_batch(&cmds).await?;
1449    /// assert_eq!(count, 1000);
1450    /// ```
1451    pub async fn pipeline_batch(&mut self, cmds: &[Qail]) -> PgResult<usize> {
1452        self.connection.pipeline_ast_fast(cmds).await
1453    }
1454
1455    /// Execute multiple Qail ASTs and return full row data.
1456    pub async fn pipeline_fetch(&mut self, cmds: &[Qail]) -> PgResult<Vec<Vec<PgRow>>> {
1457        let raw_results = self.connection.pipeline_ast(cmds).await?;
1458
1459        let results: Vec<Vec<PgRow>> = raw_results
1460            .into_iter()
1461            .map(|rows| {
1462                rows.into_iter()
1463                    .map(|columns| PgRow {
1464                        columns,
1465                        column_info: None,
1466                    })
1467                    .collect()
1468            })
1469            .collect();
1470
1471        Ok(results)
1472    }
1473
1474    /// Prepare a SQL statement for repeated execution.
1475    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
1476        self.connection.prepare(sql).await
1477    }
1478
1479    /// Execute a prepared statement pipeline in FAST mode (count only).
1480    pub async fn pipeline_prepared_fast(
1481        &mut self,
1482        stmt: &PreparedStatement,
1483        params_batch: &[Vec<Option<Vec<u8>>>],
1484    ) -> PgResult<usize> {
1485        self.connection
1486            .pipeline_prepared_fast(stmt, params_batch)
1487            .await
1488    }
1489
1490    // ==================== LEGACY/BOOTSTRAP ====================
1491
1492    /// Execute a raw SQL string.
1493    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
1494    /// Use for bootstrap DDL only (e.g., migration table creation).
1495    /// For transactions, use `begin()`, `commit()`, `rollback()`.
1496    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
1497        // Reject literal NULL bytes - they corrupt PostgreSQL connection state
1498        if sql.as_bytes().contains(&0) {
1499            return Err(crate::PgError::Protocol(
1500                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
1501            ));
1502        }
1503        self.connection.execute_simple(sql).await
1504    }
1505
1506    /// Execute a raw SQL query and return rows.
1507    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
1508    /// Use for bootstrap/admin queries only.
1509    pub async fn fetch_raw(&mut self, sql: &str) -> PgResult<Vec<PgRow>> {
1510        if sql.as_bytes().contains(&0) {
1511            return Err(crate::PgError::Protocol(
1512                "SQL contains NULL byte (0x00) which is invalid in PostgreSQL".to_string(),
1513            ));
1514        }
1515
1516        use crate::protocol::PgEncoder;
1517        use tokio::io::AsyncWriteExt;
1518
1519        // Use simple query protocol (no prepared statements)
1520        let msg = PgEncoder::encode_query_string(sql);
1521        self.connection.stream.write_all(&msg).await?;
1522
1523        let mut rows: Vec<PgRow> = Vec::new();
1524        let mut column_info: Option<std::sync::Arc<ColumnInfo>> = None;
1525
1526        let mut error: Option<PgError> = None;
1527
1528        loop {
1529            let msg = self.connection.recv().await?;
1530            match msg {
1531                crate::protocol::BackendMessage::RowDescription(fields) => {
1532                    column_info = Some(std::sync::Arc::new(ColumnInfo::from_fields(&fields)));
1533                }
1534                crate::protocol::BackendMessage::DataRow(data) => {
1535                    if error.is_none() {
1536                        rows.push(PgRow {
1537                            columns: data,
1538                            column_info: column_info.clone(),
1539                        });
1540                    }
1541                }
1542                crate::protocol::BackendMessage::CommandComplete(_) => {}
1543                crate::protocol::BackendMessage::ReadyForQuery(_) => {
1544                    if let Some(err) = error {
1545                        return Err(err);
1546                    }
1547                    return Ok(rows);
1548                }
1549                crate::protocol::BackendMessage::ErrorResponse(err) => {
1550                    if error.is_none() {
1551                        error = Some(PgError::QueryServer(err.into()));
1552                    }
1553                }
1554                _ => {}
1555            }
1556        }
1557    }
1558
1559    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
1560    /// Uses a Qail::Add to get validated table and column names from the AST,
1561    /// not user-provided strings. This is the sound, AST-native approach.
1562    /// # Example
1563    /// ```ignore
1564    /// // Create a Qail::Add to define table and columns
1565    /// let cmd = Qail::add("users")
1566    ///     .columns(["id", "name", "email"]);
1567    /// // Bulk insert rows
1568    /// let rows: Vec<Vec<Value>> = vec![
1569    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
1570    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
1571    /// ];
1572    /// driver.copy_bulk(&cmd, &rows).await?;
1573    /// ```
1574    pub async fn copy_bulk(
1575        &mut self,
1576        cmd: &Qail,
1577        rows: &[Vec<qail_core::ast::Value>],
1578    ) -> PgResult<u64> {
1579        use qail_core::ast::Action;
1580
1581        if cmd.action != Action::Add {
1582            return Err(PgError::Query(
1583                "copy_bulk requires Qail::Add action".to_string(),
1584            ));
1585        }
1586
1587        let table = &cmd.table;
1588
1589        let columns: Vec<String> = cmd
1590            .columns
1591            .iter()
1592            .filter_map(|expr| {
1593                use qail_core::ast::Expr;
1594                match expr {
1595                    Expr::Named(name) => Some(name.clone()),
1596                    Expr::Aliased { name, .. } => Some(name.clone()),
1597                    Expr::Star => None, // Can't COPY with *
1598                    _ => None,
1599                }
1600            })
1601            .collect();
1602
1603        if columns.is_empty() {
1604            return Err(PgError::Query(
1605                "copy_bulk requires columns in Qail".to_string(),
1606            ));
1607        }
1608
1609        // Use optimized COPY path: direct Value → bytes encoding, single syscall
1610        self.connection.copy_in_fast(table, &columns, rows).await
1611    }
1612
1613    /// **Fastest** bulk insert using pre-encoded COPY data.
1614    /// Accepts raw COPY text format bytes. Use when caller has already
1615    /// encoded rows to avoid any encoding overhead.
1616    /// # Format
1617    /// Data should be tab-separated rows with newlines (COPY text format):
1618    /// `1\thello\t3.14\n2\tworld\t2.71\n`
1619    /// # Example
1620    /// ```ignore
1621    /// let cmd = Qail::add("users").columns(["id", "name"]);
1622    /// let data = b"1\tAlice\n2\tBob\n";
1623    /// driver.copy_bulk_bytes(&cmd, data).await?;
1624    /// ```
1625    pub async fn copy_bulk_bytes(&mut self, cmd: &Qail, data: &[u8]) -> PgResult<u64> {
1626        use qail_core::ast::Action;
1627
1628        if cmd.action != Action::Add {
1629            return Err(PgError::Query(
1630                "copy_bulk_bytes requires Qail::Add action".to_string(),
1631            ));
1632        }
1633
1634        let table = &cmd.table;
1635        let columns: Vec<String> = cmd
1636            .columns
1637            .iter()
1638            .filter_map(|expr| {
1639                use qail_core::ast::Expr;
1640                match expr {
1641                    Expr::Named(name) => Some(name.clone()),
1642                    Expr::Aliased { name, .. } => Some(name.clone()),
1643                    _ => None,
1644                }
1645            })
1646            .collect();
1647
1648        if columns.is_empty() {
1649            return Err(PgError::Query(
1650                "copy_bulk_bytes requires columns in Qail".to_string(),
1651            ));
1652        }
1653
1654        // Direct to raw COPY - zero encoding!
1655        self.connection.copy_in_raw(table, &columns, data).await
1656    }
1657
1658    /// Export table data using PostgreSQL COPY TO STDOUT (zero-copy streaming).
1659    /// Returns rows as tab-separated bytes for direct re-import via copy_bulk_bytes.
1660    /// # Example
1661    /// ```ignore
1662    /// let data = driver.copy_export_table("users", &["id", "name"]).await?;
1663    /// shadow_driver.copy_bulk_bytes(&cmd, &data).await?;
1664    /// ```
1665    pub async fn copy_export_table(
1666        &mut self,
1667        table: &str,
1668        columns: &[String],
1669    ) -> PgResult<Vec<u8>> {
1670        let cols = columns.join(", ");
1671        let sql = format!("COPY {} ({}) TO STDOUT", table, cols);
1672
1673        self.connection.copy_out_raw(&sql).await
1674    }
1675
1676    /// Stream large result sets using PostgreSQL cursors.
1677    /// This method uses DECLARE CURSOR internally to stream rows in batches,
1678    /// avoiding loading the entire result set into memory.
1679    /// # Example
1680    /// ```ignore
1681    /// let cmd = Qail::get("large_table");
1682    /// let batches = driver.stream_cmd(&cmd, 100).await?;
1683    /// for batch in batches {
1684    ///     for row in batch {
1685    ///         // process row
1686    ///     }
1687    /// }
1688    /// ```
1689    pub async fn stream_cmd(&mut self, cmd: &Qail, batch_size: usize) -> PgResult<Vec<Vec<PgRow>>> {
1690        use std::sync::atomic::{AtomicU64, Ordering};
1691        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
1692
1693        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
1694
1695        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
1696        use crate::protocol::AstEncoder;
1697        let mut sql_buf = bytes::BytesMut::with_capacity(256);
1698        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
1699        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params)
1700            .map_err(|e| PgError::Encode(e.to_string()))?;
1701        let sql = String::from_utf8_lossy(&sql_buf).to_string();
1702
1703        // Must be in a transaction for cursors
1704        self.connection.begin_transaction().await?;
1705
1706        // Declare cursor
1707        // Declare cursor with bind params — Extended Query Protocol handles $1, $2 etc.
1708        self.connection
1709            .declare_cursor(&cursor_name, &sql, &params)
1710            .await?;
1711
1712        // Fetch all batches
1713        let mut all_batches = Vec::new();
1714        while let Some(rows) = self
1715            .connection
1716            .fetch_cursor(&cursor_name, batch_size)
1717            .await?
1718        {
1719            let pg_rows: Vec<PgRow> = rows
1720                .into_iter()
1721                .map(|cols| PgRow {
1722                    columns: cols,
1723                    column_info: None,
1724                })
1725                .collect();
1726            all_batches.push(pg_rows);
1727        }
1728
1729        self.connection.close_cursor(&cursor_name).await?;
1730        self.connection.commit().await?;
1731
1732        Ok(all_batches)
1733    }
1734}
1735
1736// ============================================================================
1737// Connection Builder
1738// ============================================================================
1739
1740/// Builder for creating PgDriver connections with named parameters.
1741/// # Example
1742/// ```ignore
1743/// let driver = PgDriver::builder()
1744///     .host("localhost")
1745///     .port(5432)
1746///     .user("admin")
1747///     .database("mydb")
1748///     .password("secret")
1749///     .connect()
1750///     .await?;
1751/// ```
1752#[derive(Default)]
1753pub struct PgDriverBuilder {
1754    host: Option<String>,
1755    port: Option<u16>,
1756    user: Option<String>,
1757    database: Option<String>,
1758    password: Option<String>,
1759    timeout: Option<std::time::Duration>,
1760    connect_options: ConnectOptions,
1761}
1762
1763impl PgDriverBuilder {
1764    /// Create a new builder with default values.
1765    pub fn new() -> Self {
1766        Self::default()
1767    }
1768
1769    /// Set the host (default: "127.0.0.1").
1770    pub fn host(mut self, host: impl Into<String>) -> Self {
1771        self.host = Some(host.into());
1772        self
1773    }
1774
1775    /// Set the port (default: 5432).
1776    pub fn port(mut self, port: u16) -> Self {
1777        self.port = Some(port);
1778        self
1779    }
1780
1781    /// Set the username (required).
1782    pub fn user(mut self, user: impl Into<String>) -> Self {
1783        self.user = Some(user.into());
1784        self
1785    }
1786
1787    /// Set the database name (required).
1788    pub fn database(mut self, database: impl Into<String>) -> Self {
1789        self.database = Some(database.into());
1790        self
1791    }
1792
1793    /// Set the password (optional, for cleartext/MD5/SCRAM-SHA-256 auth).
1794    pub fn password(mut self, password: impl Into<String>) -> Self {
1795        self.password = Some(password.into());
1796        self
1797    }
1798
1799    /// Set connection timeout (optional).
1800    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
1801        self.timeout = Some(timeout);
1802        self
1803    }
1804
1805    /// Set TLS policy (`disable`, `prefer`, `require`).
1806    pub fn tls_mode(mut self, mode: TlsMode) -> Self {
1807        self.connect_options.tls_mode = mode;
1808        self
1809    }
1810
1811    /// Set GSSAPI session encryption mode (`disable`, `prefer`, `require`).
1812    pub fn gss_enc_mode(mut self, mode: GssEncMode) -> Self {
1813        self.connect_options.gss_enc_mode = mode;
1814        self
1815    }
1816
1817    /// Set custom CA bundle PEM for TLS validation.
1818    pub fn tls_ca_cert_pem(mut self, ca_pem: Vec<u8>) -> Self {
1819        self.connect_options.tls_ca_cert_pem = Some(ca_pem);
1820        self
1821    }
1822
1823    /// Enable mTLS using client certificate/key config.
1824    pub fn mtls(mut self, config: TlsConfig) -> Self {
1825        self.connect_options.mtls = Some(config);
1826        self.connect_options.tls_mode = TlsMode::Require;
1827        self
1828    }
1829
1830    /// Override password-auth policy.
1831    pub fn auth_settings(mut self, settings: AuthSettings) -> Self {
1832        self.connect_options.auth = settings;
1833        self
1834    }
1835
1836    /// Set SCRAM channel-binding mode.
1837    pub fn channel_binding_mode(mut self, mode: ScramChannelBindingMode) -> Self {
1838        self.connect_options.auth.channel_binding = mode;
1839        self
1840    }
1841
1842    /// Set Kerberos/GSS/SSPI token provider callback.
1843    pub fn gss_token_provider(mut self, provider: GssTokenProvider) -> Self {
1844        self.connect_options.gss_token_provider = Some(provider);
1845        self
1846    }
1847
1848    /// Set a stateful Kerberos/GSS/SSPI token provider.
1849    pub fn gss_token_provider_ex(mut self, provider: GssTokenProviderEx) -> Self {
1850        self.connect_options.gss_token_provider_ex = Some(provider);
1851        self
1852    }
1853
1854    /// Connect to PostgreSQL using the configured parameters.
1855    pub async fn connect(self) -> PgResult<PgDriver> {
1856        let host = self.host.unwrap_or_else(|| "127.0.0.1".to_string());
1857        let port = self.port.unwrap_or(5432);
1858        let user = self
1859            .user
1860            .ok_or_else(|| PgError::Connection("User is required".to_string()))?;
1861        let database = self
1862            .database
1863            .ok_or_else(|| PgError::Connection("Database is required".to_string()))?;
1864
1865        let password = self.password;
1866        let options = self.connect_options;
1867
1868        if let Some(timeout) = self.timeout {
1869            let options = options.clone();
1870            tokio::time::timeout(
1871                timeout,
1872                PgDriver::connect_with_options(
1873                    &host,
1874                    port,
1875                    &user,
1876                    &database,
1877                    password.as_deref(),
1878                    options,
1879                ),
1880            )
1881            .await
1882            .map_err(|_| PgError::Timeout(format!("connection after {:?}", timeout)))?
1883        } else {
1884            PgDriver::connect_with_options(
1885                &host,
1886                port,
1887                &user,
1888                &database,
1889                password.as_deref(),
1890                options,
1891            )
1892            .await
1893        }
1894    }
1895}
1896
#[cfg(test)]
mod tests {
    use super::{
        AuthSettings, EnterpriseAuthMechanism, PgDriver, PgError, PgServerError,
        ScramChannelBindingMode, TlsMode,
    };

    /// Build a `PgError::QueryServer` with severity "ERROR" and the given
    /// SQLSTATE code and message; detail and hint are left empty.
    fn make_server_error(sqlstate: &str, text: &str) -> PgError {
        let inner = PgServerError {
            severity: String::from("ERROR"),
            code: String::from(sqlstate),
            message: String::from(text),
            detail: None,
            hint: None,
        };
        PgError::QueryServer(inner)
    }

    // ══════════════════════════════════════════════════════════════════
    // is_prepared_statement_retryable
    // ══════════════════════════════════════════════════════════════════

    #[test]
    fn prepared_statement_missing_is_retryable() {
        assert!(
            make_server_error("26000", "prepared statement \"s1\" does not exist")
                .is_prepared_statement_retryable()
        );
    }

    #[test]
    fn cached_plan_replanned_is_retryable() {
        assert!(
            make_server_error("0A000", "cached plan must be replanned")
                .is_prepared_statement_retryable()
        );
    }

    #[test]
    fn unrelated_server_error_is_not_retryable() {
        assert!(
            !make_server_error("23505", "duplicate key value violates unique constraint")
                .is_prepared_statement_retryable()
        );
    }

    // ══════════════════════════════════════════════════════════════════
    // is_transient_server_error
    // ══════════════════════════════════════════════════════════════════

    #[test]
    fn serialization_failure_is_transient() {
        assert!(make_server_error("40001", "could not serialize access").is_transient_server_error());
    }

    #[test]
    fn deadlock_detected_is_transient() {
        assert!(make_server_error("40P01", "deadlock detected").is_transient_server_error());
    }

    #[test]
    fn cannot_connect_now_is_transient() {
        assert!(
            make_server_error("57P03", "the database system is starting up")
                .is_transient_server_error()
        );
    }

    #[test]
    fn admin_shutdown_is_transient() {
        assert!(
            make_server_error(
                "57P01",
                "terminating connection due to administrator command",
            )
            .is_transient_server_error()
        );
    }

    #[test]
    fn connection_exception_class_is_transient() {
        assert!(make_server_error("08006", "connection failure").is_transient_server_error());
    }

    #[test]
    fn connection_does_not_exist_is_transient() {
        assert!(make_server_error("08003", "connection does not exist").is_transient_server_error());
    }

    #[test]
    fn unique_violation_is_not_transient() {
        assert!(
            !make_server_error("23505", "duplicate key value violates unique constraint")
                .is_transient_server_error()
        );
    }

    #[test]
    fn syntax_error_is_not_transient() {
        assert!(
            !make_server_error("42601", "syntax error at or near \"SELECT\"")
                .is_transient_server_error()
        );
    }

    #[test]
    fn timeout_error_is_transient() {
        assert!(PgError::Timeout("query after 30s".to_string()).is_transient_server_error());
    }

    #[test]
    fn io_connection_reset_is_transient() {
        let io_err = std::io::Error::new(
            std::io::ErrorKind::ConnectionReset,
            "connection reset by peer",
        );
        assert!(PgError::Io(io_err).is_transient_server_error());
    }

    #[test]
    fn io_permission_denied_is_not_transient() {
        let io_err = std::io::Error::new(
            std::io::ErrorKind::PermissionDenied,
            "permission denied",
        );
        assert!(!PgError::Io(io_err).is_transient_server_error());
    }

    #[test]
    fn connection_error_is_transient() {
        assert!(PgError::Connection("host not found".to_string()).is_transient_server_error());
    }

    #[test]
    fn prepared_stmt_retryable_counts_as_transient() {
        assert!(
            make_server_error("26000", "prepared statement \"s1\" does not exist")
                .is_transient_server_error()
        );
    }

    // ══════════════════════════════════════════════════════════════════
    // TlsMode parse_sslmode (Phase 1b)
    // ══════════════════════════════════════════════════════════════════

    #[test]
    fn tls_mode_parse_disable() {
        assert_eq!(TlsMode::parse_sslmode("disable"), Some(TlsMode::Disable));
    }

    #[test]
    fn tls_mode_parse_prefer_variants() {
        assert_eq!(TlsMode::parse_sslmode("prefer"), Some(TlsMode::Prefer));
        assert_eq!(
            TlsMode::parse_sslmode("allow"),
            Some(TlsMode::Prefer),
            "libpq 'allow' maps to Prefer"
        );
    }

    #[test]
    fn tls_mode_parse_require_variants() {
        // verify-ca and verify-full both collapse to Require: they demand TLS,
        // while the certificate checks themselves happen in the rustls layer.
        assert_eq!(TlsMode::parse_sslmode("require"), Some(TlsMode::Require));
        assert_eq!(
            TlsMode::parse_sslmode("verify-ca"),
            Some(TlsMode::Require),
            "verify-ca → Require (CA validation at TLS layer)"
        );
        assert_eq!(
            TlsMode::parse_sslmode("verify-full"),
            Some(TlsMode::Require),
            "verify-full → Require (hostname validation at TLS layer)"
        );
    }

    #[test]
    fn tls_mode_parse_case_insensitive() {
        for raw in ["REQUIRE", "Verify-Full"] {
            assert_eq!(TlsMode::parse_sslmode(raw), Some(TlsMode::Require));
        }
    }

    #[test]
    fn tls_mode_parse_unknown_returns_none() {
        for raw in ["invalid", ""] {
            assert_eq!(TlsMode::parse_sslmode(raw), None);
        }
    }

    #[test]
    fn tls_mode_parse_trims_whitespace() {
        assert_eq!(TlsMode::parse_sslmode("  require  "), Some(TlsMode::Require));
    }

    #[test]
    fn tls_mode_default_is_disable() {
        assert_eq!(TlsMode::default(), TlsMode::Disable);
    }

    // ══════════════════════════════════════════════════════════════════
    // AuthSettings behavior matrix (Phase 1c)
    // ══════════════════════════════════════════════════════════════════

    #[test]
    fn auth_default_allows_all_password_methods() {
        let settings = AuthSettings::default();
        assert!(settings.allow_cleartext_password);
        assert!(settings.allow_md5_password);
        assert!(settings.allow_scram_sha_256);
        assert!(settings.has_any_password_method());
    }

    #[test]
    fn auth_default_disables_enterprise_methods() {
        let settings = AuthSettings::default();
        assert!(
            !settings.allow_kerberos_v5,
            "Kerberos V5 should be disabled by default"
        );
        assert!(!settings.allow_gssapi, "GSSAPI should be disabled by default");
        assert!(!settings.allow_sspi, "SSPI should be disabled by default");
    }

    #[test]
    fn auth_scram_only_restricts_to_scram() {
        let settings = AuthSettings::scram_only();
        // SCRAM is the single permitted password method
        assert!(settings.allow_scram_sha_256);
        assert!(!settings.allow_cleartext_password);
        assert!(!settings.allow_md5_password);
        // Enterprise mechanisms stay off
        assert!(!settings.allow_kerberos_v5);
        assert!(!settings.allow_gssapi);
        assert!(!settings.allow_sspi);
        // A password method is still available
        assert!(settings.has_any_password_method());
    }

    #[test]
    fn auth_gssapi_only_disables_all_passwords() {
        let settings = AuthSettings::gssapi_only();
        // Every password method is off
        assert!(!settings.allow_cleartext_password);
        assert!(!settings.allow_md5_password);
        assert!(!settings.allow_scram_sha_256);
        assert!(!settings.has_any_password_method());
        // Every enterprise mechanism is on
        assert!(settings.allow_kerberos_v5);
        assert!(settings.allow_gssapi);
        assert!(settings.allow_sspi);
    }

    #[test]
    fn auth_has_any_password_when_only_cleartext() {
        let cleartext_only = AuthSettings {
            allow_cleartext_password: true,
            allow_md5_password: false,
            allow_scram_sha_256: false,
            ..AuthSettings::default()
        };
        assert!(cleartext_only.has_any_password_method());
    }

    #[test]
    fn auth_no_password_method_when_all_disabled() {
        let no_passwords = AuthSettings {
            allow_cleartext_password: false,
            allow_md5_password: false,
            allow_scram_sha_256: false,
            ..AuthSettings::default()
        };
        assert!(!no_passwords.has_any_password_method());
    }

    #[test]
    fn auth_enterprise_mechanisms_are_distinct() {
        // Each pair of enterprise mechanisms must compare unequal
        assert_ne!(
            EnterpriseAuthMechanism::KerberosV5,
            EnterpriseAuthMechanism::GssApi
        );
        assert_ne!(
            EnterpriseAuthMechanism::GssApi,
            EnterpriseAuthMechanism::Sspi
        );
        assert_ne!(
            EnterpriseAuthMechanism::KerberosV5,
            EnterpriseAuthMechanism::Sspi
        );
    }

    #[test]
    fn auth_channel_binding_default_is_prefer() {
        let settings = AuthSettings::default();
        assert_eq!(settings.channel_binding, ScramChannelBindingMode::Prefer);
    }

    // ══════════════════════════════════════════════════════════════════
    // parse_database_url — query-string stripping
    // ══════════════════════════════════════════════════════════════════

    #[test]
    fn parse_database_url_basic() {
        let url = "postgresql://admin:secret@localhost:5432/mydb";
        let (host, port, user, database, password) = PgDriver::parse_database_url(url).unwrap();
        assert_eq!(host, "localhost");
        assert_eq!(port, 5432);
        assert_eq!(user, "admin");
        assert_eq!(database, "mydb");
        assert_eq!(password, Some("secret".to_string()));
    }

    #[test]
    fn parse_database_url_strips_query_params() {
        let url = "postgresql://user:pass@host:5432/mydb?sslmode=require&auth_mode=scram_only";
        let (_, _, _, database, _) = PgDriver::parse_database_url(url).unwrap();
        assert_eq!(
            database, "mydb",
            "query params must not leak into database name"
        );
    }

    #[test]
    fn parse_database_url_strips_single_query_param() {
        let url = "postgres://u:p@h/testdb?gss_provider=linux_krb5";
        let (_, _, _, database, _) = PgDriver::parse_database_url(url).unwrap();
        assert_eq!(database, "testdb");
    }

    #[test]
    fn parse_database_url_no_query_still_works() {
        let url = "postgresql://user@host:5432/cleandb";
        let (_, _, _, database, _) = PgDriver::parse_database_url(url).unwrap();
        assert_eq!(database, "cleandb");
    }
}