qail_pg/driver/mod.rs
1//! PostgreSQL Driver Module (Layer 3: Async I/O)
2//!
3//! This module contains the async runtime-specific code.
4//! Auto-detects the best I/O backend:
5//! - Linux 5.1+: io_uring (fastest)
6//! - Linux < 5.1 / macOS / Windows: tokio
7//!
8//! Connection methods are split across modules for easier maintenance:
9//! - `connection.rs` - Core struct and connect methods
10//! - `io.rs` - send, recv, recv_msg_type_fast
11//! - `query.rs` - query, query_cached, execute_simple
12//! - `transaction.rs` - begin_transaction, commit, rollback
13//! - `cursor.rs` - declare_cursor, fetch_cursor, close_cursor
14//! - `copy.rs` - COPY protocol for bulk operations
15//! - `pipeline.rs` - High-performance pipelining (275k q/s)
16//! - `cancel.rs` - Query cancellation
17//! - `io_backend.rs` - Runtime I/O backend detection
18
19mod cancel;
20mod connection;
21mod copy;
22mod cursor;
23mod io;
24pub mod io_backend;
25mod pipeline;
26mod pool;
27mod prepared;
28mod query;
29mod row;
30mod stream;
31mod transaction;
32
33pub use connection::PgConnection;
34pub use connection::TlsConfig;
35pub(crate) use connection::{CANCEL_REQUEST_CODE, parse_affected_rows};
36pub use io_backend::{IoBackend, backend_name, detect as detect_io_backend};
37pub use pool::{PgPool, PoolConfig, PoolStats, PooledConnection};
38pub use prepared::PreparedStatement;
39
40use qail_core::ast::QailCmd;
41use std::collections::HashMap;
42use std::sync::Arc;
43
44/// Column metadata from RowDescription (shared across rows via Arc).
45#[derive(Debug, Clone)]
46pub struct ColumnInfo {
47 /// Column name -> index mapping
48 pub name_to_index: HashMap<String, usize>,
49 /// Column OIDs
50 pub oids: Vec<u32>,
51 /// Column format codes (0=text, 1=binary)
52 pub formats: Vec<i16>,
53}
54
55impl ColumnInfo {
56 /// Create from FieldDescriptions.
57 pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
58 let mut name_to_index = HashMap::with_capacity(fields.len());
59 let mut oids = Vec::with_capacity(fields.len());
60 let mut formats = Vec::with_capacity(fields.len());
61
62 for (i, field) in fields.iter().enumerate() {
63 name_to_index.insert(field.name.clone(), i);
64 oids.push(field.type_oid);
65 formats.push(field.format);
66 }
67
68 Self {
69 name_to_index,
70 oids,
71 formats,
72 }
73 }
74}
75
76/// PostgreSQL row with column data and metadata.
77pub struct PgRow {
78 /// Column values (None = NULL)
79 pub columns: Vec<Option<Vec<u8>>>,
80 /// Column metadata (shared across rows via Arc)
81 pub column_info: Option<Arc<ColumnInfo>>,
82}
83
84/// Error type for PostgreSQL driver operations.
85#[derive(Debug)]
86pub enum PgError {
87 /// Connection error
88 Connection(String),
89 /// Protocol error
90 Protocol(String),
91 /// Authentication error
92 Auth(String),
93 /// Query error
94 Query(String),
95 /// No rows returned
96 NoRows,
97 /// I/O error
98 Io(std::io::Error),
99}
100
101impl std::fmt::Display for PgError {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 match self {
104 PgError::Connection(e) => write!(f, "Connection error: {}", e),
105 PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
106 PgError::Auth(e) => write!(f, "Auth error: {}", e),
107 PgError::Query(e) => write!(f, "Query error: {}", e),
108 PgError::NoRows => write!(f, "No rows returned"),
109 PgError::Io(e) => write!(f, "I/O error: {}", e),
110 }
111 }
112}
113
114impl std::error::Error for PgError {}
115
116impl From<std::io::Error> for PgError {
117 fn from(e: std::io::Error) -> Self {
118 PgError::Io(e)
119 }
120}
121
122/// Result type for PostgreSQL operations.
123pub type PgResult<T> = Result<T, PgError>;
124
125/// PostgreSQL driver.
126///
127/// Combines the pure encoder (Layer 2) with async I/O (Layer 3).
128pub struct PgDriver {
129 #[allow(dead_code)]
130 connection: PgConnection,
131}
132
133impl PgDriver {
134 /// Create a new driver with an existing connection.
135 pub fn new(connection: PgConnection) -> Self {
136 Self { connection }
137 }
138
139 /// Builder pattern for ergonomic connection configuration.
140 ///
141 /// # Example
142 /// ```ignore
143 /// let driver = PgDriver::builder()
144 /// .host("localhost")
145 /// .port(5432)
146 /// .user("admin")
147 /// .database("mydb")
148 /// .password("secret") // Optional
149 /// .connect()
150 /// .await?;
151 /// ```
152 pub fn builder() -> PgDriverBuilder {
153 PgDriverBuilder::new()
154 }
155
156 /// Connect to PostgreSQL and create a driver (trust mode, no password).
157 pub async fn connect(host: &str, port: u16, user: &str, database: &str) -> PgResult<Self> {
158 let connection = PgConnection::connect(host, port, user, database).await?;
159 Ok(Self::new(connection))
160 }
161
162 /// Connect to PostgreSQL with password authentication (SCRAM-SHA-256).
163 pub async fn connect_with_password(
164 host: &str,
165 port: u16,
166 user: &str,
167 database: &str,
168 password: &str,
169 ) -> PgResult<Self> {
170 let connection =
171 PgConnection::connect_with_password(host, port, user, database, Some(password)).await?;
172 Ok(Self::new(connection))
173 }
174
175 /// Connect to PostgreSQL with a connection timeout.
176 ///
177 /// If the connection cannot be established within the timeout, returns an error.
178 ///
179 /// # Example
180 /// ```ignore
181 /// use std::time::Duration;
182 /// let driver = PgDriver::connect_with_timeout(
183 /// "localhost", 5432, "user", "db", "password",
184 /// Duration::from_secs(5)
185 /// ).await?;
186 /// ```
187 pub async fn connect_with_timeout(
188 host: &str,
189 port: u16,
190 user: &str,
191 database: &str,
192 password: &str,
193 timeout: std::time::Duration,
194 ) -> PgResult<Self> {
195 tokio::time::timeout(
196 timeout,
197 Self::connect_with_password(host, port, user, database, password),
198 )
199 .await
200 .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
201 }
202
203 /// Execute a QAIL command and fetch all rows (AST-NATIVE).
204 ///
205 /// Uses AstEncoder to directly encode AST to wire protocol bytes.
206 /// NO SQL STRING GENERATION!
207 pub async fn fetch_all(&mut self, cmd: &QailCmd) -> PgResult<Vec<PgRow>> {
208 use crate::protocol::AstEncoder;
209
210 // AST-NATIVE: Encode directly to wire bytes (no to_sql()!)
211 let (wire_bytes, _params) = AstEncoder::encode_cmd(cmd);
212
213 // Send wire bytes and receive response
214 self.connection.send_bytes(&wire_bytes).await?;
215
216 // Collect results
217 let mut rows: Vec<PgRow> = Vec::new();
218 let mut column_info: Option<Arc<ColumnInfo>> = None;
219
220 loop {
221 let msg = self.connection.recv().await?;
222 match msg {
223 crate::protocol::BackendMessage::ParseComplete
224 | crate::protocol::BackendMessage::BindComplete => {}
225 crate::protocol::BackendMessage::RowDescription(fields) => {
226 // Create and share column metadata across all rows
227 column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
228 }
229 crate::protocol::BackendMessage::DataRow(data) => {
230 rows.push(PgRow {
231 columns: data,
232 column_info: column_info.clone(),
233 });
234 }
235 crate::protocol::BackendMessage::CommandComplete(_) => {}
236 crate::protocol::BackendMessage::ReadyForQuery(_) => {
237 return Ok(rows);
238 }
239 crate::protocol::BackendMessage::ErrorResponse(err) => {
240 return Err(PgError::Query(err.message));
241 }
242 _ => {}
243 }
244 }
245 }
246
247 /// Execute a QAIL command and fetch one row.
248 pub async fn fetch_one(&mut self, cmd: &QailCmd) -> PgResult<PgRow> {
249 let rows = self.fetch_all(cmd).await?;
250 rows.into_iter().next().ok_or(PgError::NoRows)
251 }
252
253 /// Execute a QAIL command with PREPARED STATEMENT CACHING.
254 ///
255 /// Like fetch_all(), but caches the prepared statement on the server.
256 /// On first call: sends Parse + Bind + Execute + Sync
257 /// On subsequent calls: sends only Bind + Execute + Sync (much faster!)
258 ///
259 /// Use this for repeated queries with the same AST structure.
260 pub async fn fetch_all_cached(&mut self, cmd: &QailCmd) -> PgResult<Vec<PgRow>> {
261 use crate::protocol::AstEncoder;
262
263 // Generate SQL and params from AST (for caching)
264 let (sql, params) = AstEncoder::encode_cmd_sql(cmd);
265
266 // Use cached query - only parses on first call
267 let raw_rows = self.connection.query_cached(&sql, ¶ms).await?;
268
269 // Convert to PgRow with column info
270 let rows: Vec<PgRow> = raw_rows
271 .into_iter()
272 .map(|data| PgRow {
273 columns: data,
274 column_info: None, // Simple version - no column metadata
275 })
276 .collect();
277
278 Ok(rows)
279 }
280
281 /// Execute a QAIL command (for mutations) - AST-NATIVE.
282 ///
283 /// Uses AstEncoder to directly encode AST to wire protocol bytes.
284 /// Returns the number of affected rows.
285 pub async fn execute(&mut self, cmd: &QailCmd) -> PgResult<u64> {
286 use crate::protocol::AstEncoder;
287
288 // AST-NATIVE: Encode directly to wire bytes (no to_sql()!)
289 let (wire_bytes, _params) = AstEncoder::encode_cmd(cmd);
290
291 // Send wire bytes and receive response
292 self.connection.send_bytes(&wire_bytes).await?;
293
294 // Parse response for affected rows
295 let mut affected = 0u64;
296 loop {
297 let msg = self.connection.recv().await?;
298 match msg {
299 crate::protocol::BackendMessage::ParseComplete
300 | crate::protocol::BackendMessage::BindComplete => {}
301 crate::protocol::BackendMessage::RowDescription(_) => {}
302 crate::protocol::BackendMessage::DataRow(_) => {}
303 crate::protocol::BackendMessage::CommandComplete(tag) => {
304 // Parse "INSERT 0 5" or "UPDATE 3" etc
305 if let Some(n) = tag.split_whitespace().last() {
306 affected = n.parse().unwrap_or(0);
307 }
308 }
309 crate::protocol::BackendMessage::ReadyForQuery(_) => {
310 return Ok(affected);
311 }
312 crate::protocol::BackendMessage::ErrorResponse(err) => {
313 return Err(PgError::Query(err.message));
314 }
315 _ => {}
316 }
317 }
318 }
319
320 // ==================== TRANSACTION CONTROL ====================
321
322 /// Begin a transaction (AST-native).
323 pub async fn begin(&mut self) -> PgResult<()> {
324 self.connection.begin_transaction().await
325 }
326
327 /// Commit the current transaction (AST-native).
328 pub async fn commit(&mut self) -> PgResult<()> {
329 self.connection.commit().await
330 }
331
332 /// Rollback the current transaction (AST-native).
333 pub async fn rollback(&mut self) -> PgResult<()> {
334 self.connection.rollback().await
335 }
336
337 /// Create a named savepoint within the current transaction.
338 ///
339 /// Savepoints allow partial rollback within a transaction.
340 /// Use `rollback_to()` to return to this savepoint.
341 ///
342 /// # Example
343 /// ```ignore
344 /// driver.begin().await?;
345 /// driver.execute(&insert1).await?;
346 /// driver.savepoint("sp1").await?;
347 /// driver.execute(&insert2).await?;
348 /// driver.rollback_to("sp1").await?; // Undo insert2, keep insert1
349 /// driver.commit().await?;
350 /// ```
351 pub async fn savepoint(&mut self, name: &str) -> PgResult<()> {
352 self.connection.savepoint(name).await
353 }
354
355 /// Rollback to a previously created savepoint.
356 ///
357 /// Discards all changes since the named savepoint was created,
358 /// but keeps the transaction open.
359 pub async fn rollback_to(&mut self, name: &str) -> PgResult<()> {
360 self.connection.rollback_to(name).await
361 }
362
363 /// Release a savepoint (free resources, if no longer needed).
364 ///
365 /// After release, the savepoint cannot be rolled back to.
366 pub async fn release_savepoint(&mut self, name: &str) -> PgResult<()> {
367 self.connection.release_savepoint(name).await
368 }
369
370 // ==================== BATCH TRANSACTIONS ====================
371
372 /// Execute multiple commands in a single atomic transaction.
373 ///
374 /// All commands succeed or all are rolled back.
375 /// Returns the number of affected rows for each command.
376 ///
377 /// # Example
378 /// ```ignore
379 /// let cmds = vec![
380 /// QailCmd::add("users").columns(["name"]).values(["Alice"]),
381 /// QailCmd::add("users").columns(["name"]).values(["Bob"]),
382 /// ];
383 /// let results = driver.execute_batch(&cmds).await?;
384 /// // results = [1, 1] (rows affected)
385 /// ```
386 pub async fn execute_batch(&mut self, cmds: &[QailCmd]) -> PgResult<Vec<u64>> {
387 self.begin().await?;
388 let mut results = Vec::with_capacity(cmds.len());
389 for cmd in cmds {
390 match self.execute(cmd).await {
391 Ok(n) => results.push(n),
392 Err(e) => {
393 self.rollback().await?;
394 return Err(e);
395 }
396 }
397 }
398 self.commit().await?;
399 Ok(results)
400 }
401
402 // ==================== STATEMENT TIMEOUT ====================
403
404 /// Set statement timeout for this connection (in milliseconds).
405 ///
406 /// Queries that exceed this time will be cancelled.
407 /// This is a production safety feature.
408 ///
409 /// # Example
410 /// ```ignore
411 /// driver.set_statement_timeout(30_000).await?; // 30 seconds
412 /// ```
413 pub async fn set_statement_timeout(&mut self, ms: u32) -> PgResult<()> {
414 self.execute_raw(&format!("SET statement_timeout = {}", ms))
415 .await
416 }
417
418 /// Reset statement timeout to default (no limit).
419 pub async fn reset_statement_timeout(&mut self) -> PgResult<()> {
420 self.execute_raw("RESET statement_timeout").await
421 }
422
423 // ==================== PIPELINE (BATCH) ====================
424
425 /// Execute multiple QailCmd ASTs in a single network round-trip (PIPELINING).
426 ///
427 /// This is the high-performance path for batch operations.
428 /// Returns the count of successful queries.
429 ///
430 /// # Example
431 /// ```ignore
432 /// let cmds: Vec<QailCmd> = (1..=1000)
433 /// .map(|i| QailCmd::get("harbors").columns(["id", "name"]).limit(i))
434 /// .collect();
435 /// let count = driver.pipeline_batch(&cmds).await?;
436 /// assert_eq!(count, 1000);
437 /// ```
438 pub async fn pipeline_batch(&mut self, cmds: &[QailCmd]) -> PgResult<usize> {
439 self.connection.pipeline_ast_fast(cmds).await
440 }
441
442 /// Execute multiple QailCmd ASTs and return full row data.
443 ///
444 /// Unlike `pipeline_batch` which only returns count, this method
445 /// collects and returns all row data from each query.
446 ///
447 /// Returns: Vec of PgRow vectors, one per query in the batch.
448 pub async fn pipeline_fetch(&mut self, cmds: &[QailCmd]) -> PgResult<Vec<Vec<PgRow>>> {
449 let raw_results = self.connection.pipeline_ast(cmds).await?;
450
451 // Convert raw results to PgRow format
452 let results: Vec<Vec<PgRow>> = raw_results
453 .into_iter()
454 .map(|rows| {
455 rows.into_iter()
456 .map(|columns| PgRow {
457 columns,
458 column_info: None,
459 })
460 .collect()
461 })
462 .collect();
463
464 Ok(results)
465 }
466
467 /// Prepare a SQL statement for repeated execution.
468 ///
469 /// Returns a PreparedStatement handle for use with pipeline_prepared_fast.
470 pub async fn prepare(&mut self, sql: &str) -> PgResult<PreparedStatement> {
471 self.connection.prepare(sql).await
472 }
473
474 /// Execute a prepared statement pipeline in FAST mode (count only).
475 ///
476 /// This is the fastest possible path - Parse once, Bind+Execute many.
477 /// Matches native Rust benchmark performance (~355k q/s).
478 pub async fn pipeline_prepared_fast(
479 &mut self,
480 stmt: &PreparedStatement,
481 params_batch: &[Vec<Option<Vec<u8>>>],
482 ) -> PgResult<usize> {
483 self.connection
484 .pipeline_prepared_fast(stmt, params_batch)
485 .await
486 }
487
488 // ==================== LEGACY/BOOTSTRAP ====================
489
490 /// Execute a raw SQL string.
491 ///
492 /// ⚠️ **Discouraged**: Violates AST-native philosophy.
493 /// Use for bootstrap DDL only (e.g., migration table creation).
494 /// For transactions, use `begin()`, `commit()`, `rollback()`.
495 pub async fn execute_raw(&mut self, sql: &str) -> PgResult<()> {
496 self.connection.execute_simple(sql).await
497 }
498
499 /// Bulk insert data using PostgreSQL COPY protocol (AST-native).
500 ///
501 /// Uses a QailCmd::Add to get validated table and column names from the AST,
502 /// not user-provided strings. This is the sound, AST-native approach.
503 ///
504 /// # Example
505 /// ```ignore
506 /// // Create a QailCmd::Add to define table and columns
507 /// let cmd = QailCmd::add("users")
508 /// .columns(["id", "name", "email"]);
509 ///
510 /// // Bulk insert rows
511 /// let rows: Vec<Vec<Value>> = vec![
512 /// vec![Value::Int(1), Value::String("Alice"), Value::String("alice@ex.com")],
513 /// vec![Value::Int(2), Value::String("Bob"), Value::String("bob@ex.com")],
514 /// ];
515 /// driver.copy_bulk(&cmd, &rows).await?;
516 /// ```
517 pub async fn copy_bulk(
518 &mut self,
519 cmd: &QailCmd,
520 rows: &[Vec<qail_core::ast::Value>],
521 ) -> PgResult<u64> {
522 use qail_core::ast::Action;
523
524 // Validate this is an Add command
525 if cmd.action != Action::Add {
526 return Err(PgError::Query(
527 "copy_bulk requires QailCmd::Add action".to_string(),
528 ));
529 }
530
531 // Extract table from AST (already validated at parse time)
532 let table = &cmd.table;
533
534 // Extract column names from AST expressions
535 let columns: Vec<String> = cmd
536 .columns
537 .iter()
538 .filter_map(|expr| {
539 use qail_core::ast::Expr;
540 match expr {
541 Expr::Named(name) => Some(name.clone()),
542 Expr::Aliased { name, .. } => Some(name.clone()),
543 Expr::Star => None, // Can't COPY with *
544 _ => None,
545 }
546 })
547 .collect();
548
549 if columns.is_empty() {
550 return Err(PgError::Query(
551 "copy_bulk requires columns in QailCmd".to_string(),
552 ));
553 }
554
555 // Use optimized COPY path: direct Value → bytes encoding, single syscall
556 self.connection.copy_in_fast(table, &columns, rows).await
557 }
558
559 /// **Fastest** bulk insert using pre-encoded COPY data.
560 ///
561 /// Accepts raw COPY text format bytes. Use when caller has already
562 /// encoded rows to avoid any encoding overhead.
563 ///
564 /// # Format
565 /// Data should be tab-separated rows with newlines (COPY text format):
566 /// `1\thello\t3.14\n2\tworld\t2.71\n`
567 ///
568 /// # Example
569 /// ```ignore
570 /// let cmd = QailCmd::add("users").columns(["id", "name"]);
571 /// let data = b"1\tAlice\n2\tBob\n";
572 /// driver.copy_bulk_bytes(&cmd, data).await?;
573 /// ```
574 pub async fn copy_bulk_bytes(&mut self, cmd: &QailCmd, data: &[u8]) -> PgResult<u64> {
575 use qail_core::ast::Action;
576
577 if cmd.action != Action::Add {
578 return Err(PgError::Query(
579 "copy_bulk_bytes requires QailCmd::Add action".to_string(),
580 ));
581 }
582
583 let table = &cmd.table;
584 let columns: Vec<String> = cmd
585 .columns
586 .iter()
587 .filter_map(|expr| {
588 use qail_core::ast::Expr;
589 match expr {
590 Expr::Named(name) => Some(name.clone()),
591 Expr::Aliased { name, .. } => Some(name.clone()),
592 _ => None,
593 }
594 })
595 .collect();
596
597 if columns.is_empty() {
598 return Err(PgError::Query(
599 "copy_bulk_bytes requires columns in QailCmd".to_string(),
600 ));
601 }
602
603 // Direct to raw COPY - zero encoding!
604 self.connection.copy_in_raw(table, &columns, data).await
605 }
606
607 /// Stream large result sets using PostgreSQL cursors.
608 ///
609 /// This method uses DECLARE CURSOR internally to stream rows in batches,
610 /// avoiding loading the entire result set into memory.
611 ///
612 /// # Example
613 /// ```ignore
614 /// let cmd = QailCmd::get("large_table");
615 /// let batches = driver.stream_cmd(&cmd, 100).await?;
616 /// for batch in batches {
617 /// for row in batch {
618 /// // process row
619 /// }
620 /// }
621 /// ```
622 pub async fn stream_cmd(
623 &mut self,
624 cmd: &QailCmd,
625 batch_size: usize,
626 ) -> PgResult<Vec<Vec<PgRow>>> {
627 use std::sync::atomic::{AtomicU64, Ordering};
628 static CURSOR_ID: AtomicU64 = AtomicU64::new(0);
629
630 // Generate unique cursor name
631 let cursor_name = format!("qail_cursor_{}", CURSOR_ID.fetch_add(1, Ordering::SeqCst));
632
633 // AST-NATIVE: Generate SQL directly from AST (no to_sql_parameterized!)
634 use crate::protocol::AstEncoder;
635 let mut sql_buf = bytes::BytesMut::with_capacity(256);
636 let mut params: Vec<Option<Vec<u8>>> = Vec::new();
637 AstEncoder::encode_select_sql(cmd, &mut sql_buf, &mut params);
638 let sql = String::from_utf8_lossy(&sql_buf).to_string();
639
640 // Must be in a transaction for cursors
641 self.connection.begin_transaction().await?;
642
643 // Declare cursor
644 self.connection.declare_cursor(&cursor_name, &sql).await?;
645
646 // Fetch all batches
647 let mut all_batches = Vec::new();
648 while let Some(rows) = self
649 .connection
650 .fetch_cursor(&cursor_name, batch_size)
651 .await?
652 {
653 let pg_rows: Vec<PgRow> = rows
654 .into_iter()
655 .map(|cols| PgRow {
656 columns: cols,
657 column_info: None,
658 })
659 .collect();
660 all_batches.push(pg_rows);
661 }
662
663 // Cleanup
664 self.connection.close_cursor(&cursor_name).await?;
665 self.connection.commit().await?;
666
667 Ok(all_batches)
668 }
669}
670
671// ============================================================================
672// Connection Builder
673// ============================================================================
674
675/// Builder for creating PgDriver connections with named parameters.
676///
677/// # Example
678/// ```ignore
679/// let driver = PgDriver::builder()
680/// .host("localhost")
681/// .port(5432)
682/// .user("admin")
683/// .database("mydb")
684/// .password("secret")
685/// .connect()
686/// .await?;
687/// ```
688#[derive(Default)]
689pub struct PgDriverBuilder {
690 host: Option<String>,
691 port: Option<u16>,
692 user: Option<String>,
693 database: Option<String>,
694 password: Option<String>,
695 timeout: Option<std::time::Duration>,
696}
697
698impl PgDriverBuilder {
699 /// Create a new builder with default values.
700 pub fn new() -> Self {
701 Self::default()
702 }
703
704 /// Set the host (default: "127.0.0.1").
705 pub fn host(mut self, host: impl Into<String>) -> Self {
706 self.host = Some(host.into());
707 self
708 }
709
710 /// Set the port (default: 5432).
711 pub fn port(mut self, port: u16) -> Self {
712 self.port = Some(port);
713 self
714 }
715
716 /// Set the username (required).
717 pub fn user(mut self, user: impl Into<String>) -> Self {
718 self.user = Some(user.into());
719 self
720 }
721
722 /// Set the database name (required).
723 pub fn database(mut self, database: impl Into<String>) -> Self {
724 self.database = Some(database.into());
725 self
726 }
727
728 /// Set the password (optional, for SCRAM-SHA-256 auth).
729 pub fn password(mut self, password: impl Into<String>) -> Self {
730 self.password = Some(password.into());
731 self
732 }
733
734 /// Set connection timeout (optional).
735 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
736 self.timeout = Some(timeout);
737 self
738 }
739
740 /// Connect to PostgreSQL using the configured parameters.
741 pub async fn connect(self) -> PgResult<PgDriver> {
742 let host = self.host.as_deref().unwrap_or("127.0.0.1");
743 let port = self.port.unwrap_or(5432);
744 let user = self.user.as_deref().ok_or_else(|| {
745 PgError::Connection("User is required".to_string())
746 })?;
747 let database = self.database.as_deref().ok_or_else(|| {
748 PgError::Connection("Database is required".to_string())
749 })?;
750
751 match (self.password.as_deref(), self.timeout) {
752 (Some(password), Some(timeout)) => {
753 PgDriver::connect_with_timeout(host, port, user, database, password, timeout).await
754 }
755 (Some(password), None) => {
756 PgDriver::connect_with_password(host, port, user, database, password).await
757 }
758 (None, Some(timeout)) => {
759 tokio::time::timeout(
760 timeout,
761 PgDriver::connect(host, port, user, database),
762 )
763 .await
764 .map_err(|_| PgError::Connection(format!("Connection timeout after {:?}", timeout)))?
765 }
766 (None, None) => {
767 PgDriver::connect(host, port, user, database).await
768 }
769 }
770 }
771}