Skip to main content

bsql_driver_postgres/
pool.rs

1//! Connection pool — LIFO ordering, fail-fast acquire, Condvar-based waiting.
2//!
3//! The pool maintains a stack of idle connections. `acquire()` pops the top
4//! (most recently used = warmest caches). On drop, the guard pushes the
5//! connection back. If the pool is exhausted and no `acquire_timeout` is set,
6//! `acquire()` returns an error immediately — no blocking, no waiting.
7//!
8//! When `acquire_timeout` is set, blocked callers wait on a `Condvar` and are
9//! woken when a connection is returned to the pool.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::time::Duration;
14
15use crate::DriverError;
16use crate::arena::Arena;
17use crate::codec::Encode;
18use crate::conn::Connection;
19use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
20
21// --- Pool ---
22
23/// A connection pool with LIFO ordering and fail-fast semantics.
24///
25/// # Example
26///
27/// ```no_run
28/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
29/// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
30/// let mut conn = pool.acquire()?;
31/// conn.simple_query("SELECT 1")?;
32/// // conn is returned to pool on drop
33/// # Ok(())
34/// # }
35/// ```
36pub struct Pool {
37    inner: Arc<PoolInner>,
38}
39
40struct PoolInner {
41    /// Idle connections. Uses std::sync::Mutex because the critical section is
42    /// trivial (push/pop — no I/O). This lets PoolGuard::Drop return connections
43    /// synchronously.
44    stack: std::sync::Mutex<Vec<Connection>>,
45    max_size: usize,
46    open_count: AtomicUsize,
47    config: Arc<Config>,
48    /// When true, no new acquires are accepted.
49    closed: AtomicBool,
50    /// Condvar pair for release notification. Waiters block on the Condvar
51    /// when the pool is exhausted and `acquire_timeout` is set.
52    release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
53    /// Maximum lifetime of a connection. Connections older than this
54    /// are discarded when popped from the pool. Default: 30 minutes.
55    max_lifetime: Option<Duration>,
56    /// Maximum time to wait for a connection. Default: None (fail-fast).
57    acquire_timeout: Option<Duration>,
58    /// Minimum number of idle connections to maintain. Default: 0.
59    min_idle: usize,
60    /// SQL statements to PREPARE on new connections (warmup).
61    warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
62    /// Maximum number of cached prepared statements per connection.
63    max_stmt_cache_size: usize,
64}
65
66impl Pool {
67    /// Create a pool from a connection URL with default settings (max_size = 10).
68    ///
69    /// Validates the URL but does not open any connections yet (lazy initialization).
70    pub fn connect(url: &str) -> Result<Self, DriverError> {
71        PoolBuilder::new().url(url).build()
72    }
73
74    /// Create a pool builder for custom configuration.
75    pub fn builder() -> PoolBuilder {
76        PoolBuilder::new()
77    }
78
79    /// Acquire a connection from the pool.
80    ///
81    /// Returns immediately with the most recently used idle connection (LIFO).
82    /// If no idle connections are available and the pool is below max_size, a new
83    /// connection is created. If the pool is at max_size and no `acquire_timeout`
84    /// is set, returns `DriverError::Pool` immediately. If `acquire_timeout` is
85    /// set, blocks until a connection is returned or the timeout expires.
86    #[inline]
87    pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
88        if self.inner.closed.load(Ordering::Acquire) {
89            return Err(DriverError::Pool("pool is closed".into()));
90        }
91
92        // Try to pop an idle connection (fast path).
93        if let Some(guard) = self.try_pop_idle()? {
94            return Ok(guard);
95        }
96
97        // No idle connections — try to claim a slot with a proper CAS loop.
98        loop {
99            let current = self.inner.open_count.load(Ordering::Acquire);
100            if current >= self.inner.max_size {
101                if let Some(timeout) = self.inner.acquire_timeout {
102                    let (lock, cvar) = &self.inner.release_pair;
103                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
104                    let (_guard, result) = cvar
105                        .wait_timeout(guard, timeout)
106                        .unwrap_or_else(|e| e.into_inner());
107                    if result.timed_out() {
108                        return Err(DriverError::Pool(
109                            "pool exhausted: acquire timeout expired".into(),
110                        ));
111                    }
112                    // A connection was returned — try again
113                    if let Some(guard) = self.try_pop_idle()? {
114                        return Ok(guard);
115                    }
116                    // Popped nothing — retry CAS
117                    continue;
118                }
119                return Err(DriverError::Pool(
120                    "pool exhausted: all connections in use".into(),
121                ));
122            }
123            if self
124                .inner
125                .open_count
126                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
127                .is_ok()
128            {
129                break;
130            }
131            // CAS failed — another thread incremented. Retry.
132        }
133
134        // Open a new connection
135        let conn_result = Connection::connect_arc(self.inner.config.clone());
136        match conn_result {
137            Ok(mut conn) => {
138                // Configure statement cache size
139                conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
140                // Warmup: pre-PREPARE frequently used statements
141                self.warmup_conn(&mut conn);
142
143                Ok(PoolGuard {
144                    conn: Some(conn),
145                    pool: self.inner.clone(),
146                    discard: false,
147                })
148            }
149            Err(e) => {
150                // Give back the slot
151                self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
152                Err(e)
153            }
154        }
155    }
156
157    /// Try to pop a valid idle connection from the stack.
158    #[inline]
159    fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
160        let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
161        while let Some(conn) = stack.pop() {
162            if let Some(max_lifetime) = self.inner.max_lifetime {
163                if conn.created_at().elapsed() >= max_lifetime {
164                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
165                    continue;
166                }
167            }
168            if conn.idle_duration() < Duration::from_secs(30) {
169                return Ok(Some(PoolGuard {
170                    conn: Some(conn),
171                    pool: self.inner.clone(),
172                    discard: false,
173                }));
174            }
175            // Stale connection — drop it, free the slot
176            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
177        }
178        Ok(None)
179    }
180
181    /// Whether this pool uses UDS connections.
182    ///
183    /// Returns `true` when the pool URL points to a Unix domain socket.
184    /// On non-Unix platforms, always returns `false`.
185    pub fn is_uds(&self) -> bool {
186        #[cfg(unix)]
187        {
188            self.inner.config.host_is_uds()
189        }
190        #[cfg(not(unix))]
191        {
192            false
193        }
194    }
195
196    /// Begin a transaction. Acquires a connection and sends BEGIN.
197    pub fn begin(&self) -> Result<Transaction, DriverError> {
198        let mut guard = self.acquire()?;
199        guard.simple_query("BEGIN")?;
200        Ok(Transaction {
201            guard,
202            committed: false,
203            deferred_buf: Vec::new(),
204            deferred_count: 0,
205        })
206    }
207
208    /// Current number of open connections (idle + in-use).
209    pub fn open_count(&self) -> usize {
210        self.inner.open_count.load(Ordering::Relaxed)
211    }
212
213    /// Maximum pool size.
214    pub fn max_size(&self) -> usize {
215        self.inner.max_size
216    }
217
218    /// Pool status metrics.
219    pub fn status(&self) -> PoolStatus {
220        let idle = self
221            .inner
222            .stack
223            .lock()
224            .unwrap_or_else(|e| e.into_inner())
225            .len();
226        let open = self.inner.open_count.load(Ordering::Relaxed);
227        let active = open.saturating_sub(idle);
228        PoolStatus {
229            idle,
230            active,
231            open,
232            max_size: self.inner.max_size,
233        }
234    }
235
236    /// Pre-PREPARE warmup statements on a new connection.
237    ///
238    /// Uses `prepare_only()` which sends Parse+Describe+Sync without
239    /// Bind+Execute — no query execution, only statement caching.
240    ///
241    /// Best-effort: errors on individual statements are silently ignored.
242    /// The connection remains usable even if warmup fails.
243    fn warmup_conn(&self, conn: &mut Connection) {
244        let sqls = self
245            .inner
246            .warmup_sqls
247            .lock()
248            .unwrap_or_else(|e| e.into_inner())
249            .clone();
250
251        if sqls.is_empty() {
252            return;
253        }
254
255        for sql in sqls.iter() {
256            let sql_hash = crate::types::hash_sql(sql);
257            let _ = conn.prepare_only(sql, sql_hash);
258        }
259    }
260
261    /// Set the SQL statements to pre-PREPARE on new connections.
262    ///
263    /// Each SQL string is PREPAREd (Parse+Describe+Sync) on new connections
264    /// before they are returned from `acquire()`. This eliminates the first-use
265    /// Parse overhead for frequently executed queries.
266    ///
267    /// Warmup errors are silently ignored — a bad warmup SQL must not prevent
268    /// the connection from being usable.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
274    /// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
275    /// pool.set_warmup_sqls(&[
276    ///     "SELECT id, name FROM users WHERE id = $1::int4",
277    ///     "SELECT id, title FROM tickets WHERE status = ANY($1::text[])",
278    /// ]);
279    /// # Ok(())
280    /// # }
281    /// ```
282    pub fn set_warmup_sqls(&self, sqls: &[&str]) {
283        let boxed: Arc<Vec<Box<str>>> =
284            Arc::new(sqls.iter().map(|s| (*s).into()).collect::<Vec<_>>());
285        *self
286            .inner
287            .warmup_sqls
288            .lock()
289            .unwrap_or_else(|e| e.into_inner()) = boxed;
290    }
291
292    /// Close the pool. No new acquires are accepted. All idle connections
293    /// are sent Terminate and dropped.
294    pub fn close(&self) {
295        self.inner.closed.store(true, Ordering::Release);
296        // Drain and close all idle connections
297        let conns: Vec<Connection> = {
298            let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
299            std::mem::take(&mut *stack)
300        };
301        for conn in conns {
302            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
303            let _ = conn.close();
304        }
305        // Notify any waiters so they get the "pool is closed" error
306        let (_, cvar) = &self.inner.release_pair;
307        cvar.notify_all();
308    }
309
310    /// Whether the pool has been closed.
311    pub fn is_closed(&self) -> bool {
312        self.inner.closed.load(Ordering::Acquire)
313    }
314}
315
316impl Clone for Pool {
317    fn clone(&self) -> Self {
318        Pool {
319            inner: self.inner.clone(),
320        }
321    }
322}
323
324// --- PoolStatus ---
325
326/// Pool status metrics.
327#[derive(Debug, Clone, Copy)]
328pub struct PoolStatus {
329    /// Number of idle connections in the pool.
330    pub idle: usize,
331    /// Number of connections currently in use.
332    pub active: usize,
333    /// Total open connections (idle + active).
334    pub open: usize,
335    /// Maximum pool size.
336    pub max_size: usize,
337}
338
339// --- PoolBuilder ---
340
341/// Builder for configuring a connection pool.
342pub struct PoolBuilder {
343    url: Option<String>,
344    max_size: usize,
345    /// Maximum lifetime of a connection.
346    max_lifetime: Option<Duration>,
347    /// Maximum time to wait for a connection when pool is exhausted.
348    acquire_timeout: Option<Duration>,
349    /// Minimum number of idle connections to maintain.
350    min_idle: usize,
351    /// Maximum number of cached prepared statements per connection.
352    max_stmt_cache_size: usize,
353}
354
355impl PoolBuilder {
356    fn new() -> Self {
357        Self {
358            url: None,
359            max_size: 10,
360            max_lifetime: Some(Duration::from_secs(30 * 60)), // 30 min default
361            acquire_timeout: None,                            // fail-fast by default (CREDO #17)
362            min_idle: 0,                                      // no minimum by default
363            max_stmt_cache_size: 256,                         // LRU eviction at 256 stmts
364        }
365    }
366
367    /// Set the connection URL.
368    pub fn url(mut self, url: &str) -> Self {
369        self.url = Some(url.to_owned());
370        self
371    }
372
373    /// Set the maximum pool size. Default: 10.
374    ///
375    /// A max_size of 0 means the pool will reject all acquire() calls immediately.
376    pub fn max_size(mut self, size: usize) -> Self {
377        self.max_size = size;
378        self
379    }
380
381    /// Set the maximum lifetime of a connection. Default: 30 minutes.
382    /// Set to None for unlimited lifetime.
383    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
384        self.max_lifetime = lifetime;
385        self
386    }
387
388    /// Set the acquire timeout. Default: None (fail-fast, per CREDO #17).
389    /// Set to a Duration to enable waiting when the pool is exhausted.
390    pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
391        self.acquire_timeout = timeout;
392        self
393    }
394
395    /// Set the minimum number of idle connections. Default: 0.
396    /// When > 0, a background thread maintains this many idle connections.
397    pub fn min_idle(mut self, count: usize) -> Self {
398        self.min_idle = count;
399        self
400    }
401
402    /// Set the maximum number of cached prepared statements per connection.
403    /// Default: 256. When the cache exceeds this size, the least recently
404    /// used statement is evicted (Close sent to PG to free server memory).
405    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
406        self.max_stmt_cache_size = size;
407        self
408    }
409
410    /// Build the pool. Validates the URL but does not open connections.
411    pub fn build(self) -> Result<Pool, DriverError> {
412        let url = self
413            .url
414            .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
415
416        let config = Arc::new(Config::from_url(&url)?);
417
418        let pool = Pool {
419            inner: Arc::new(PoolInner {
420                stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
421                max_size: self.max_size,
422                open_count: AtomicUsize::new(0),
423                config,
424                closed: AtomicBool::new(false),
425                release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
426                max_lifetime: self.max_lifetime,
427                acquire_timeout: self.acquire_timeout,
428                min_idle: self.min_idle,
429                warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
430                max_stmt_cache_size: self.max_stmt_cache_size,
431            }),
432        };
433
434        if self.min_idle > 0 {
435            let inner = pool.inner.clone();
436            std::thread::spawn(move || {
437                maintain_min_idle(inner);
438            });
439        }
440
441        Ok(pool)
442    }
443}
444
445/// Background thread that maintains min_idle connections.
446fn maintain_min_idle(inner: Arc<PoolInner>) {
447    loop {
448        if inner.closed.load(Ordering::Acquire) {
449            return;
450        }
451
452        let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
453        let needed = inner.min_idle.saturating_sub(idle_count);
454
455        for _ in 0..needed {
456            if inner.closed.load(Ordering::Acquire) {
457                return;
458            }
459            let current = inner.open_count.load(Ordering::Acquire);
460            if current >= inner.max_size {
461                break;
462            }
463            if inner
464                .open_count
465                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
466                .is_err()
467            {
468                continue;
469            }
470
471            match Connection::connect_arc(inner.config.clone()) {
472                Ok(conn) => {
473                    let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
474                    stack.push(conn);
475                    let (_, cvar) = &inner.release_pair;
476                    cvar.notify_one();
477                }
478                Err(_) => {
479                    inner.open_count.fetch_sub(1, Ordering::AcqRel);
480                }
481            }
482        }
483
484        // Check every 5 seconds
485        std::thread::sleep(Duration::from_secs(5));
486    }
487}
488
489// --- PoolGuard ---
490
491/// A borrowed connection from the pool. Returns to the pool on drop.
492///
493/// If the connection is in a failed transaction state, broken, or marked for
494/// discard, it is dropped (decrements open_count) instead of returned.
495pub struct PoolGuard {
496    conn: Option<Connection>,
497    pool: Arc<PoolInner>,
498    /// When true, the connection is dropped instead of returned to the pool.
499    discard: bool,
500}
501
502impl PoolGuard {
503    /// Mark this connection for discard — it will NOT be returned to the pool
504    /// on drop. The open_count is decremented and the TCP connection is closed.
505    pub fn mark_discard(&mut self) {
506        self.discard = true;
507    }
508
509    /// Cancel the currently running query on the underlying connection.
510    ///
511    /// Opens a new TCP connection and sends a CancelRequest to PG.
512    /// The cancel connection is closed immediately after.
513    pub fn cancel(&self) -> Result<(), DriverError> {
514        let conn = self
515            .conn
516            .as_ref()
517            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?;
518        conn.cancel()
519    }
520
521    // --- Introspection dispatch methods ---
522
523    /// Get the backend process ID for this connection.
524    pub fn pid(&self) -> i32 {
525        self.conn.as_ref().expect("connection taken").pid()
526    }
527
528    /// Whether the connection is idle (not in a transaction).
529    pub fn is_idle(&self) -> bool {
530        self.conn.as_ref().expect("connection taken").is_idle()
531    }
532
533    /// Whether the connection is inside a transaction.
534    pub fn is_in_transaction(&self) -> bool {
535        self.conn
536            .as_ref()
537            .expect("connection taken")
538            .is_in_transaction()
539    }
540
541    // --- Query dispatch methods ---
542
543    /// Execute a prepared query and return rows.
544    #[inline]
545    pub fn query(
546        &mut self,
547        sql: &str,
548        sql_hash: u64,
549        params: &[&(dyn Encode + Sync)],
550    ) -> Result<QueryResult, DriverError> {
551        self.conn
552            .as_mut()
553            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
554            .query(sql, sql_hash, params)
555    }
556
557    /// Execute a query without result rows (INSERT/UPDATE/DELETE).
558    #[inline]
559    pub fn execute(
560        &mut self,
561        sql: &str,
562        sql_hash: u64,
563        params: &[&(dyn Encode + Sync)],
564    ) -> Result<u64, DriverError> {
565        self.conn
566            .as_mut()
567            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
568            .execute(sql, sql_hash, params)
569    }
570
571    /// Execute the same statement N times with different params in one pipeline.
572    ///
573    /// Sends all N Bind+Execute messages + one Sync. One round-trip for N operations.
574    /// Returns the affected row count for each parameter set.
575    pub fn execute_pipeline(
576        &mut self,
577        sql: &str,
578        sql_hash: u64,
579        param_sets: &[&[&(dyn Encode + Sync)]],
580    ) -> Result<Vec<u64>, DriverError> {
581        self.conn
582            .as_mut()
583            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
584            .execute_pipeline(sql, sql_hash, param_sets)
585    }
586
587    /// Execute a simple (unprepared) query.
588    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
589        self.conn
590            .as_mut()
591            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
592            .simple_query(sql)
593    }
594
595    /// Execute a simple query and return rows as text.
596    ///
597    /// Uses PostgreSQL's simple query protocol — all values are strings.
598    pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
599        self.conn
600            .as_mut()
601            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
602            .simple_query_rows(sql)
603    }
604
605    /// Process each row via a closure with zero-copy `PgDataRow`.
606    pub fn for_each<F>(
607        &mut self,
608        sql: &str,
609        sql_hash: u64,
610        params: &[&(dyn Encode + Sync)],
611        f: F,
612    ) -> Result<(), DriverError>
613    where
614        F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
615    {
616        self.conn
617            .as_mut()
618            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
619            .for_each(sql, sql_hash, params, f)
620    }
621
622    /// Process each DataRow as raw bytes — fastest path.
623    pub fn for_each_raw<F>(
624        &mut self,
625        sql: &str,
626        sql_hash: u64,
627        params: &[&(dyn Encode + Sync)],
628        f: F,
629    ) -> Result<(), DriverError>
630    where
631        F: FnMut(&[u8]) -> Result<(), DriverError>,
632    {
633        self.conn
634            .as_mut()
635            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
636            .for_each_raw(sql, sql_hash, params, f)
637    }
638
639    // --- Streaming ---
640
641    /// Start a streaming query.
642    pub fn query_streaming_start(
643        &mut self,
644        sql: &str,
645        sql_hash: u64,
646        params: &[&(dyn Encode + Sync)],
647        chunk_size: i32,
648    ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
649        self.conn
650            .as_mut()
651            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
652            .query_streaming_start(sql, sql_hash, params, chunk_size)
653    }
654
655    /// Send Execute+Flush for a streaming query (2nd+ chunks).
656    pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
657        self.conn
658            .as_mut()
659            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
660            .streaming_send_execute(chunk_size)
661    }
662
663    /// Read the next chunk of rows from an in-progress streaming query.
664    pub fn streaming_next_chunk(
665        &mut self,
666        arena: &mut Arena,
667        all_col_offsets: &mut Vec<(usize, i32)>,
668    ) -> Result<bool, DriverError> {
669        self.conn
670            .as_mut()
671            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
672            .streaming_next_chunk(arena, all_col_offsets)
673    }
674
675    // --- COPY protocol ---
676
677    /// Bulk copy data INTO a table from an iterator of text rows.
678    ///
679    /// Each row is a tab-separated string (TSV format). Returns the row count.
680    pub fn copy_in<'a, I>(
681        &mut self,
682        table: &str,
683        columns: &[&str],
684        rows: I,
685    ) -> Result<u64, DriverError>
686    where
687        I: IntoIterator<Item = &'a str>,
688    {
689        self.conn
690            .as_mut()
691            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
692            .copy_in(table, columns, rows)
693    }
694
695    /// Bulk copy data OUT of a table/query to a writer.
696    ///
697    /// Writes TSV-formatted rows. Returns the row count.
698    pub fn copy_out<W: std::io::Write>(
699        &mut self,
700        query: &str,
701        writer: &mut W,
702    ) -> Result<u64, DriverError> {
703        self.conn
704            .as_mut()
705            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
706            .copy_out(query, writer)
707    }
708
709    /// Whether this guard holds a sync connection (always true now).
710    pub fn is_sync(&self) -> bool {
711        true
712    }
713
714    // --- Deferred pipeline support ---
715
716    /// Ensure a statement is prepared and cached.
717    pub(crate) fn ensure_stmt_prepared(
718        &mut self,
719        sql: &str,
720        sql_hash: u64,
721        params: &[&(dyn Encode + Sync)],
722    ) -> Result<[u8; 18], DriverError> {
723        self.conn
724            .as_mut()
725            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
726            .ensure_stmt_prepared(sql, sql_hash, params)
727    }
728
729    /// Write Bind+Execute bytes for a prepared statement into an external buffer.
730    pub(crate) fn write_deferred_bind_execute(
731        &self,
732        sql_hash: u64,
733        params: &[&(dyn Encode + Sync)],
734        buf: &mut Vec<u8>,
735    ) {
736        let conn = self.conn.as_ref().expect("connection taken");
737        conn.write_deferred_bind_execute(sql_hash, params, buf);
738    }
739
740    /// Flush a buffer of deferred Bind+Execute messages as a single pipeline.
741    pub(crate) fn flush_deferred_pipeline(
742        &mut self,
743        buf: &mut Vec<u8>,
744        count: usize,
745    ) -> Result<Vec<u64>, DriverError> {
746        self.conn
747            .as_mut()
748            .ok_or_else(|| DriverError::Pool("connection already taken".into()))?
749            .flush_deferred_pipeline(buf, count)
750    }
751}
752
753impl Drop for PoolGuard {
754    fn drop(&mut self) {
755        if let Some(mut conn) = self.conn.take() {
756            // Discard if:
757            //   - explicitly marked for discard
758            //   - in a failed transaction (tx_status == 'E')
759            //   - in an active transaction (tx_status == 'T') — uncommitted tx
760            //   - streaming query in progress — connection in indeterminate state
761            //   - pool is closed
762            if self.discard
763                || conn.is_in_failed_transaction()
764                || conn.is_in_transaction()
765                || conn.is_streaming()
766                || self.pool.closed.load(Ordering::Acquire)
767            {
768                self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
769                return;
770            }
771
772            // Stamp last-used time for idle connection tracking.
773            // Amortized: only call Instant::now() every 64 returns.
774            // Max error: 64 queries × ~15us = ~1ms on a 30s idle threshold.
775            if conn.query_counter() & 63 == 0 {
776                conn.touch();
777            }
778
779            // Return to pool
780            {
781                let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
782                stack.push(conn);
783            }
784
785            // Notify waiters only if pool was exhausted (someone might be waiting).
786            // Skip notify when pool has spare capacity — saves ~20ns per release.
787            if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
788                let (_, cvar) = &self.pool.release_pair;
789                cvar.notify_one();
790            }
791        }
792    }
793}
794
795// --- Transaction ---
796
797/// A database transaction. Sends ROLLBACK on drop if not committed.
798///
799/// # Example
800///
801/// ```no_run
802/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
803/// # let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
804/// let mut tx = pool.begin()?;
805/// tx.simple_query("INSERT INTO t VALUES (1)")?;
806/// tx.commit()?;
807/// # Ok(())
808/// # }
809/// ```
810pub struct Transaction {
811    guard: PoolGuard,
812    committed: bool,
813    /// Accumulated Bind+Execute message bytes for deferred operations.
814    deferred_buf: Vec<u8>,
815    /// Number of deferred operations buffered.
816    deferred_count: usize,
817}
818
819impl Transaction {
820    /// Commit the transaction.
821    ///
822    /// Automatically flushes any deferred operations before committing.
823    pub fn commit(mut self) -> Result<(), DriverError> {
824        if self.deferred_count > 0 {
825            self.flush_deferred()?;
826        }
827        self.guard.simple_query("COMMIT")?;
828        self.committed = true;
829        Ok(())
830    }
831
832    /// Rollback the transaction explicitly.
833    ///
834    /// Discards any deferred operations without sending them.
835    pub fn rollback(mut self) -> Result<(), DriverError> {
836        self.deferred_buf.clear();
837        self.deferred_count = 0;
838        self.guard.simple_query("ROLLBACK")?;
839        self.committed = true; // prevent double rollback in drop
840        Ok(())
841    }
842
843    /// Execute a prepared query within the transaction.
844    ///
845    /// Automatically flushes any deferred operations before executing the query,
846    /// ensuring read-your-writes consistency.
847    pub fn query(
848        &mut self,
849        sql: &str,
850        sql_hash: u64,
851        params: &[&(dyn Encode + Sync)],
852    ) -> Result<QueryResult, DriverError> {
853        if self.deferred_count > 0 {
854            self.flush_deferred()?;
855        }
856        self.guard.query(sql, sql_hash, params)
857    }
858
859    /// Execute without result rows within the transaction.
860    pub fn execute(
861        &mut self,
862        sql: &str,
863        sql_hash: u64,
864        params: &[&(dyn Encode + Sync)],
865    ) -> Result<u64, DriverError> {
866        self.guard.execute(sql, sql_hash, params)
867    }
868
869    /// Execute the same statement N times with different params in one pipeline.
870    pub fn execute_pipeline(
871        &mut self,
872        sql: &str,
873        sql_hash: u64,
874        param_sets: &[&[&(dyn Encode + Sync)]],
875    ) -> Result<Vec<u64>, DriverError> {
876        self.guard.execute_pipeline(sql, sql_hash, param_sets)
877    }
878
879    /// Process each row directly from the wire buffer within a transaction.
880    ///
881    /// Automatically flushes any deferred operations first.
882    pub fn for_each<F>(
883        &mut self,
884        sql: &str,
885        sql_hash: u64,
886        params: &[&(dyn Encode + Sync)],
887        f: F,
888    ) -> Result<(), DriverError>
889    where
890        F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
891    {
892        if self.deferred_count > 0 {
893            self.flush_deferred()?;
894        }
895        self.guard.for_each(sql, sql_hash, params, f)
896    }
897
898    /// Process each DataRow as raw bytes within a transaction.
899    ///
900    /// Automatically flushes any deferred operations first.
901    pub fn for_each_raw<F>(
902        &mut self,
903        sql: &str,
904        sql_hash: u64,
905        params: &[&(dyn Encode + Sync)],
906        f: F,
907    ) -> Result<(), DriverError>
908    where
909        F: FnMut(&[u8]) -> Result<(), DriverError>,
910    {
911        if self.deferred_count > 0 {
912            self.flush_deferred()?;
913        }
914        self.guard.for_each_raw(sql, sql_hash, params, f)
915    }
916
917    /// Simple query within the transaction.
918    ///
919    /// Automatically flushes any deferred operations first.
920    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
921        if self.deferred_count > 0 {
922            self.flush_deferred()?;
923        }
924        self.guard.simple_query(sql)
925    }
926
927    // --- Deferred pipeline API ---
928
929    /// Buffer an execute for deferred pipeline flush.
930    ///
931    /// The operation is not sent to the server immediately. Instead, the
932    /// Bind+Execute message bytes are buffered internally. The buffered
933    /// operations are sent as a single pipeline on [`commit()`](Self::commit)
934    /// or [`flush_deferred()`](Self::flush_deferred).
935    ///
936    /// # Example
937    ///
938    /// ```no_run
939    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
940    /// # let pool = bsql_driver_postgres::Pool::connect("postgres://u:p@localhost/db")?;
941    /// let mut tx = pool.begin()?;
942    /// let sql = "INSERT INTO t (v) VALUES ($1)";
943    /// let hash = bsql_driver_postgres::hash_sql(sql);
944    ///
945    /// // These are buffered, not sent:
946    /// tx.defer_execute(sql, hash, &[&1i32])?;
947    /// tx.defer_execute(sql, hash, &[&2i32])?;
948    /// tx.defer_execute(sql, hash, &[&3i32])?;
949    ///
950    /// // commit() flushes all 3 as one pipeline + COMMIT = 2 round-trips total
951    /// tx.commit()?;
952    /// # Ok(())
953    /// # }
954    /// ```
955    pub fn defer_execute(
956        &mut self,
957        sql: &str,
958        sql_hash: u64,
959        params: &[&(dyn Encode + Sync)],
960    ) -> Result<(), DriverError> {
961        if params.len() > i16::MAX as usize {
962            return Err(DriverError::Protocol(format!(
963                "parameter count {} exceeds maximum {}",
964                params.len(),
965                i16::MAX
966            )));
967        }
968
969        // Ensure statement is prepared (may require one round-trip on first call)
970        self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
971
972        // Buffer the Bind+Execute bytes — no I/O
973        self.guard
974            .write_deferred_bind_execute(sql_hash, params, &mut self.deferred_buf);
975        self.deferred_count += 1;
976        Ok(())
977    }
978
979    /// Flush all deferred operations as a single pipeline.
980    ///
981    /// Sends all buffered Bind+Execute messages + one Sync in a single TCP write.
982    /// Returns the affected row count for each deferred operation.
983    pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
984        let count = self.deferred_count;
985        self.deferred_count = 0;
986        self.guard
987            .flush_deferred_pipeline(&mut self.deferred_buf, count)
988    }
989
990    /// Number of operations currently buffered for deferred execution.
991    pub fn deferred_count(&self) -> usize {
992        self.deferred_count
993    }
994}
995
996impl Drop for Transaction {
997    fn drop(&mut self) {
998        if !self.committed {
999            // Connection is in an uncommitted transaction — discard it from the pool.
1000            // Take the connection out of the guard and drop it, decrementing open_count.
1001            if let Some(_conn) = self.guard.conn.take() {
1002                self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
1003                // Connection dropped — PG server will auto-rollback when it sees disconnect
1004            }
1005        }
1006    }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use super::*;
1012
1013    #[test]
1014    fn pool_builder_requires_url() {
1015        let result = PoolBuilder::new().build();
1016        assert!(result.is_err());
1017    }
1018
1019    #[test]
1020    fn pool_builder_validates_url() {
1021        let result = PoolBuilder::new().url("not_a_url").build();
1022        assert!(result.is_err());
1023    }
1024
1025    #[test]
1026    fn pool_builder_accepts_valid_url() {
1027        let pool = PoolBuilder::new()
1028            .url("postgres://user:pass@localhost/db")
1029            .max_size(5)
1030            .build()
1031            .unwrap();
1032        assert_eq!(pool.max_size(), 5);
1033        assert_eq!(pool.open_count(), 0);
1034    }
1035
1036    #[test]
1037    fn pool_connect_validates_url() {
1038        let result = Pool::connect("not_a_url");
1039        assert!(result.is_err());
1040    }
1041
1042    #[test]
1043    fn pool_max_size_zero() {
1044        let pool = PoolBuilder::new()
1045            .url("postgres://user:pass@localhost/db")
1046            .max_size(0)
1047            .build()
1048            .unwrap();
1049
1050        let result = pool.acquire();
1051        assert!(result.is_err());
1052        match result {
1053            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1054            Err(e) => panic!("expected Pool error, got: {e:?}"),
1055            Ok(_) => panic!("expected error, got Ok"),
1056        }
1057    }
1058
1059    #[test]
1060    fn pool_clone_shares_state() {
1061        let pool = PoolBuilder::new()
1062            .url("postgres://user:pass@localhost/db")
1063            .max_size(5)
1064            .build()
1065            .unwrap();
1066
1067        let pool2 = pool.clone();
1068        assert_eq!(pool.max_size(), pool2.max_size());
1069    }
1070
1071    // --- Audit gap tests ---
1072
1073    // #60: max_lifetime is configurable
1074    #[test]
1075    fn pool_builder_max_lifetime() {
1076        let pool = PoolBuilder::new()
1077            .url("postgres://user:pass@localhost/db")
1078            .max_lifetime(Some(Duration::from_secs(60)))
1079            .build()
1080            .unwrap();
1081        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
1082    }
1083
1084    // #60: max_lifetime None
1085    #[test]
1086    fn pool_builder_max_lifetime_none() {
1087        let pool = PoolBuilder::new()
1088            .url("postgres://user:pass@localhost/db")
1089            .max_lifetime(None)
1090            .build()
1091            .unwrap();
1092        assert_eq!(pool.inner.max_lifetime, None);
1093    }
1094
1095    // #62: acquire_timeout set to None (fail-fast)
1096    #[test]
1097    fn pool_builder_acquire_timeout_none() {
1098        let pool = PoolBuilder::new()
1099            .url("postgres://user:pass@localhost/db")
1100            .acquire_timeout(None)
1101            .build()
1102            .unwrap();
1103        assert_eq!(pool.inner.acquire_timeout, None);
1104    }
1105
1106    // #62: acquire_timeout custom value
1107    #[test]
1108    fn pool_builder_acquire_timeout_custom() {
1109        let pool = PoolBuilder::new()
1110            .url("postgres://user:pass@localhost/db")
1111            .acquire_timeout(Some(Duration::from_secs(10)))
1112            .build()
1113            .unwrap();
1114        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
1115    }
1116
1117    // #63: min_idle setting
1118    #[test]
1119    fn pool_builder_min_idle() {
1120        let pool = PoolBuilder::new()
1121            .url("postgres://user:pass@localhost/db")
1122            .min_idle(2)
1123            .build()
1124            .unwrap();
1125        assert_eq!(pool.inner.min_idle, 2);
1126    }
1127
1128    // #64: Pool close marks pool as closed
1129    #[test]
1130    fn pool_close_marks_closed() {
1131        let pool = PoolBuilder::new()
1132            .url("postgres://user:pass@localhost/db")
1133            .max_size(5)
1134            .build()
1135            .unwrap();
1136
1137        assert!(!pool.is_closed());
1138        pool.close();
1139        assert!(pool.is_closed());
1140
1141        // New acquires should fail
1142        let result = pool.acquire();
1143        assert!(result.is_err());
1144        match result {
1145            Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
1146            Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
1147            Ok(_) => panic!("expected error, got Ok"),
1148        }
1149    }
1150
1151    // #67: PoolStatus idle/active counts
1152    #[test]
1153    fn pool_status_initial() {
1154        let pool = PoolBuilder::new()
1155            .url("postgres://user:pass@localhost/db")
1156            .max_size(10)
1157            .build()
1158            .unwrap();
1159
1160        let status = pool.status();
1161        assert_eq!(status.idle, 0);
1162        assert_eq!(status.active, 0);
1163        assert_eq!(status.open, 0);
1164        assert_eq!(status.max_size, 10);
1165    }
1166
1167    // Default pool builder values
1168    #[test]
1169    fn pool_builder_defaults() {
1170        let pool = PoolBuilder::new()
1171            .url("postgres://user:pass@localhost/db")
1172            .build()
1173            .unwrap();
1174
1175        assert_eq!(pool.max_size(), 10);
1176        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
1177        assert_eq!(pool.inner.acquire_timeout, None); // fail-fast by default (CREDO #17)
1178        assert_eq!(pool.inner.min_idle, 0);
1179    }
1180
1181    // Pool open_count starts at 0
1182    #[test]
1183    fn pool_open_count_initial() {
1184        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1185        assert_eq!(pool.open_count(), 0);
1186    }
1187
1188    // --- Task 7: max_stmt_cache_size ---
1189
1190    #[test]
1191    fn pool_builder_max_stmt_cache_size_default() {
1192        let pool = PoolBuilder::new()
1193            .url("postgres://user:pass@localhost/db")
1194            .build()
1195            .unwrap();
1196        assert_eq!(pool.inner.max_stmt_cache_size, 256);
1197    }
1198
1199    #[test]
1200    fn pool_builder_max_stmt_cache_size_custom() {
1201        let pool = PoolBuilder::new()
1202            .url("postgres://user:pass@localhost/db")
1203            .max_stmt_cache_size(512)
1204            .build()
1205            .unwrap();
1206        assert_eq!(pool.inner.max_stmt_cache_size, 512);
1207    }
1208
1209    // --- Auto-UDS detection tests ---
1210
1211    #[test]
1212    fn pool_is_uds_false_for_tcp() {
1213        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1214        assert!(!pool.is_uds());
1215    }
1216
1217    #[cfg(unix)]
1218    #[test]
1219    fn pool_is_uds_true_for_unix_socket() {
1220        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1221        assert!(pool.is_uds());
1222    }
1223
1224    #[cfg(unix)]
1225    #[test]
1226    fn pool_is_uds_true_for_var_run_socket() {
1227        let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
1228        assert!(pool.is_uds());
1229    }
1230
1231    #[test]
1232    fn pool_is_uds_false_for_ip_address() {
1233        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
1234        assert!(!pool.is_uds());
1235    }
1236
1237    #[cfg(unix)]
1238    #[test]
1239    fn pool_slot_sync_created_for_uds_config() {
1240        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1241        assert!(config.host_is_uds());
1242    }
1243
1244    #[test]
1245    fn pool_slot_tcp_config() {
1246        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1247        assert!(!config.host_is_uds());
1248    }
1249
1250    // ===============================================================
1251    // Pool::is_uds — extended tests
1252    // ===============================================================
1253
1254    #[test]
1255    fn pool_is_uds_false_for_hostname() {
1256        let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
1257        assert!(!pool.is_uds());
1258    }
1259
1260    #[cfg(unix)]
1261    #[test]
1262    fn pool_is_uds_true_for_tmp() {
1263        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1264        assert!(pool.is_uds());
1265    }
1266
1267    // ===============================================================
1268    // Pool close semantics
1269    // ===============================================================
1270
1271    #[test]
1272    fn pool_close_then_acquire_fails() {
1273        let pool = PoolBuilder::new()
1274            .url("postgres://user:pass@localhost/db")
1275            .max_size(5)
1276            .build()
1277            .unwrap();
1278        pool.close();
1279        let result = pool.acquire();
1280        assert!(result.is_err());
1281        match result {
1282            Err(DriverError::Pool(msg)) => {
1283                assert!(msg.contains("closed"), "should say closed: {msg}")
1284            }
1285            Err(e) => panic!("expected Pool error, got: {e:?}"),
1286            Ok(_) => panic!("expected error"),
1287        }
1288    }
1289
1290    #[test]
1291    fn pool_is_closed_before_and_after() {
1292        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1293        assert!(!pool.is_closed());
1294        pool.close();
1295        assert!(pool.is_closed());
1296    }
1297
1298    // ===============================================================
1299    // Pool exhaustion (fail-fast without timeout)
1300    // ===============================================================
1301
1302    #[test]
1303    fn pool_exhausted_no_timeout() {
1304        let pool = PoolBuilder::new()
1305            .url("postgres://user:pass@localhost/db")
1306            .max_size(0)
1307            .acquire_timeout(None) // fail-fast
1308            .build()
1309            .unwrap();
1310        let result = pool.acquire();
1311        assert!(result.is_err());
1312        match result {
1313            Err(DriverError::Pool(msg)) => {
1314                assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
1315            }
1316            Err(e) => panic!("expected Pool error, got: {e:?}"),
1317            Ok(_) => panic!("expected error"),
1318        }
1319    }
1320
1321    // ===============================================================
1322    // PoolBuilder validation
1323    // ===============================================================
1324
1325    #[test]
1326    fn pool_builder_no_url_error() {
1327        let result = PoolBuilder::new().max_size(5).build();
1328        assert!(result.is_err());
1329        match result {
1330            Err(DriverError::Pool(msg)) => {
1331                assert!(msg.contains("URL"), "should mention URL: {msg}")
1332            }
1333            Err(e) => panic!("expected Pool error, got: {e:?}"),
1334            Ok(_) => panic!("expected error"),
1335        }
1336    }
1337
1338    #[test]
1339    fn pool_builder_invalid_url_error() {
1340        let result = PoolBuilder::new().url("ftp://something").build();
1341        assert!(result.is_err());
1342    }
1343
1344    #[test]
1345    fn pool_builder_stmt_cache_size_zero() {
1346        let pool = PoolBuilder::new()
1347            .url("postgres://user:pass@localhost/db")
1348            .max_stmt_cache_size(0)
1349            .build()
1350            .unwrap();
1351        assert_eq!(pool.inner.max_stmt_cache_size, 0);
1352    }
1353
1354    // ===============================================================
1355    // PoolStatus
1356    // ===============================================================
1357
1358    #[test]
1359    fn pool_status_reflects_max_size() {
1360        let pool = PoolBuilder::new()
1361            .url("postgres://user:pass@localhost/db")
1362            .max_size(20)
1363            .build()
1364            .unwrap();
1365        let status = pool.status();
1366        assert_eq!(status.max_size, 20);
1367        assert_eq!(status.idle, 0);
1368        assert_eq!(status.active, 0);
1369        assert_eq!(status.open, 0);
1370    }
1371
1372    // ===============================================================
1373    // Pool clone
1374    // ===============================================================
1375
1376    #[test]
1377    fn pool_clone_shares_config() {
1378        let pool = PoolBuilder::new()
1379            .url("postgres://user:pass@localhost/db")
1380            .max_size(7)
1381            .build()
1382            .unwrap();
1383        let p2 = pool.clone();
1384        assert_eq!(pool.max_size(), 7);
1385        assert_eq!(p2.max_size(), 7);
1386        assert_eq!(pool.open_count(), p2.open_count());
1387    }
1388
1389    // ===============================================================
1390    // set_warmup_sqls
1391    // ===============================================================
1392
1393    #[test]
1394    fn pool_set_warmup_sqls_empty() {
1395        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1396        pool.set_warmup_sqls(&[]);
1397        let sqls = pool
1398            .inner
1399            .warmup_sqls
1400            .lock()
1401            .unwrap_or_else(|e| e.into_inner())
1402            .clone();
1403        assert!(sqls.is_empty());
1404    }
1405
1406    #[test]
1407    fn pool_set_warmup_sqls_multiple() {
1408        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1409        pool.set_warmup_sqls(&["SELECT 1", "SELECT 2", "SELECT 3"]);
1410        let sqls = pool
1411            .inner
1412            .warmup_sqls
1413            .lock()
1414            .unwrap_or_else(|e| e.into_inner())
1415            .clone();
1416        assert_eq!(sqls.len(), 3);
1417        assert_eq!(&*sqls[0], "SELECT 1");
1418        assert_eq!(&*sqls[1], "SELECT 2");
1419        assert_eq!(&*sqls[2], "SELECT 3");
1420    }
1421
1422    #[test]
1423    fn pool_set_warmup_sqls_overwrite() {
1424        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1425        pool.set_warmup_sqls(&["SELECT 1"]);
1426        pool.set_warmup_sqls(&["SELECT 99"]);
1427        let sqls = pool
1428            .inner
1429            .warmup_sqls
1430            .lock()
1431            .unwrap_or_else(|e| e.into_inner())
1432            .clone();
1433        assert_eq!(sqls.len(), 1);
1434        assert_eq!(&*sqls[0], "SELECT 99");
1435    }
1436
1437    // ===============================================================
1438    // PoolStatus Debug
1439    // ===============================================================
1440
1441    #[test]
1442    fn pool_status_debug() {
1443        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1444        let status = pool.status();
1445        let dbg = format!("{status:?}");
1446        assert!(dbg.contains("PoolStatus"));
1447        assert!(dbg.contains("idle"));
1448        assert!(dbg.contains("active"));
1449        assert!(dbg.contains("open"));
1450        assert!(dbg.contains("max_size"));
1451    }
1452
1453    // ===============================================================
1454    // Config host_is_uds via pool (structural tests)
1455    // ===============================================================
1456
1457    #[test]
1458    fn config_host_is_uds_returns_true_for_slash() {
1459        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1460        assert!(config.host_is_uds());
1461    }
1462
1463    #[test]
1464    fn config_host_is_uds_returns_false_for_tcp() {
1465        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1466        assert!(!config.host_is_uds());
1467    }
1468
1469    #[test]
1470    fn config_host_is_uds_returns_false_for_ip() {
1471        let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
1472        assert!(!config.host_is_uds());
1473    }
1474
1475    // ===============================================================
1476    // PoolBuilder chaining
1477    // ===============================================================
1478
1479    #[test]
1480    fn pool_builder_full_chain() {
1481        let pool = PoolBuilder::new()
1482            .url("postgres://user:pass@localhost/db")
1483            .max_size(3)
1484            .max_lifetime(Some(Duration::from_secs(600)))
1485            .acquire_timeout(Some(Duration::from_secs(5)))
1486            .min_idle(1)
1487            .max_stmt_cache_size(128)
1488            .build()
1489            .unwrap();
1490        assert_eq!(pool.max_size(), 3);
1491        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
1492        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1493        assert_eq!(pool.inner.min_idle, 1);
1494        assert_eq!(pool.inner.max_stmt_cache_size, 128);
1495    }
1496
1497    // --- Audit: PoolGuard drop discards connections in bad state ---
1498
1499    #[test]
1500    fn pool_max_size_zero_rejects_all_acquires() {
1501        let pool = PoolBuilder::new()
1502            .url("postgres://user:pass@localhost/db")
1503            .max_size(0)
1504            .build()
1505            .unwrap();
1506        let result = pool.acquire();
1507        assert!(result.is_err());
1508        match &result {
1509            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1510            _ => panic!("expected pool exhausted error"),
1511        }
1512    }
1513
1514    // --- Audit: URL parsing edge cases ---
1515
1516    #[test]
1517    fn url_parse_unknown_sslmode_returns_error() {
1518        let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
1519        assert!(result.is_err());
1520        let msg = format!("{}", result.unwrap_err());
1521        assert!(msg.contains("unknown sslmode"));
1522    }
1523
1524    #[test]
1525    fn url_parse_invalid_port_returns_error() {
1526        let result = Config::from_url("postgres://u:p@h:abc/d");
1527        assert!(result.is_err());
1528        let msg = format!("{}", result.unwrap_err());
1529        assert!(msg.contains("invalid port"));
1530    }
1531
1532    #[test]
1533    fn url_parse_missing_at_sign_returns_error() {
1534        let result = Config::from_url("postgres://u:plocalhost/d");
1535        assert!(result.is_err());
1536        let msg = format!("{}", result.unwrap_err());
1537        assert!(msg.contains("missing @"));
1538    }
1539
1540    #[test]
1541    fn url_parse_empty_host_returns_error() {
1542        let result = Config::from_url("postgres://u:p@/d");
1543        assert!(result.is_err());
1544    }
1545
1546    #[test]
1547    fn url_parse_empty_user_returns_error() {
1548        let result = Config::from_url("postgres://:p@h/d");
1549        assert!(result.is_err());
1550    }
1551
1552    #[test]
1553    fn url_parse_statement_timeout_invalid_uses_default() {
1554        let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
1555        assert_eq!(config.statement_timeout_secs, 30);
1556    }
1557
1558    #[test]
1559    fn url_parse_malformed_percent_encoding() {
1560        let result = Config::from_url("postgres://u%:p@h/d");
1561        assert!(result.is_err());
1562    }
1563
1564    #[test]
1565    fn url_parse_invalid_hex_in_percent_encoding() {
1566        let result = Config::from_url("postgres://u%ZZ:p@h/d");
1567        assert!(result.is_err());
1568    }
1569}