qail_pg/driver/
mod.rs

1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! This module contains the async runtime-specific code.
4//! Auto-detects the best I/O backend:
5//! - Linux 5.1+: io_uring (fastest)
6//! - Linux < 5.1 / macOS / Windows: tokio
7//!
8//! Connection methods are split across modules for easier maintenance:
9//! - `connection.rs` - Core struct and connect methods
10//! - `io.rs` - send, recv, recv_msg_type_fast
11//! - `query.rs` - query, query_cached, execute_simple
12//! - `transaction.rs` - begin_transaction, commit, rollback
13//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor  
14//! - `copy.rs` - COPY protocol for bulk operations
15//! - `pipeline.rs` - High-performance pipelining (275k q/s)
16//! - `cancel.rs` - Query cancellation
17//! - `io_backend.rs` - Runtime I/O backend detection
18
19mod cancel;
20mod connection;
21mod copy;
22mod cursor;
23mod io;
24pub mod io_backend;
25mod pipeline;
26mod pool;
27mod prepared;
28mod query;
29mod row;
30mod stream;
31mod transaction;
32
33pub use connection::PgConnection;
34pub use connection::TlsConfig;
35pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
36pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
37pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
38pub use prepared::PreparedStatement;
39
40use qail_core::ast::QailCmd;
41use std::collections::HashMap;
42use std::sync::Arc;
43
44/// Column metadata from RowDescription (shared across rows via Arc).
45#[derive(Debug, Clone)]
46pub struct ColumnInfo {
47    /// Column name -> index mapping
48    pub name_to_index: HashMap<String, usize>,
49    /// Column OIDs
50    pub oids: Vec<u32>,
51    /// Column format codes (0=text, 1=binary)
52    pub formats: Vec<i16>,
53}
54
55impl ColumnInfo {
56    /// Create from FieldDescriptions.
57    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
58        let mut name_to_index = HashMap::with_capacity(fields.len());
59        let mut oids = Vec::with_capacity(fields.len());
60        let mut formats = Vec::with_capacity(fields.len());
61
62        for (i, field) in fields.iter().enumerate() {
63            name_to_index.insert(field.name.clone(), i);
64            oids.push(field.type_oid);
65            formats.push(field.format);
66        }
67
68        Self {
69            name_to_index,
70            oids,
71            formats,
72        }
73    }
74}
75
76/// PostgreSQL row with column data and metadata.
77pub struct PgRow {
78    /// Column values (None = NULL)
79    pub columns: Vec<Option<Vec<u8>>>,
80    /// Column metadata (shared across rows via Arc)
81    pub column_info: Option<Arc<ColumnInfo>>,
82}
83
84/// Error type for PostgreSQL driver operations.
85#[derive(Debug)]
86pub enum PgError {
87    /// Connection error
88    Connection(String),
89    /// Protocol error
90    Protocol(String),
91    /// Authentication error
92    Auth(String),
93    /// Query error
94    Query(String),
95    /// No rows returned
96    NoRows,
97    /// I/O error
98    Io(std::io::Error),
99}
100
101impl std::fmt::Display for PgError {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        match self {
104            PgError::Connection(e) => write!(f, "Connection error: {}", e),
105            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
106            PgError::Auth(e) => write!(f, "Auth error: {}", e),
107            PgError::Query(e) => write!(f, "Query error: {}", e),
108            PgError::NoRows => write!(f, "No rows returned"),
109            PgError::Io(e) => write!(f, "I/O error: {}", e),
110        }
111    }
112}
113
114impl std::error::Error for PgError {}
115
116impl From<std::io::Error> for PgError {
117    fn from(e: std::io::Error) -> Self {
118        PgError::Io(e)
119    }
120}
121
122/// Result type for PostgreSQL operations.
123pub type PgResult<T> = Result<T, PgError>;
124
125/// PostgreSQL driver.
126///
127/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
128pub struct PgDriver {
129    #[allow(dead_code)]
130    connection: PgConnection,
131}
132
133impl PgDriver {
134    /// Create a new driver with an existing connection.
135    pub fn new(connection: PgConnection) -> Self {
136        Self { connection }
137    }
138
139    /// Builder pattern for ergonomic connection configuration.
140    ///
141    /// # Example
142    /// ```ignore
143    /// let driver = PgDriver::builder()
144    ///     .host("localhost")
145    ///     .port(5432)
146    ///     .user("admin")
147    ///     .database("mydb")
148    ///     .password("secret")  // Optional
149    ///     .connect()
150    ///     .await?;
151    /// ```
152    pub fn builder() -> PgDriverBuilder {
153        PgDriverBuilder::new()
154    }
155
156    /// Connect to PostgreSQL and create a driver (trust mode, no password).
157    pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
158        let connection = PgConnection::connect(host, port, user, database).await?;
159        Ok(Self::new(connection))
160    }
161
162    /// Connect to PostgreSQL with password authentication (SCRAM-SHA-256).
163    pub async fn connect_with_password(
164        host: &str,
165        port: u16,
166        user: &str,
167        database: &str,
168        password: &str,
169    ) -> PgResult<Self> {
170        let connection =
171            PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
172        Ok(Self::new(connection))
173    }
174
175    /// Connect to PostgreSQL with a connection timeout.
176    ///
177    /// If the connection cannot be established within the timeout, returns an error.
178    ///
179    /// # Example
180    /// ```ignore
181    /// use std::time::Duration;
182    /// let driver = PgDriver::connect_with_timeout(
183    ///     "localhost", 5432, "user", "db", "password",
184    ///     Duration::from_secs(5)
185    /// ).await?;
186    /// ```
187    pub async fn connect_with_timeout(
188        host: &str,
189        port: u16,
190        user: &str,
191        database: &str,
192        password: &str,
193        timeout: std::time::Duration,
194    ) -> PgResult<Self> {
195        tokio::time::timeout(
196            timeout,
197            Self::connect_with_password(host, port, user, database, password),
198        )
199        .await
200        .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
201    }
202
203    /// Execute a QAIL command and fetch all rows (AST-NATIVE).
204    ///
205    /// Uses AstEncoder to directly encode AST to wire protocol bytes.
206    /// NO SQL STRING GENERATION!
207    pub async fn fetch_all(&mut self, cmd: &QailCmd) -> PgResult<Vec<PgRow>> {
208        use crate::protocol::AstEncoder;
209
210        // AST-NATIVE: Encode directly to wire bytes (no to_sql()!)
211        let (wire_bytes, _params) = AstEncoder::encode_cmd(cmd);
212
213        // Send wire bytes and receive response
214        self.connection.send_bytes(&wire_bytes).await?;
215
216        // Collect results
217        let mut rows: Vec<PgRow> = Vec::new();
218        let mut column_info: Option<Arc<ColumnInfo>> = None;
219
220        loop {
221            let msg = self.connection.recv().await?;
222            match msg {
223                crate::protocol::BackendMessage::ParseComplete
224                | crate::protocol::BackendMessage::BindComplete => {}
225                crate::protocol::BackendMessage::RowDescription(fields) => {
226                    // Create and share column metadata across all rows
227                    column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
228                }
229                crate::protocol::BackendMessage::DataRow(data) => {
230                    rows.push(PgRow {
231                        columns: data,
232                        column_info: column_info.clone(),
233                    });
234                }
235                crate::protocol::BackendMessage::CommandComplete(_) => {}
236                crate::protocol::BackendMessage::ReadyForQuery(_) => {
237                    return Ok(rows);
238                }
239                crate::protocol::BackendMessage::ErrorResponse(err) => {
240                    return Err(PgError::Query(err.message));
241                }
242                _ => {}
243            }
244        }
245    }
246
247    /// Execute a QAIL command and fetch one row.
248    pub async fn fetch_one(&mut self, cmd: &QailCmd) -> PgResult<PgRow> {
249        let rows = self.fetch_all(cmd).await?;
250        rows.into_iter().next().ok_or(PgError::NoRows)
251    }
252
253    /// Execute a QAIL command with PREPARED STATEMENT CACHING.
254    ///
255    /// Like fetch_all(), but caches the prepared statement on the server.
256    /// On first call: sends Parse + Bind + Execute + Sync
257    /// On subsequent calls: sends only Bind + Execute + Sync (much faster!)
258    ///
259    /// Use this for repeated queries with the same AST structure.
260    pub async fn fetch_all_cached(&mut self, cmd: &QailCmd) -> PgResult<Vec<PgRow>> {
261        use crate::protocol::AstEncoder;
262
263        // Generate SQL and params from AST (for caching)
264        let (sql, params) = AstEncoder::encode_cmd_sql(cmd);
265
266        // Use cached query - only parses on first call
267        let raw_rows = self.connection.query_cached(&sql, &params).await?;
268
269        // Convert to PgRow with column info
270        let rows: Vec<PgRow> = raw_rows
271            .into_iter()
272            .map(|data| PgRow {
273                columns: data,
274                column_info: None, // Simple version - no column metadata
275            })
276            .collect();
277
278        Ok(rows)
279    }
280
281    /// Execute a QAIL command (for mutations) - AST-NATIVE.
282    ///
283    /// Uses AstEncoder to directly encode AST to wire protocol bytes.
284    /// Returns the number of affected rows.
285    pub async fn execute(&mut self, cmd: &QailCmd) -> PgResult<u64> {
286        use crate::protocol::AstEncoder;
287
288        // AST-NATIVE: Encode directly to wire bytes (no to_sql()!)
289        let (wire_bytes, _params) = AstEncoder::encode_cmd(cmd);
290
291        // Send wire bytes and receive response
292        self.connection.send_bytes(&wire_bytes).await?;
293
294        // Parse response for affected rows
295        let mut affected = 0u64;
296        loop {
297            let msg = self.connection.recv().await?;
298            match msg {
299                crate::protocol::BackendMessage::ParseComplete
300                | crate::protocol::BackendMessage::BindComplete => {}
301                crate::protocol::BackendMessage::RowDescription(_) => {}
302                crate::protocol::BackendMessage::DataRow(_) => {}
303                crate::protocol::BackendMessage::CommandComplete(tag) => {
304                    // Parse "INSERT 0 5" or "UPDATE 3" etc
305                    if let Some(n) = tag.split_whitespace().last() {
306                        affected = n.parse().unwrap_or(0);
307                    }
308                }
309                crate::protocol::BackendMessage::ReadyForQuery(_) => {
310                    return Ok(affected);
311                }
312                crate::protocol::BackendMessage::ErrorResponse(err) => {
313                    return Err(PgError::Query(err.message));
314                }
315                _ => {}
316            }
317        }
318    }
319
320    // ==================== TRANSACTION CONTROL ====================
321
322    /// Begin a transaction (AST-native).
323    pub async fn begin(&mut self) -> PgResult<()> {
324        self.connection.begin_transaction().await
325    }
326
327    /// Commit the current transaction (AST-native).
328    pub async fn commit(&mut self) -> PgResult<()> {
329        self.connection.commit().await
330    }
331
332    /// Rollback the current transaction (AST-native).
333    pub async fn rollback(&mut self) -> PgResult<()> {
334        self.connection.rollback().await
335    }
336
337    /// Create a named savepoint within the current transaction.
338    ///
339    /// Savepoints allow partial rollback within a transaction.
340    /// Use `rollback_to()` to return to this savepoint.
341    ///
342    /// # Example
343    /// ```ignore
344    /// driver.begin().await?;
345    /// driver.execute(&insert1).await?;
346    /// driver.savepoint("sp1").await?;
347    /// driver.execute(&insert2).await?;
348    /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
349    /// driver.commit().await?;
350    /// ```
351    pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
352        self.connection.savepoint(name).await
353    }
354
355    /// Rollback to a previously created savepoint.
356    ///
357    /// Discards all changes since the named savepoint was created,
358    /// but keeps the transaction open.
359    pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
360        self.connection.rollback_to(name).await
361    }
362
363    /// Release a savepoint (free resources, if no longer needed).
364    ///
365    /// After release, the savepoint cannot be rolled back to.
366    pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
367        self.connection.release_savepoint(name).await
368    }
369
370    // ==================== BATCH TRANSACTIONS ====================
371
372    /// Execute multiple commands in a single atomic transaction.
373    ///
374    /// All commands succeed or all are rolled back.
375    /// Returns the number of affected rows for each command.
376    ///
377    /// # Example
378    /// ```ignore
379    /// let cmds = vec![
380    ///     QailCmd::add("users").columns(["name"]).values(["Alice"]),
381    ///     QailCmd::add("users").columns(["name"]).values(["Bob"]),
382    /// ];
383    /// let results = driver.execute_batch(&cmds).await?;
384    /// // results = [1, 1] (rows affected)
385    /// ```
386    pub async fn execute_batch(&mut self, cmds: &[QailCmd]) -> PgResult<Vec<u64>> {
387        self.begin().await?;
388        let mut results = Vec::with_capacity(cmds.len());
389        for cmd in cmds {
390            match self.execute(cmd).await {
391                Ok(n) => results.push(n),
392                Err(e) => {
393                    self.rollback().await?;
394                    return Err(e);
395                }
396            }
397        }
398        self.commit().await?;
399        Ok(results)
400    }
401
402    // ==================== STATEMENT TIMEOUT ====================
403
404    /// Set statement timeout for this connection (in milliseconds).
405    ///
406    /// Queries that exceed this time will be cancelled.
407    /// This is a production safety feature.
408    ///
409    /// # Example
410    /// ```ignore
411    /// driver.set_statement_timeout(30_000).await?; // 30 seconds
412    /// ```
413    pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
414        self.execute_raw(&format!("SET statement_timeout = {}", ms))
415            .await
416    }
417
418    /// Reset statement timeout to default (no limit).
419    pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
420        self.execute_raw("RESET statement_timeout").await
421    }
422
423    // ==================== PIPELINE (BATCH) ====================
424
425    /// Execute multiple QailCmd ASTs in a single network round-trip (PIPELINING).
426    ///
427    /// This is the high-performance path for batch operations.
428    /// Returns the count of successful queries.
429    ///
430    /// # Example
431    /// ```ignore
432    /// let cmds: Vec<QailCmd> = (1..=1000)
433    ///     .map(|i| QailCmd::get("harbors").columns(["id", "name"]).limit(i))
434    ///     .collect();
435    /// let count = driver.pipeline_batch(&cmds).await?;
436    /// assert_eq!(count, 1000);
437    /// ```
438    pub async fn pipeline_batch(&mut self, cmds: &[QailCmd]) -> PgResult<usize> {
439        self.connection.pipeline_ast_fast(cmds).await
440    }
441
442    /// Execute multiple QailCmd ASTs and return full row data.
443    ///
444    /// Unlike `pipeline_batch` which only returns count, this method
445    /// collects and returns all row data from each query.
446    ///
447    /// Returns: Vec of PgRow vectors, one per query in the batch.
448    pub async fn pipeline_fetch(&mut self, cmds: &[QailCmd]) -> PgResult<Vec<Vec<PgRow>>> {
449        let raw_results = self.connection.pipeline_ast(cmds).await?;
450
451        // Convert raw results to PgRow format
452        let results: Vec<Vec<PgRow>> = raw_results
453            .into_iter()
454            .map(|rows| {
455                rows.into_iter()
456                    .map(|columns| PgRow {
457                        columns,
458                        column_info: None,
459                    })
460                    .collect()
461            })
462            .collect();
463
464        Ok(results)
465    }
466
467    /// Prepare a SQL statement for repeated execution.
468    ///
469    /// Returns a PreparedStatement handle for use with pipeline_prepared_fast.
470    pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
471        self.connection.prepare(sql).await
472    }
473
474    /// Execute a prepared statement pipeline in FAST mode (count only).
475    ///
476    /// This is the fastest possible path - Parse once, Bind+Execute many.
477    /// Matches native Rust benchmark performance (~355k q/s).
478    pub async fn pipeline_prepared_fast(
479        &mut self,
480        stmt: &PreparedStatement,
481        params_batch: &[Vec<Option<Vec<u8>>>],
482    ) -> PgResult<usize> {
483        self.connection
484            .pipeline_prepared_fast(stmt, params_batch)
485            .await
486    }
487
488    // ==================== LEGACY/BOOTSTRAP ====================
489
490    /// Execute a raw SQL string.
491    ///
492    /// ⚠️ **Discouraged**: Violates AST-native philosophy.
493    /// Use for bootstrap DDL only (e.g., migration table creation).
494    /// For transactions, use `begin()`, `commit()`, `rollback()`.
495    pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
496        self.connection.execute_simple(sql).await
497    }
498
499    /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
500    ///
501    /// Uses a QailCmd::Add to get validated table and column names from the AST,
502    /// not user-provided strings. This is the sound, AST-native approach.
503    ///
504    /// # Example
505    /// ```ignore
506    /// // Create a QailCmd::Add to define table and columns
507    /// let cmd = QailCmd::add("users")
508    ///     .columns(["id", "name", "email"]);
509    ///
510    /// // Bulk insert rows
511    /// let rows: Vec<Vec<Value>> = vec![
512    ///     vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
513    ///     vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
514    /// ];
515    /// driver.copy_bulk(&cmd, &rows).await?;
516    /// ```
517    pub async fn copy_bulk(
518        &mut self,
519        cmd: &QailCmd,
520        rows: &[Vec<qail_core::ast::Value>],
521    ) -> PgResult<u64> {
522        use qail_core::ast::Action;
523
524        // Validate this is an Add command
525        if cmd.action != Action::Add {
526            return Err(PgError::Query(
527                "copy_bulk requires QailCmd::Add action".to_string(),
528            ));
529        }
530
531        // Extract table from AST (already validated at parse time)
532        let table = &cmd.table;
533
534        // Extract column names from AST expressions
535        let columns: Vec<String> = cmd
536            .columns
537            .iter()
538            .filter_map(|expr| {
539                use qail_core::ast::Expr;
540                match expr {
541                    Expr::Named(name) => Some(name.clone()),
542                    Expr::Aliased { name, .. } => Some(name.clone()),
543                    Expr::Star => None, // Can't COPY with *
544                    _ => None,
545                }
546            })
547            .collect();
548
549        if columns.is_empty() {
550            return Err(PgError::Query(
551                "copy_bulk requires columns in QailCmd".to_string(),
552            ));
553        }
554
555        // Use optimized COPY path: direct Value → bytes encoding, single syscall
556        self.connection.copy_in_fast(table, &columns, rows).await
557    }
558
559    /// **Fastest** bulk insert using pre-encoded COPY data.
560    ///
561    /// Accepts raw COPY text format bytes. Use when caller has already
562    /// encoded rows to avoid any encoding overhead.
563    ///
564    /// # Format
565    /// Data should be tab-separated rows with newlines (COPY text format):
566    /// `1\thello\t3.14\n2\tworld\t2.71\n`
567    ///
568    /// # Example
569    /// ```ignore
570    /// let cmd = QailCmd::add("users").columns(["id", "name"]);
571    /// let data = b"1\tAlice\n2\tBob\n";
572    /// driver.copy_bulk_bytes(&cmd, data).await?;
573    /// ```
574    pub async fn copy_bulk_bytes(&mut self, cmd: &QailCmd, data: &[u8]) -> PgResult<u64> {
575        use qail_core::ast::Action;
576
577        if cmd.action != Action::Add {
578            return Err(PgError::Query(
579                "copy_bulk_bytes requires QailCmd::Add action".to_string(),
580            ));
581        }
582
583        let table = &cmd.table;
584        let columns: Vec<String> = cmd
585            .columns
586            .iter()
587            .filter_map(|expr| {
588                use qail_core::ast::Expr;
589                match expr {
590                    Expr::Named(name) => Some(name.clone()),
591                    Expr::Aliased { name, .. } => Some(name.clone()),
592                    _ => None,
593                }
594            })
595            .collect();
596
597        if columns.is_empty() {
598            return Err(PgError::Query(
599                "copy_bulk_bytes requires columns in QailCmd".to_string(),
600            ));
601        }
602
603        // Direct to raw COPY - zero encoding!
604        self.connection.copy_in_raw(table, &columns, data).await
605    }
606
607    /// Stream large result sets using PostgreSQL cursors.
608    ///
609    /// This method uses DECLARE CURSOR internally to stream rows in batches,
610    /// avoiding loading the entire result set into memory.
611    ///
612    /// # Example
613    /// ```ignore
614    /// let cmd = QailCmd::get("large_table");
615    /// let batches = driver.stream_cmd(&cmd, 100).await?;
616    /// for batch in batches {
617    ///     for row in batch {
618    ///         // process row
619    ///     }
620    /// }
621    /// ```
622    pub async fn stream_cmd(
623        &mut self,
624        cmd: &QailCmd,
625        batch_size: usize,
626    ) -> PgResult<Vec<Vec<PgRow>>> {
627        use std::sync::atomic::{AtomicU64, Ordering};
628        static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
629
630        // Generate unique cursor name
631        let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
632
633        // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
634        use crate::protocol::AstEncoder;
635        let mut sql_buf = bytes::BytesMut::with_capacity(256);
636        let mut params: Vec<Option<Vec<u8>>> = Vec::new();
637        AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params);
638        let sql = String::from_utf8_lossy(&sql_buf).to_string();
639
640        // Must be in a transaction for cursors
641        self.connection.begin_transaction().await?;
642
643        // Declare cursor
644        self.connection.declare_cursor(&cursor_name, &sql).await?;
645
646        // Fetch all batches
647        let mut all_batches = Vec::new();
648        while let Some(rows) = self
649            .connection
650            .fetch_cursor(&cursor_name, batch_size)
651            .await?
652        {
653            let pg_rows: Vec<PgRow> = rows
654                .into_iter()
655                .map(|cols| PgRow {
656                    columns: cols,
657                    column_info: None,
658                })
659                .collect();
660            all_batches.push(pg_rows);
661        }
662
663        // Cleanup
664        self.connection.close_cursor(&cursor_name).await?;
665        self.connection.commit().await?;
666
667        Ok(all_batches)
668    }
669}
670
671// ============================================================================
672// Connection Builder
673// ============================================================================
674
675/// Builder for creating PgDriver connections with named parameters.
676///
677/// # Example
678/// ```ignore
679/// let driver = PgDriver::builder()
680///     .host("localhost")
681///     .port(5432)
682///     .user("admin")
683///     .database("mydb")
684///     .password("secret")
685///     .connect()
686///     .await?;
687/// ```
688#[derive(Default)]
689pub struct PgDriverBuilder {
690    host: Option<String>,
691    port: Option<u16>,
692    user: Option<String>,
693    database: Option<String>,
694    password: Option<String>,
695    timeout: Option<std::time::Duration>,
696}
697
698impl PgDriverBuilder {
699    /// Create a new builder with default values.
700    pub fn new() -> Self {
701        Self::default()
702    }
703
704    /// Set the host (default: "127.0.0.1").
705    pub fn host(mut self, host: impl Into<String>) -> Self {
706        self.host = Some(host.into());
707        self
708    }
709
710    /// Set the port (default: 5432).
711    pub fn port(mut self, port: u16) -> Self {
712        self.port = Some(port);
713        self
714    }
715
716    /// Set the username (required).
717    pub fn user(mut self, user: impl Into<String>) -> Self {
718        self.user = Some(user.into());
719        self
720    }
721
722    /// Set the database name (required).
723    pub fn database(mut self, database: impl Into<String>) -> Self {
724        self.database = Some(database.into());
725        self
726    }
727
728    /// Set the password (optional, for SCRAM-SHA-256 auth).
729    pub fn password(mut self, password: impl Into<String>) -> Self {
730        self.password = Some(password.into());
731        self
732    }
733
734    /// Set connection timeout (optional).
735    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
736        self.timeout = Some(timeout);
737        self
738    }
739
740    /// Connect to PostgreSQL using the configured parameters.
741    pub async fn connect(self) -> PgResult<PgDriver> {
742        let host = self.host.as_deref().unwrap_or("127.0.0.1");
743        let port = self.port.unwrap_or(5432);
744        let user = self.user.as_deref().ok_or_else(|| {
745            PgError::Connection("User is required".to_string())
746        })?;
747        let database = self.database.as_deref().ok_or_else(|| {
748            PgError::Connection("Database is required".to_string())
749        })?;
750
751        match (self.password.as_deref(), self.timeout) {
752            (Some(password), Some(timeout)) => {
753                PgDriver::connect_with_timeout(host, port, user, database, password, timeout).await
754            }
755            (Some(password), None) => {
756                PgDriver::connect_with_password(host, port, user, database, password).await
757            }
758            (None, Some(timeout)) => {
759                tokio::time::timeout(
760                    timeout,
761                    PgDriver::connect(host, port, user, database),
762                )
763                .await
764                .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
765            }
766            (None, None) => {
767                PgDriver::connect(host, port, user, database).await
768            }
769        }
770    }
771}