Skip to main content

bsql_driver_postgres/
pool.rs

1//! Connection pool — LIFO ordering, fail-fast acquire, Condvar-based waiting.
2//!
3//! The pool maintains a stack of idle connections. `acquire()` pops the top
4//! (most recently used = warmest caches). On drop, the guard pushes the
5//! connection back. If the pool is exhausted and no `acquire_timeout` is set,
6//! `acquire()` returns an error immediately — no blocking, no waiting.
7//!
8//! When `acquire_timeout` is set, blocked callers wait on a `Condvar` and are
9//! woken when a connection is returned to the pool.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::time::Duration;
14
15use crate::DriverError;
16use crate::arena::Arena;
17use crate::codec::Encode;
18use crate::conn::Connection;
19use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
20
21// --- Pool ---
22
23/// A connection pool with LIFO ordering and fail-fast semantics.
24///
25/// # Example
26///
27/// ```no_run
28/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
29/// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
30/// let mut conn = pool.acquire()?;
31/// conn.simple_query("SELECT 1")?;
32/// // conn is returned to pool on drop
33/// # Ok(())
34/// # }
35/// ```
36pub struct Pool {
37    inner: Arc<PoolInner>,
38}
39
40struct PoolInner {
41    /// Idle connections. Uses std::sync::Mutex because the critical section is
42    /// trivial (push/pop — no I/O). This lets PoolGuard::Drop return connections
43    /// synchronously.
44    stack: std::sync::Mutex<Vec<Connection>>,
45    max_size: usize,
46    open_count: AtomicUsize,
47    config: Arc<Config>,
48    /// When true, no new acquires are accepted.
49    closed: AtomicBool,
50    /// Condvar pair for release notification. Waiters block on the Condvar
51    /// when the pool is exhausted and `acquire_timeout` is set.
52    release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
53    /// Maximum lifetime of a connection. Connections older than this
54    /// are discarded when popped from the pool. Default: 30 minutes.
55    max_lifetime: Option<Duration>,
56    /// Maximum time to wait for a connection. Default: None (fail-fast).
57    acquire_timeout: Option<Duration>,
58    /// Minimum number of idle connections to maintain. Default: 0.
59    min_idle: usize,
60    /// SQL statements to PREPARE on new connections (warmup).
61    warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
62    /// Maximum number of cached prepared statements per connection.
63    max_stmt_cache_size: usize,
64}
65
66impl Pool {
67    /// Create a pool from a connection URL with default settings (max_size = 10).
68    ///
69    /// Validates the URL but does not open any connections yet (lazy initialization).
70    pub fn connect(url: &str) -> Result<Self, DriverError> {
71        PoolBuilder::new().url(url).build()
72    }
73
74    /// Create a pool builder for custom configuration.
75    pub fn builder() -> PoolBuilder {
76        PoolBuilder::new()
77    }
78
79    /// Acquire a connection from the pool.
80    ///
81    /// Returns immediately with the most recently used idle connection (LIFO).
82    /// If no idle connections are available and the pool is below max_size, a new
83    /// connection is created. If the pool is at max_size and no `acquire_timeout`
84    /// is set, returns `DriverError::Pool` immediately. If `acquire_timeout` is
85    /// set, blocks until a connection is returned or the timeout expires.
86    #[inline]
87    pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
88        if self.inner.closed.load(Ordering::Acquire) {
89            return Err(DriverError::Pool("pool is closed".into()));
90        }
91
92        // Try to pop an idle connection (fast path).
93        if let Some(guard) = self.try_pop_idle()? {
94            return Ok(guard);
95        }
96
97        // No idle connections — try to claim a slot with a proper CAS loop.
98        loop {
99            let current = self.inner.open_count.load(Ordering::Acquire);
100            if current >= self.inner.max_size {
101                if let Some(timeout) = self.inner.acquire_timeout {
102                    let (lock, cvar) = &self.inner.release_pair;
103                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
104                    let (_guard, result) = cvar
105                        .wait_timeout(guard, timeout)
106                        .unwrap_or_else(|e| e.into_inner());
107                    if result.timed_out() {
108                        return Err(DriverError::Pool(
109                            "pool exhausted: acquire timeout expired".into(),
110                        ));
111                    }
112                    // A connection was returned — try again
113                    if let Some(guard) = self.try_pop_idle()? {
114                        return Ok(guard);
115                    }
116                    // Popped nothing — retry CAS
117                    continue;
118                }
119                return Err(DriverError::Pool(
120                    "pool exhausted: all connections in use".into(),
121                ));
122            }
123            if self
124                .inner
125                .open_count
126                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
127                .is_ok()
128            {
129                break;
130            }
131            // CAS failed — another thread incremented. Retry.
132        }
133
134        // Open a new connection
135        let conn_result = Connection::connect_arc(self.inner.config.clone());
136        match conn_result {
137            Ok(mut conn) => {
138                // Configure statement cache size
139                conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
140                // Warmup: pre-PREPARE frequently used statements
141                self.warmup_conn(&mut conn);
142
143                Ok(PoolGuard {
144                    conn: Some(conn),
145                    pool: self.inner.clone(),
146                    discard: false,
147                })
148            }
149            Err(e) => {
150                // Give back the slot
151                self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
152                Err(e)
153            }
154        }
155    }
156
157    /// Try to pop a valid idle connection from the stack.
158    #[inline]
159    fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
160        let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
161        while let Some(conn) = stack.pop() {
162            if let Some(max_lifetime) = self.inner.max_lifetime {
163                if conn.created_at().elapsed() >= max_lifetime {
164                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
165                    continue;
166                }
167            }
168            if conn.idle_duration() < Duration::from_secs(30) {
169                return Ok(Some(PoolGuard {
170                    conn: Some(conn),
171                    pool: self.inner.clone(),
172                    discard: false,
173                }));
174            }
175            // Stale connection — drop it, free the slot
176            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
177        }
178        Ok(None)
179    }
180
181    /// Whether this pool uses UDS connections.
182    ///
183    /// Returns `true` when the pool URL points to a Unix domain socket.
184    /// On non-Unix platforms, always returns `false`.
185    pub fn is_uds(&self) -> bool {
186        #[cfg(unix)]
187        {
188            self.inner.config.host_is_uds()
189        }
190        #[cfg(not(unix))]
191        {
192            false
193        }
194    }
195
196    /// Begin a transaction. Acquires a connection and sends BEGIN.
197    pub fn begin(&self) -> Result<Transaction, DriverError> {
198        let mut guard = self.acquire()?;
199        guard.simple_query("BEGIN")?;
200        Ok(Transaction {
201            guard,
202            committed: false,
203            deferred_buf: Vec::new(),
204            deferred_count: 0,
205        })
206    }
207
208    /// Current number of open connections (idle + in-use).
209    pub fn open_count(&self) -> usize {
210        self.inner.open_count.load(Ordering::Relaxed)
211    }
212
213    /// Maximum pool size.
214    pub fn max_size(&self) -> usize {
215        self.inner.max_size
216    }
217
218    /// Pool status metrics.
219    pub fn status(&self) -> PoolStatus {
220        let idle = self
221            .inner
222            .stack
223            .lock()
224            .unwrap_or_else(|e| e.into_inner())
225            .len();
226        let open = self.inner.open_count.load(Ordering::Relaxed);
227        let active = open.saturating_sub(idle);
228        PoolStatus {
229            idle,
230            active,
231            open,
232            max_size: self.inner.max_size,
233        }
234    }
235
236    /// Pre-PREPARE warmup statements on a new connection.
237    ///
238    /// Uses `prepare_only()` which sends Parse+Describe+Sync without
239    /// Bind+Execute — no query execution, only statement caching.
240    ///
241    /// Best-effort: errors on individual statements are silently ignored.
242    /// The connection remains usable even if warmup fails.
243    fn warmup_conn(&self, conn: &mut Connection) {
244        let sqls = self
245            .inner
246            .warmup_sqls
247            .lock()
248            .unwrap_or_else(|e| e.into_inner())
249            .clone();
250
251        if sqls.is_empty() {
252            return;
253        }
254
255        for sql in sqls.iter() {
256            let sql_hash = crate::types::hash_sql(sql);
257            let _ = conn.prepare_only(sql, sql_hash);
258        }
259    }
260
261    /// Set the SQL statements to pre-PREPARE on new connections.
262    ///
263    /// Each SQL string is PREPAREd (Parse+Describe+Sync) on new connections
264    /// before they are returned from `acquire()`. This eliminates the first-use
265    /// Parse overhead for frequently executed queries.
266    ///
267    /// Warmup errors are silently ignored — a bad warmup SQL must not prevent
268    /// the connection from being usable.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
274    /// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
275    /// pool.set_warmup_sqls(&[
276    ///     "SELECT id, name FROM users WHERE id = $1::int4",
277    ///     "SELECT id, title FROM tickets WHERE status = ANY($1::text[])",
278    /// ]);
279    /// # Ok(())
280    /// # }
281    /// ```
282    pub fn set_warmup_sqls(&self, sqls: &[&str]) {
283        let boxed: Arc<Vec<Box<str>>> =
284            Arc::new(sqls.iter().map(|s| (*s).into()).collect::<Vec<_>>());
285        *self
286            .inner
287            .warmup_sqls
288            .lock()
289            .unwrap_or_else(|e| e.into_inner()) = boxed;
290    }
291
292    /// Close the pool. No new acquires are accepted. All idle connections
293    /// are sent Terminate and dropped.
294    pub fn close(&self) {
295        self.inner.closed.store(true, Ordering::Release);
296        // Drain and close all idle connections
297        let conns: Vec<Connection> = {
298            let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
299            std::mem::take(&mut *stack)
300        };
301        for conn in conns {
302            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
303            let _ = conn.close();
304        }
305        // Notify any waiters so they get the "pool is closed" error
306        let (_, cvar) = &self.inner.release_pair;
307        cvar.notify_all();
308    }
309
310    /// Whether the pool has been closed.
311    pub fn is_closed(&self) -> bool {
312        self.inner.closed.load(Ordering::Acquire)
313    }
314}
315
316impl Clone for Pool {
317    fn clone(&self) -> Self {
318        Pool {
319            inner: self.inner.clone(),
320        }
321    }
322}
323
324// --- PoolStatus ---
325
326/// Pool status metrics.
327#[derive(Debug, Clone, Copy)]
328pub struct PoolStatus {
329    /// Number of idle connections in the pool.
330    pub idle: usize,
331    /// Number of connections currently in use.
332    pub active: usize,
333    /// Total open connections (idle + active).
334    pub open: usize,
335    /// Maximum pool size.
336    pub max_size: usize,
337}
338
339// --- PoolBuilder ---
340
341/// Builder for configuring a connection pool.
342pub struct PoolBuilder {
343    url: Option<String>,
344    max_size: usize,
345    /// Maximum lifetime of a connection.
346    max_lifetime: Option<Duration>,
347    /// Maximum time to wait for a connection when pool is exhausted.
348    acquire_timeout: Option<Duration>,
349    /// Minimum number of idle connections to maintain.
350    min_idle: usize,
351    /// Maximum number of cached prepared statements per connection.
352    max_stmt_cache_size: usize,
353}
354
355impl PoolBuilder {
356    fn new() -> Self {
357        Self {
358            url: None,
359            max_size: 10,
360            max_lifetime: Some(Duration::from_secs(30 * 60)), // 30 min default
361            acquire_timeout: None,                            // fail-fast by default (CREDO #17)
362            min_idle: 0,                                      // no minimum by default
363            max_stmt_cache_size: 256,                         // LRU eviction at 256 stmts
364        }
365    }
366
367    /// Set the connection URL.
368    pub fn url(mut self, url: &str) -> Self {
369        self.url = Some(url.to_owned());
370        self
371    }
372
373    /// Set the maximum pool size. Default: 10.
374    ///
375    /// A max_size of 0 means the pool will reject all acquire() calls immediately.
376    pub fn max_size(mut self, size: usize) -> Self {
377        self.max_size = size;
378        self
379    }
380
381    /// Set the maximum lifetime of a connection. Default: 30 minutes.
382    /// Set to None for unlimited lifetime.
383    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
384        self.max_lifetime = lifetime;
385        self
386    }
387
388    /// Set the acquire timeout. Default: None (fail-fast, per CREDO #17).
389    /// Set to a Duration to enable waiting when the pool is exhausted.
390    pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
391        self.acquire_timeout = timeout;
392        self
393    }
394
395    /// Set the minimum number of idle connections. Default: 0.
396    /// When > 0, a background thread maintains this many idle connections.
397    pub fn min_idle(mut self, count: usize) -> Self {
398        self.min_idle = count;
399        self
400    }
401
402    /// Set the maximum number of cached prepared statements per connection.
403    /// Default: 256. When the cache exceeds this size, the least recently
404    /// used statement is evicted (Close sent to PG to free server memory).
405    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
406        self.max_stmt_cache_size = size;
407        self
408    }
409
410    /// Build the pool. Validates the URL but does not open connections.
411    pub fn build(self) -> Result<Pool, DriverError> {
412        let url = self
413            .url
414            .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
415
416        let config = Arc::new(Config::from_url(&url)?);
417
418        let pool = Pool {
419            inner: Arc::new(PoolInner {
420                stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
421                max_size: self.max_size,
422                open_count: AtomicUsize::new(0),
423                config,
424                closed: AtomicBool::new(false),
425                release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
426                max_lifetime: self.max_lifetime,
427                acquire_timeout: self.acquire_timeout,
428                min_idle: self.min_idle,
429                warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
430                max_stmt_cache_size: self.max_stmt_cache_size,
431            }),
432        };
433
434        if self.min_idle > 0 {
435            let inner = pool.inner.clone();
436            std::thread::spawn(move || {
437                maintain_min_idle(inner);
438            });
439        }
440
441        Ok(pool)
442    }
443}
444
445/// Background thread that maintains min_idle connections.
446fn maintain_min_idle(inner: Arc<PoolInner>) {
447    loop {
448        if inner.closed.load(Ordering::Acquire) {
449            return;
450        }
451
452        let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
453        let needed = inner.min_idle.saturating_sub(idle_count);
454
455        for _ in 0..needed {
456            if inner.closed.load(Ordering::Acquire) {
457                return;
458            }
459            let current = inner.open_count.load(Ordering::Acquire);
460            if current >= inner.max_size {
461                break;
462            }
463            if inner
464                .open_count
465                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
466                .is_err()
467            {
468                continue;
469            }
470
471            match Connection::connect_arc(inner.config.clone()) {
472                Ok(conn) => {
473                    let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
474                    stack.push(conn);
475                    let (_, cvar) = &inner.release_pair;
476                    cvar.notify_one();
477                }
478                Err(_) => {
479                    inner.open_count.fetch_sub(1, Ordering::AcqRel);
480                }
481            }
482        }
483
484        // Check every 5 seconds
485        std::thread::sleep(Duration::from_secs(5));
486    }
487}
488
489// --- PoolGuard ---
490
491/// A borrowed connection from the pool. Returns to the pool on drop.
492///
493/// If the connection is in a failed transaction state, broken, or marked for
494/// discard, it is dropped (decrements open_count) instead of returned.
495pub struct PoolGuard {
496    conn: Option<Connection>,
497    pool: Arc<PoolInner>,
498    /// When true, the connection is dropped instead of returned to the pool.
499    discard: bool,
500}
501
502impl PoolGuard {
503    /// Mark this connection for discard — it will NOT be returned to the pool
504    /// on drop. The open_count is decremented and the TCP connection is closed.
505    pub fn mark_discard(&mut self) {
506        self.discard = true;
507    }
508
509    /// Cancel the currently running query on the underlying connection.
510    ///
511    /// Opens a new TCP connection and sends a CancelRequest to PG.
512    /// The cancel connection is closed immediately after.
513    pub fn cancel(&self) -> Result<(), DriverError> {
514        let conn = self
515            .conn
516            .as_ref()
517            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?;
518        conn.cancel()
519    }
520
521    // --- Introspection dispatch methods ---
522
523    /// Get the backend process ID for this connection.
524    pub fn pid(&self) -> i32 {
525        self.conn.as_ref().expect("connection taken").pid()
526    }
527
528    /// Whether the connection is idle (not in a transaction).
529    pub fn is_idle(&self) -> bool {
530        self.conn.as_ref().expect("connection taken").is_idle()
531    }
532
533    /// Whether the connection is inside a transaction.
534    pub fn is_in_transaction(&self) -> bool {
535        self.conn
536            .as_ref()
537            .expect("connection taken")
538            .is_in_transaction()
539    }
540
541    // --- Query dispatch methods ---
542
543    /// Execute a prepared query and return rows.
544    #[inline]
545    pub fn query(
546        &mut self,
547        sql: &str,
548        sql_hash: u64,
549        params: &[&(dyn Encode + Sync)],
550    ) -> Result<QueryResult, DriverError> {
551        self.conn
552            .as_mut()
553            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
554            .query(sql, sql_hash, params)
555    }
556
557    /// Execute a query without result rows (INSERT/UPDATE/DELETE).
558    #[inline]
559    pub fn execute(
560        &mut self,
561        sql: &str,
562        sql_hash: u64,
563        params: &[&(dyn Encode + Sync)],
564    ) -> Result<u64, DriverError> {
565        self.conn
566            .as_mut()
567            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
568            .execute(sql, sql_hash, params)
569    }
570
571    /// Execute the same statement N times with different params in one pipeline.
572    ///
573    /// Sends all N Bind+Execute messages + one Sync. One round-trip for N operations.
574    /// Returns the affected row count for each parameter set.
575    pub fn execute_pipeline(
576        &mut self,
577        sql: &str,
578        sql_hash: u64,
579        param_sets: &[&[&(dyn Encode + Sync)]],
580    ) -> Result<Vec<u64>, DriverError> {
581        self.conn
582            .as_mut()
583            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
584            .execute_pipeline(sql, sql_hash, param_sets)
585    }
586
587    /// Execute a simple (unprepared) query.
588    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
589        self.conn
590            .as_mut()
591            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
592            .simple_query(sql)
593    }
594
595    /// Execute a simple query and return rows as text.
596    ///
597    /// Uses PostgreSQL's simple query protocol — all values are strings.
598    pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
599        self.conn
600            .as_mut()
601            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
602            .simple_query_rows(sql)
603    }
604
605    /// Process each row via a closure with zero-copy `PgDataRow`.
606    pub fn for_each<F>(
607        &mut self,
608        sql: &str,
609        sql_hash: u64,
610        params: &[&(dyn Encode + Sync)],
611        f: F,
612    ) -> Result<(), DriverError>
613    where
614        F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
615    {
616        self.conn
617            .as_mut()
618            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
619            .for_each(sql, sql_hash, params, f)
620    }
621
622    /// Process each DataRow as raw bytes — fastest path.
623    pub fn for_each_raw<F>(
624        &mut self,
625        sql: &str,
626        sql_hash: u64,
627        params: &[&(dyn Encode + Sync)],
628        f: F,
629    ) -> Result<(), DriverError>
630    where
631        F: FnMut(&[u8]) -> Result<(), DriverError>,
632    {
633        self.conn
634            .as_mut()
635            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
636            .for_each_raw(sql, sql_hash, params, f)
637    }
638
639    // --- Streaming ---
640
641    /// Start a streaming query.
642    pub fn query_streaming_start(
643        &mut self,
644        sql: &str,
645        sql_hash: u64,
646        params: &[&(dyn Encode + Sync)],
647        chunk_size: i32,
648    ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
649        self.conn
650            .as_mut()
651            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
652            .query_streaming_start(sql, sql_hash, params, chunk_size)
653    }
654
655    /// Send Execute+Flush for a streaming query (2nd+ chunks).
656    pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
657        self.conn
658            .as_mut()
659            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
660            .streaming_send_execute(chunk_size)
661    }
662
663    /// Read the next chunk of rows from an in-progress streaming query.
664    pub fn streaming_next_chunk(
665        &mut self,
666        arena: &mut Arena,
667        all_col_offsets: &mut Vec<(usize, i32)>,
668    ) -> Result<bool, DriverError> {
669        self.conn
670            .as_mut()
671            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
672            .streaming_next_chunk(arena, all_col_offsets)
673    }
674
675    /// Whether this guard holds a sync connection (always true now).
676    pub fn is_sync(&self) -> bool {
677        true
678    }
679
680    // --- Deferred pipeline support ---
681
682    /// Ensure a statement is prepared and cached.
683    pub(crate) fn ensure_stmt_prepared(
684        &mut self,
685        sql: &str,
686        sql_hash: u64,
687        params: &[&(dyn Encode + Sync)],
688    ) -> Result<[u8; 18], DriverError> {
689        self.conn
690            .as_mut()
691            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
692            .ensure_stmt_prepared(sql, sql_hash, params)
693    }
694
695    /// Write Bind+Execute bytes for a prepared statement into an external buffer.
696    pub(crate) fn write_deferred_bind_execute(
697        &self,
698        sql_hash: u64,
699        params: &[&(dyn Encode + Sync)],
700        buf: &mut Vec<u8>,
701    ) {
702        let conn = self.conn.as_ref().expect("connection taken");
703        conn.write_deferred_bind_execute(sql_hash, params, buf);
704    }
705
706    /// Flush a buffer of deferred Bind+Execute messages as a single pipeline.
707    pub(crate) fn flush_deferred_pipeline(
708        &mut self,
709        buf: &mut Vec<u8>,
710        count: usize,
711    ) -> Result<Vec<u64>, DriverError> {
712        self.conn
713            .as_mut()
714            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
715            .flush_deferred_pipeline(buf, count)
716    }
717}
718
719impl Drop for PoolGuard {
720    fn drop(&mut self) {
721        if let Some(mut conn) = self.conn.take() {
722            // Discard if:
723            //   - explicitly marked for discard
724            //   - in a failed transaction (tx_status == 'E')
725            //   - in an active transaction (tx_status == 'T') — uncommitted tx
726            //   - streaming query in progress — connection in indeterminate state
727            //   - pool is closed
728            if self.discard
729                || conn.is_in_failed_transaction()
730                || conn.is_in_transaction()
731                || conn.is_streaming()
732                || self.pool.closed.load(Ordering::Acquire)
733            {
734                self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
735                return;
736            }
737
738            // Stamp last-used time for idle connection tracking.
739            // Amortized: only call Instant::now() every 64 returns.
740            // Max error: 64 queries × ~15us = ~1ms on a 30s idle threshold.
741            if conn.query_counter() & 63 == 0 {
742                conn.touch();
743            }
744
745            // Return to pool
746            {
747                let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
748                stack.push(conn);
749            }
750
751            // Notify waiters only if pool was exhausted (someone might be waiting).
752            // Skip notify when pool has spare capacity — saves ~20ns per release.
753            if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
754                let (_, cvar) = &self.pool.release_pair;
755                cvar.notify_one();
756            }
757        }
758    }
759}
760
761// --- Transaction ---
762
763/// A database transaction. Sends ROLLBACK on drop if not committed.
764///
765/// # Example
766///
767/// ```no_run
768/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
769/// # let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
770/// let mut tx = pool.begin()?;
771/// tx.simple_query("INSERT INTO t VALUES (1)")?;
772/// tx.commit()?;
773/// # Ok(())
774/// # }
775/// ```
776pub struct Transaction {
777    guard: PoolGuard,
778    committed: bool,
779    /// Accumulated Bind+Execute message bytes for deferred operations.
780    deferred_buf: Vec<u8>,
781    /// Number of deferred operations buffered.
782    deferred_count: usize,
783}
784
785impl Transaction {
786    /// Commit the transaction.
787    ///
788    /// Automatically flushes any deferred operations before committing.
789    pub fn commit(mut self) -> Result<(), DriverError> {
790        if self.deferred_count > 0 {
791            self.flush_deferred()?;
792        }
793        self.guard.simple_query("COMMIT")?;
794        self.committed = true;
795        Ok(())
796    }
797
798    /// Rollback the transaction explicitly.
799    ///
800    /// Discards any deferred operations without sending them.
801    pub fn rollback(mut self) -> Result<(), DriverError> {
802        self.deferred_buf.clear();
803        self.deferred_count = 0;
804        self.guard.simple_query("ROLLBACK")?;
805        self.committed = true; // prevent double rollback in drop
806        Ok(())
807    }
808
809    /// Execute a prepared query within the transaction.
810    ///
811    /// Automatically flushes any deferred operations before executing the query,
812    /// ensuring read-your-writes consistency.
813    pub fn query(
814        &mut self,
815        sql: &str,
816        sql_hash: u64,
817        params: &[&(dyn Encode + Sync)],
818    ) -> Result<QueryResult, DriverError> {
819        if self.deferred_count > 0 {
820            self.flush_deferred()?;
821        }
822        self.guard.query(sql, sql_hash, params)
823    }
824
825    /// Execute without result rows within the transaction.
826    pub fn execute(
827        &mut self,
828        sql: &str,
829        sql_hash: u64,
830        params: &[&(dyn Encode + Sync)],
831    ) -> Result<u64, DriverError> {
832        self.guard.execute(sql, sql_hash, params)
833    }
834
835    /// Execute the same statement N times with different params in one pipeline.
836    pub fn execute_pipeline(
837        &mut self,
838        sql: &str,
839        sql_hash: u64,
840        param_sets: &[&[&(dyn Encode + Sync)]],
841    ) -> Result<Vec<u64>, DriverError> {
842        self.guard.execute_pipeline(sql, sql_hash, param_sets)
843    }
844
845    /// Process each row directly from the wire buffer within a transaction.
846    ///
847    /// Automatically flushes any deferred operations first.
848    pub fn for_each<F>(
849        &mut self,
850        sql: &str,
851        sql_hash: u64,
852        params: &[&(dyn Encode + Sync)],
853        f: F,
854    ) -> Result<(), DriverError>
855    where
856        F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
857    {
858        if self.deferred_count > 0 {
859            self.flush_deferred()?;
860        }
861        self.guard.for_each(sql, sql_hash, params, f)
862    }
863
864    /// Process each DataRow as raw bytes within a transaction.
865    ///
866    /// Automatically flushes any deferred operations first.
867    pub fn for_each_raw<F>(
868        &mut self,
869        sql: &str,
870        sql_hash: u64,
871        params: &[&(dyn Encode + Sync)],
872        f: F,
873    ) -> Result<(), DriverError>
874    where
875        F: FnMut(&[u8]) -> Result<(), DriverError>,
876    {
877        if self.deferred_count > 0 {
878            self.flush_deferred()?;
879        }
880        self.guard.for_each_raw(sql, sql_hash, params, f)
881    }
882
883    /// Simple query within the transaction.
884    ///
885    /// Automatically flushes any deferred operations first.
886    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
887        if self.deferred_count > 0 {
888            self.flush_deferred()?;
889        }
890        self.guard.simple_query(sql)
891    }
892
893    // --- Deferred pipeline API ---
894
895    /// Buffer an execute for deferred pipeline flush.
896    ///
897    /// The operation is not sent to the server immediately. Instead, the
898    /// Bind+Execute message bytes are buffered internally. The buffered
899    /// operations are sent as a single pipeline on [`commit()`](Self::commit)
900    /// or [`flush_deferred()`](Self::flush_deferred).
901    ///
902    /// # Example
903    ///
904    /// ```no_run
905    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
906    /// # let pool = bsql_driver_postgres::Pool::connect("postgres://u:p@localhost/db")?;
907    /// let mut tx = pool.begin()?;
908    /// let sql = "INSERT INTO t (v) VALUES ($1)";
909    /// let hash = bsql_driver_postgres::hash_sql(sql);
910    ///
911    /// // These are buffered, not sent:
912    /// tx.defer_execute(sql, hash, &[&1i32])?;
913    /// tx.defer_execute(sql, hash, &[&2i32])?;
914    /// tx.defer_execute(sql, hash, &[&3i32])?;
915    ///
916    /// // commit() flushes all 3 as one pipeline + COMMIT = 2 round-trips total
917    /// tx.commit()?;
918    /// # Ok(())
919    /// # }
920    /// ```
921    pub fn defer_execute(
922        &mut self,
923        sql: &str,
924        sql_hash: u64,
925        params: &[&(dyn Encode + Sync)],
926    ) -> Result<(), DriverError> {
927        if params.len() > i16::MAX as usize {
928            return Err(DriverError::Protocol(format!(
929                "parameter count {} exceeds maximum {}",
930                params.len(),
931                i16::MAX
932            )));
933        }
934
935        // Ensure statement is prepared (may require one round-trip on first call)
936        self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
937
938        // Buffer the Bind+Execute bytes — no I/O
939        self.guard
940            .write_deferred_bind_execute(sql_hash, params, &mut self.deferred_buf);
941        self.deferred_count += 1;
942        Ok(())
943    }
944
945    /// Flush all deferred operations as a single pipeline.
946    ///
947    /// Sends all buffered Bind+Execute messages + one Sync in a single TCP write.
948    /// Returns the affected row count for each deferred operation.
949    pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
950        let count = self.deferred_count;
951        self.deferred_count = 0;
952        self.guard
953            .flush_deferred_pipeline(&mut self.deferred_buf, count)
954    }
955
956    /// Number of operations currently buffered for deferred execution.
957    pub fn deferred_count(&self) -> usize {
958        self.deferred_count
959    }
960}
961
962impl Drop for Transaction {
963    fn drop(&mut self) {
964        if !self.committed {
965            // Connection is in an uncommitted transaction — discard it from the pool.
966            // Take the connection out of the guard and drop it, decrementing open_count.
967            if let Some(_conn) = self.guard.conn.take() {
968                self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
969                // Connection dropped — PG server will auto-rollback when it sees disconnect
970            }
971        }
972    }
973}
974
975#[cfg(test)]
976mod tests {
977    use super::*;
978
979    #[test]
980    fn pool_builder_requires_url() {
981        let result = PoolBuilder::new().build();
982        assert!(result.is_err());
983    }
984
985    #[test]
986    fn pool_builder_validates_url() {
987        let result = PoolBuilder::new().url("not_a_url").build();
988        assert!(result.is_err());
989    }
990
991    #[test]
992    fn pool_builder_accepts_valid_url() {
993        let pool = PoolBuilder::new()
994            .url("postgres://user:pass@localhost/db")
995            .max_size(5)
996            .build()
997            .unwrap();
998        assert_eq!(pool.max_size(), 5);
999        assert_eq!(pool.open_count(), 0);
1000    }
1001
1002    #[test]
1003    fn pool_connect_validates_url() {
1004        let result = Pool::connect("not_a_url");
1005        assert!(result.is_err());
1006    }
1007
1008    #[test]
1009    fn pool_max_size_zero() {
1010        let pool = PoolBuilder::new()
1011            .url("postgres://user:pass@localhost/db")
1012            .max_size(0)
1013            .build()
1014            .unwrap();
1015
1016        let result = pool.acquire();
1017        assert!(result.is_err());
1018        match result {
1019            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1020            Err(e) => panic!("expected Pool error, got: {e:?}"),
1021            Ok(_) => panic!("expected error, got Ok"),
1022        }
1023    }
1024
1025    #[test]
1026    fn pool_clone_shares_state() {
1027        let pool = PoolBuilder::new()
1028            .url("postgres://user:pass@localhost/db")
1029            .max_size(5)
1030            .build()
1031            .unwrap();
1032
1033        let pool2 = pool.clone();
1034        assert_eq!(pool.max_size(), pool2.max_size());
1035    }
1036
1037    // --- Audit gap tests ---
1038
1039    // #60: max_lifetime is configurable
1040    #[test]
1041    fn pool_builder_max_lifetime() {
1042        let pool = PoolBuilder::new()
1043            .url("postgres://user:pass@localhost/db")
1044            .max_lifetime(Some(Duration::from_secs(60)))
1045            .build()
1046            .unwrap();
1047        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
1048    }
1049
1050    // #60: max_lifetime None
1051    #[test]
1052    fn pool_builder_max_lifetime_none() {
1053        let pool = PoolBuilder::new()
1054            .url("postgres://user:pass@localhost/db")
1055            .max_lifetime(None)
1056            .build()
1057            .unwrap();
1058        assert_eq!(pool.inner.max_lifetime, None);
1059    }
1060
1061    // #62: acquire_timeout set to None (fail-fast)
1062    #[test]
1063    fn pool_builder_acquire_timeout_none() {
1064        let pool = PoolBuilder::new()
1065            .url("postgres://user:pass@localhost/db")
1066            .acquire_timeout(None)
1067            .build()
1068            .unwrap();
1069        assert_eq!(pool.inner.acquire_timeout, None);
1070    }
1071
1072    // #62: acquire_timeout custom value
1073    #[test]
1074    fn pool_builder_acquire_timeout_custom() {
1075        let pool = PoolBuilder::new()
1076            .url("postgres://user:pass@localhost/db")
1077            .acquire_timeout(Some(Duration::from_secs(10)))
1078            .build()
1079            .unwrap();
1080        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
1081    }
1082
1083    // #63: min_idle setting
1084    #[test]
1085    fn pool_builder_min_idle() {
1086        let pool = PoolBuilder::new()
1087            .url("postgres://user:pass@localhost/db")
1088            .min_idle(2)
1089            .build()
1090            .unwrap();
1091        assert_eq!(pool.inner.min_idle, 2);
1092    }
1093
1094    // #64: Pool close marks pool as closed
1095    #[test]
1096    fn pool_close_marks_closed() {
1097        let pool = PoolBuilder::new()
1098            .url("postgres://user:pass@localhost/db")
1099            .max_size(5)
1100            .build()
1101            .unwrap();
1102
1103        assert!(!pool.is_closed());
1104        pool.close();
1105        assert!(pool.is_closed());
1106
1107        // New acquires should fail
1108        let result = pool.acquire();
1109        assert!(result.is_err());
1110        match result {
1111            Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
1112            Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
1113            Ok(_) => panic!("expected error, got Ok"),
1114        }
1115    }
1116
1117    // #67: PoolStatus idle/active counts
1118    #[test]
1119    fn pool_status_initial() {
1120        let pool = PoolBuilder::new()
1121            .url("postgres://user:pass@localhost/db")
1122            .max_size(10)
1123            .build()
1124            .unwrap();
1125
1126        let status = pool.status();
1127        assert_eq!(status.idle, 0);
1128        assert_eq!(status.active, 0);
1129        assert_eq!(status.open, 0);
1130        assert_eq!(status.max_size, 10);
1131    }
1132
1133    // Default pool builder values
1134    #[test]
1135    fn pool_builder_defaults() {
1136        let pool = PoolBuilder::new()
1137            .url("postgres://user:pass@localhost/db")
1138            .build()
1139            .unwrap();
1140
1141        assert_eq!(pool.max_size(), 10);
1142        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
1143        assert_eq!(pool.inner.acquire_timeout, None); // fail-fast by default (CREDO #17)
1144        assert_eq!(pool.inner.min_idle, 0);
1145    }
1146
1147    // Pool open_count starts at 0
1148    #[test]
1149    fn pool_open_count_initial() {
1150        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1151        assert_eq!(pool.open_count(), 0);
1152    }
1153
1154    // --- Task 7: max_stmt_cache_size ---
1155
1156    #[test]
1157    fn pool_builder_max_stmt_cache_size_default() {
1158        let pool = PoolBuilder::new()
1159            .url("postgres://user:pass@localhost/db")
1160            .build()
1161            .unwrap();
1162        assert_eq!(pool.inner.max_stmt_cache_size, 256);
1163    }
1164
1165    #[test]
1166    fn pool_builder_max_stmt_cache_size_custom() {
1167        let pool = PoolBuilder::new()
1168            .url("postgres://user:pass@localhost/db")
1169            .max_stmt_cache_size(512)
1170            .build()
1171            .unwrap();
1172        assert_eq!(pool.inner.max_stmt_cache_size, 512);
1173    }
1174
1175    // --- Auto-UDS detection tests ---
1176
1177    #[test]
1178    fn pool_is_uds_false_for_tcp() {
1179        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1180        assert!(!pool.is_uds());
1181    }
1182
1183    #[cfg(unix)]
1184    #[test]
1185    fn pool_is_uds_true_for_unix_socket() {
1186        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1187        assert!(pool.is_uds());
1188    }
1189
1190    #[cfg(unix)]
1191    #[test]
1192    fn pool_is_uds_true_for_var_run_socket() {
1193        let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
1194        assert!(pool.is_uds());
1195    }
1196
1197    #[test]
1198    fn pool_is_uds_false_for_ip_address() {
1199        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
1200        assert!(!pool.is_uds());
1201    }
1202
1203    #[cfg(unix)]
1204    #[test]
1205    fn pool_slot_sync_created_for_uds_config() {
1206        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1207        assert!(config.host_is_uds());
1208    }
1209
1210    #[test]
1211    fn pool_slot_tcp_config() {
1212        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1213        assert!(!config.host_is_uds());
1214    }
1215
1216    // ===============================================================
1217    // Pool::is_uds — extended tests
1218    // ===============================================================
1219
1220    #[test]
1221    fn pool_is_uds_false_for_hostname() {
1222        let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
1223        assert!(!pool.is_uds());
1224    }
1225
1226    #[cfg(unix)]
1227    #[test]
1228    fn pool_is_uds_true_for_tmp() {
1229        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1230        assert!(pool.is_uds());
1231    }
1232
1233    // ===============================================================
1234    // Pool close semantics
1235    // ===============================================================
1236
1237    #[test]
1238    fn pool_close_then_acquire_fails() {
1239        let pool = PoolBuilder::new()
1240            .url("postgres://user:pass@localhost/db")
1241            .max_size(5)
1242            .build()
1243            .unwrap();
1244        pool.close();
1245        let result = pool.acquire();
1246        assert!(result.is_err());
1247        match result {
1248            Err(DriverError::Pool(msg)) => {
1249                assert!(msg.contains("closed"), "should say closed: {msg}")
1250            }
1251            Err(e) => panic!("expected Pool error, got: {e:?}"),
1252            Ok(_) => panic!("expected error"),
1253        }
1254    }
1255
1256    #[test]
1257    fn pool_is_closed_before_and_after() {
1258        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1259        assert!(!pool.is_closed());
1260        pool.close();
1261        assert!(pool.is_closed());
1262    }
1263
1264    // ===============================================================
1265    // Pool exhaustion (fail-fast without timeout)
1266    // ===============================================================
1267
1268    #[test]
1269    fn pool_exhausted_no_timeout() {
1270        let pool = PoolBuilder::new()
1271            .url("postgres://user:pass@localhost/db")
1272            .max_size(0)
1273            .acquire_timeout(None) // fail-fast
1274            .build()
1275            .unwrap();
1276        let result = pool.acquire();
1277        assert!(result.is_err());
1278        match result {
1279            Err(DriverError::Pool(msg)) => {
1280                assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
1281            }
1282            Err(e) => panic!("expected Pool error, got: {e:?}"),
1283            Ok(_) => panic!("expected error"),
1284        }
1285    }
1286
1287    // ===============================================================
1288    // PoolBuilder validation
1289    // ===============================================================
1290
1291    #[test]
1292    fn pool_builder_no_url_error() {
1293        let result = PoolBuilder::new().max_size(5).build();
1294        assert!(result.is_err());
1295        match result {
1296            Err(DriverError::Pool(msg)) => {
1297                assert!(msg.contains("URL"), "should mention URL: {msg}")
1298            }
1299            Err(e) => panic!("expected Pool error, got: {e:?}"),
1300            Ok(_) => panic!("expected error"),
1301        }
1302    }
1303
1304    #[test]
1305    fn pool_builder_invalid_url_error() {
1306        let result = PoolBuilder::new().url("ftp://something").build();
1307        assert!(result.is_err());
1308    }
1309
1310    #[test]
1311    fn pool_builder_stmt_cache_size_zero() {
1312        let pool = PoolBuilder::new()
1313            .url("postgres://user:pass@localhost/db")
1314            .max_stmt_cache_size(0)
1315            .build()
1316            .unwrap();
1317        assert_eq!(pool.inner.max_stmt_cache_size, 0);
1318    }
1319
1320    // ===============================================================
1321    // PoolStatus
1322    // ===============================================================
1323
1324    #[test]
1325    fn pool_status_reflects_max_size() {
1326        let pool = PoolBuilder::new()
1327            .url("postgres://user:pass@localhost/db")
1328            .max_size(20)
1329            .build()
1330            .unwrap();
1331        let status = pool.status();
1332        assert_eq!(status.max_size, 20);
1333        assert_eq!(status.idle, 0);
1334        assert_eq!(status.active, 0);
1335        assert_eq!(status.open, 0);
1336    }
1337
1338    // ===============================================================
1339    // Pool clone
1340    // ===============================================================
1341
1342    #[test]
1343    fn pool_clone_shares_config() {
1344        let pool = PoolBuilder::new()
1345            .url("postgres://user:pass@localhost/db")
1346            .max_size(7)
1347            .build()
1348            .unwrap();
1349        let p2 = pool.clone();
1350        assert_eq!(pool.max_size(), 7);
1351        assert_eq!(p2.max_size(), 7);
1352        assert_eq!(pool.open_count(), p2.open_count());
1353    }
1354
1355    // ===============================================================
1356    // set_warmup_sqls
1357    // ===============================================================
1358
1359    #[test]
1360    fn pool_set_warmup_sqls_empty() {
1361        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1362        pool.set_warmup_sqls(&[]);
1363        let sqls = pool
1364            .inner
1365            .warmup_sqls
1366            .lock()
1367            .unwrap_or_else(|e| e.into_inner())
1368            .clone();
1369        assert!(sqls.is_empty());
1370    }
1371
1372    #[test]
1373    fn pool_set_warmup_sqls_multiple() {
1374        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1375        pool.set_warmup_sqls(&["SELECT 1", "SELECT 2", "SELECT 3"]);
1376        let sqls = pool
1377            .inner
1378            .warmup_sqls
1379            .lock()
1380            .unwrap_or_else(|e| e.into_inner())
1381            .clone();
1382        assert_eq!(sqls.len(), 3);
1383        assert_eq!(&*sqls[0], "SELECT 1");
1384        assert_eq!(&*sqls[1], "SELECT 2");
1385        assert_eq!(&*sqls[2], "SELECT 3");
1386    }
1387
1388    #[test]
1389    fn pool_set_warmup_sqls_overwrite() {
1390        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1391        pool.set_warmup_sqls(&["SELECT 1"]);
1392        pool.set_warmup_sqls(&["SELECT 99"]);
1393        let sqls = pool
1394            .inner
1395            .warmup_sqls
1396            .lock()
1397            .unwrap_or_else(|e| e.into_inner())
1398            .clone();
1399        assert_eq!(sqls.len(), 1);
1400        assert_eq!(&*sqls[0], "SELECT 99");
1401    }
1402
1403    // ===============================================================
1404    // PoolStatus Debug
1405    // ===============================================================
1406
1407    #[test]
1408    fn pool_status_debug() {
1409        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1410        let status = pool.status();
1411        let dbg = format!("{status:?}");
1412        assert!(dbg.contains("PoolStatus"));
1413        assert!(dbg.contains("idle"));
1414        assert!(dbg.contains("active"));
1415        assert!(dbg.contains("open"));
1416        assert!(dbg.contains("max_size"));
1417    }
1418
1419    // ===============================================================
1420    // Config host_is_uds via pool (structural tests)
1421    // ===============================================================
1422
1423    #[test]
1424    fn config_host_is_uds_returns_true_for_slash() {
1425        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1426        assert!(config.host_is_uds());
1427    }
1428
1429    #[test]
1430    fn config_host_is_uds_returns_false_for_tcp() {
1431        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1432        assert!(!config.host_is_uds());
1433    }
1434
1435    #[test]
1436    fn config_host_is_uds_returns_false_for_ip() {
1437        let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
1438        assert!(!config.host_is_uds());
1439    }
1440
1441    // ===============================================================
1442    // PoolBuilder chaining
1443    // ===============================================================
1444
1445    #[test]
1446    fn pool_builder_full_chain() {
1447        let pool = PoolBuilder::new()
1448            .url("postgres://user:pass@localhost/db")
1449            .max_size(3)
1450            .max_lifetime(Some(Duration::from_secs(600)))
1451            .acquire_timeout(Some(Duration::from_secs(5)))
1452            .min_idle(1)
1453            .max_stmt_cache_size(128)
1454            .build()
1455            .unwrap();
1456        assert_eq!(pool.max_size(), 3);
1457        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
1458        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1459        assert_eq!(pool.inner.min_idle, 1);
1460        assert_eq!(pool.inner.max_stmt_cache_size, 128);
1461    }
1462
1463    // --- Audit: PoolGuard drop discards connections in bad state ---
1464
1465    #[test]
1466    fn pool_max_size_zero_rejects_all_acquires() {
1467        let pool = PoolBuilder::new()
1468            .url("postgres://user:pass@localhost/db")
1469            .max_size(0)
1470            .build()
1471            .unwrap();
1472        let result = pool.acquire();
1473        assert!(result.is_err());
1474        match &result {
1475            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1476            _ => panic!("expected pool exhausted error"),
1477        }
1478    }
1479
1480    // --- Audit: URL parsing edge cases ---
1481
1482    #[test]
1483    fn url_parse_unknown_sslmode_returns_error() {
1484        let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
1485        assert!(result.is_err());
1486        let msg = format!("{}", result.unwrap_err());
1487        assert!(msg.contains("unknown sslmode"));
1488    }
1489
1490    #[test]
1491    fn url_parse_invalid_port_returns_error() {
1492        let result = Config::from_url("postgres://u:p@h:abc/d");
1493        assert!(result.is_err());
1494        let msg = format!("{}", result.unwrap_err());
1495        assert!(msg.contains("invalid port"));
1496    }
1497
1498    #[test]
1499    fn url_parse_missing_at_sign_returns_error() {
1500        let result = Config::from_url("postgres://u:plocalhost/d");
1501        assert!(result.is_err());
1502        let msg = format!("{}", result.unwrap_err());
1503        assert!(msg.contains("missing @"));
1504    }
1505
1506    #[test]
1507    fn url_parse_empty_host_returns_error() {
1508        let result = Config::from_url("postgres://u:p@/d");
1509        assert!(result.is_err());
1510    }
1511
1512    #[test]
1513    fn url_parse_empty_user_returns_error() {
1514        let result = Config::from_url("postgres://:p@h/d");
1515        assert!(result.is_err());
1516    }
1517
1518    #[test]
1519    fn url_parse_statement_timeout_invalid_uses_default() {
1520        let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
1521        assert_eq!(config.statement_timeout_secs, 30);
1522    }
1523
1524    #[test]
1525    fn url_parse_malformed_percent_encoding() {
1526        let result = Config::from_url("postgres://u%:p@h/d");
1527        assert!(result.is_err());
1528    }
1529
1530    #[test]
1531    fn url_parse_invalid_hex_in_percent_encoding() {
1532        let result = Config::from_url("postgres://u%ZZ:p@h/d");
1533        assert!(result.is_err());
1534    }
1535}