Skip to main content

bsql_driver_postgres/
pool.rs

1//! Connection pool — LIFO ordering, Condvar-based waiting.
2//!
3//! The pool maintains a stack of idle connections. `acquire()` pops the top
4//! (most recently used = warmest caches). On drop, the guard pushes the
5//! connection back. If the pool is exhausted, callers wait on a `Condvar`
6//! up to `acquire_timeout` (default: 5 seconds). Set `acquire_timeout` to
7//! `None` for fail-fast behavior (immediate error when exhausted).
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::time::Duration;
12
13use crate::DriverError;
14use crate::arena::Arena;
15use crate::codec::Encode;
16use crate::conn::Connection;
17use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
18
19#[cfg(feature = "async")]
20use crate::async_conn::AsyncConnection;
21
22// --- PoolSlot ---
23
24/// A connection slot — either sync (UDS/TCP) or async (TCP only).
25///
26/// The pool auto-detects: UDS hosts get sync `Connection`, TCP hosts get
27/// `AsyncConnection` (when the `async` feature is enabled). When `async`
28/// is disabled, all connections are sync.
29pub(crate) enum PoolSlot {
30    /// Sync connection (UDS or TCP without async feature).
31    Sync(Connection),
32    /// Async TCP connection (requires async feature + tokio runtime).
33    #[cfg(feature = "async")]
34    Async(AsyncConnection),
35}
36
37// --- Pool ---
38
39/// A connection pool with LIFO ordering and fail-fast semantics.
40///
41/// # Example
42///
43/// ```no_run
44/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
45/// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
46/// let mut conn = pool.acquire()?;
47/// conn.simple_query("SELECT 1")?;
48/// // conn is returned to pool on drop
49/// # Ok(())
50/// # }
51/// ```
52pub struct Pool {
53    inner: Arc<PoolInner>,
54}
55
56struct PoolInner {
57    /// Idle connections. Uses std::sync::Mutex because the critical section is
58    /// trivial (push/pop — no I/O). This lets PoolGuard::Drop return connections
59    /// synchronously.
60    stack: std::sync::Mutex<Vec<PoolSlot>>,
61    max_size: usize,
62    open_count: AtomicUsize,
63    config: Arc<Config>,
64    /// When true, no new acquires are accepted.
65    closed: AtomicBool,
66    /// Condvar pair for release notification. Waiters block on the Condvar
67    /// when the pool is exhausted and `acquire_timeout` is set.
68    release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
69    /// Maximum lifetime of a connection. Connections older than this
70    /// are discarded when popped from the pool. Default: 30 minutes.
71    max_lifetime: Option<Duration>,
72    /// Maximum time to wait for a connection. Default: None (fail-fast).
73    acquire_timeout: Option<Duration>,
74    /// Minimum number of idle connections to maintain. Default: 0.
75    min_idle: usize,
76    /// SQL statements to PREPARE on new connections (warmup).
77    warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
78    /// Maximum number of cached prepared statements per connection.
79    max_stmt_cache_size: usize,
80    /// Maximum idle duration before a connection is considered stale and discarded.
81    /// Connections idle longer than this are dropped on acquire. Default: 30 seconds.
82    stale_timeout: Duration,
83}
84
85impl Pool {
86    /// Create a pool from a connection URL with default settings (max_size = 10).
87    ///
88    /// Validates the URL but does not open any connections yet (lazy initialization).
89    pub fn connect(url: &str) -> Result<Self, DriverError> {
90        PoolBuilder::new().url(url).build()
91    }
92
93    /// Create a pool builder for custom configuration.
94    pub fn builder() -> PoolBuilder {
95        PoolBuilder::new()
96    }
97
98    /// Acquire a connection from the pool.
99    ///
100    /// Returns immediately with the most recently used idle connection (LIFO).
101    /// If no idle connections are available and the pool is below max_size, a new
102    /// connection is created. If the pool is at max_size and no `acquire_timeout`
103    /// is set, returns `DriverError::Pool` immediately. If `acquire_timeout` is
104    /// set, blocks until a connection is returned or the timeout expires.
105    #[inline]
106    pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
107        if self.inner.closed.load(Ordering::Acquire) {
108            return Err(DriverError::Pool("pool is closed".into()));
109        }
110
111        // Try to pop an idle connection (fast path).
112        if let Some(guard) = self.try_pop_idle()? {
113            return Ok(guard);
114        }
115
116        // No idle connections — try to claim a slot with a proper CAS loop.
117        loop {
118            let current = self.inner.open_count.load(Ordering::Acquire);
119            if current >= self.inner.max_size {
120                if let Some(timeout) = self.inner.acquire_timeout {
121                    let (lock, cvar) = &self.inner.release_pair;
122                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
123                    let (_guard, result) = cvar
124                        .wait_timeout(guard, timeout)
125                        .unwrap_or_else(|e| e.into_inner());
126                    if result.timed_out() {
127                        return Err(DriverError::Pool(
128                            "pool exhausted: acquire timeout expired".into(),
129                        ));
130                    }
131                    // A connection was returned — try again
132                    if let Some(guard) = self.try_pop_idle()? {
133                        return Ok(guard);
134                    }
135                    // Popped nothing — retry CAS
136                    continue;
137                }
138                return Err(DriverError::Pool(
139                    "pool exhausted: all connections in use".into(),
140                ));
141            }
142            if self
143                .inner
144                .open_count
145                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
146                .is_ok()
147            {
148                break;
149            }
150            // CAS failed — another thread incremented. Retry.
151        }
152
153        // Open a new connection
154        let conn_result = Connection::connect_arc(self.inner.config.clone());
155        match conn_result {
156            Ok(mut conn) => {
157                // Configure statement cache size
158                conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
159                // Warmup: pre-PREPARE frequently used statements
160                self.warmup_conn(&mut conn);
161
162                Ok(PoolGuard {
163                    conn: Some(PoolSlot::Sync(conn)),
164                    pool: self.inner.clone(),
165                    discard: false,
166                })
167            }
168            Err(e) => {
169                // Give back the slot
170                self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
171                Err(e)
172            }
173        }
174    }
175
176    /// Try to pop a valid idle connection from the stack.
177    ///
178    /// Performs lifetime and stale checks. For connections idle > 5 seconds
179    /// (but within the stale timeout), sends an empty query as a health check
180    /// to verify the connection is still alive before returning it.
181    #[inline]
182    fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
183        let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
184        while let Some(mut slot) = stack.pop() {
185            let (created_at, idle_dur) = match &slot {
186                PoolSlot::Sync(conn) => (conn.created_at(), conn.idle_duration()),
187                #[cfg(feature = "async")]
188                PoolSlot::Async(conn) => (conn.created_at(), conn.idle_duration()),
189            };
190            if let Some(max_lifetime) = self.inner.max_lifetime {
191                if created_at.elapsed() >= max_lifetime {
192                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
193                    continue;
194                }
195            }
196            if idle_dur >= self.inner.stale_timeout {
197                // Stale connection — drop it, free the slot
198                self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
199                continue;
200            }
201            // Health check: verify connection is still alive if idle > 5 seconds.
202            // Sends an empty query — PG returns EmptyQueryResponse + ReadyForQuery.
203            // Fast: one round-trip, ~15us on UDS. Skip for hot connections.
204            if idle_dur > Duration::from_secs(5) {
205                let alive = match &mut slot {
206                    PoolSlot::Sync(conn) => conn.simple_query("").is_ok(),
207                    #[cfg(feature = "async")]
208                    PoolSlot::Async(_) => true, // async connections are checked at I/O time
209                };
210                if !alive {
211                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
212                    continue;
213                }
214            }
215            return Ok(Some(PoolGuard {
216                conn: Some(slot),
217                pool: self.inner.clone(),
218                discard: false,
219            }));
220        }
221        Ok(None)
222    }
223
224    /// Whether this pool uses UDS connections.
225    ///
226    /// Returns `true` when the pool URL points to a Unix domain socket.
227    /// On non-Unix platforms, always returns `false`.
228    pub fn is_uds(&self) -> bool {
229        #[cfg(unix)]
230        {
231            self.inner.config.host_is_uds()
232        }
233        #[cfg(not(unix))]
234        {
235            false
236        }
237    }
238
239    /// Begin a transaction. Acquires a connection and sends BEGIN.
240    pub fn begin(&self) -> Result<Transaction, DriverError> {
241        let mut guard = self.acquire()?;
242        guard.simple_query("BEGIN")?;
243        Ok(Transaction {
244            guard,
245            committed: false,
246            deferred_buf: Vec::new(),
247            deferred_count: 0,
248        })
249    }
250
251    /// Current number of open connections (idle + in-use).
252    pub fn open_count(&self) -> usize {
253        self.inner.open_count.load(Ordering::Relaxed)
254    }
255
256    /// Maximum pool size.
257    pub fn max_size(&self) -> usize {
258        self.inner.max_size
259    }
260
261    /// Pool status metrics.
262    pub fn status(&self) -> PoolStatus {
263        let idle = self
264            .inner
265            .stack
266            .lock()
267            .unwrap_or_else(|e| e.into_inner())
268            .len();
269        let open = self.inner.open_count.load(Ordering::Relaxed);
270        let active = open.saturating_sub(idle);
271        PoolStatus {
272            idle,
273            active,
274            open,
275            max_size: self.inner.max_size,
276        }
277    }
278
279    /// Pre-PREPARE warmup statements on a new connection.
280    ///
281    /// Uses `prepare_only()` which sends Parse+Describe+Sync without
282    /// Bind+Execute — no query execution, only statement caching.
283    ///
284    /// Best-effort: errors on individual statements are silently ignored.
285    /// The connection remains usable even if warmup fails.
286    fn warmup_conn(&self, conn: &mut Connection) {
287        let sqls = self
288            .inner
289            .warmup_sqls
290            .lock()
291            .unwrap_or_else(|e| e.into_inner())
292            .clone();
293
294        if sqls.is_empty() {
295            return;
296        }
297
298        for sql in sqls.iter() {
299            let sql_hash = crate::types::hash_sql(sql);
300            let _ = conn.prepare_only(sql, sql_hash);
301        }
302    }
303
304    /// Set the SQL statements to pre-PREPARE on new connections.
305    ///
306    /// Each SQL string is PREPAREd (Parse+Describe+Sync) on new connections
307    /// before they are returned from `acquire()`. This eliminates the first-use
308    /// Parse overhead for frequently executed queries.
309    ///
310    /// Warmup errors are silently ignored — a bad warmup SQL must not prevent
311    /// the connection from being usable.
312    ///
313    /// # Example
314    ///
315    /// ```no_run
316    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
317    /// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
318    /// pool.set_warmup_sqls(&[
319    ///     "SELECT id, name FROM users WHERE id = $1::int4",
320    ///     "SELECT id, title FROM tickets WHERE status = ANY($1::text[])",
321    /// ]);
322    /// # Ok(())
323    /// # }
324    /// ```
325    pub fn set_warmup_sqls(&self, sqls: &[&str]) {
326        let boxed: Arc<Vec<Box<str>>> =
327            Arc::new(sqls.iter().map(|s| (*s).into()).collect::<Vec<_>>());
328        *self
329            .inner
330            .warmup_sqls
331            .lock()
332            .unwrap_or_else(|e| e.into_inner()) = boxed;
333    }
334
335    /// Close the pool. No new acquires are accepted. All idle connections
336    /// are sent Terminate and dropped.
337    pub fn close(&self) {
338        self.inner.closed.store(true, Ordering::Release);
339        // Drain and close all idle connections
340        let slots: Vec<PoolSlot> = {
341            let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
342            std::mem::take(&mut *stack)
343        };
344        for slot in slots {
345            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
346            match slot {
347                PoolSlot::Sync(conn) => {
348                    let _ = conn.close();
349                }
350                #[cfg(feature = "async")]
351                PoolSlot::Async(_conn) => {
352                    // AsyncConnection::close() is async — we can't await in sync close().
353                    // Drop will close the TCP socket, PG auto-cleans up.
354                }
355            }
356        }
357        // Notify any waiters so they get the "pool is closed" error
358        let (_, cvar) = &self.inner.release_pair;
359        cvar.notify_all();
360    }
361
362    /// Whether the pool has been closed.
363    pub fn is_closed(&self) -> bool {
364        self.inner.closed.load(Ordering::Acquire)
365    }
366
367    /// Acquire a connection from the pool (async).
368    ///
369    /// Auto-detects transport: UDS hosts get a sync `Connection`, TCP hosts
370    /// get an `AsyncConnection`. If the `async` feature is disabled, always
371    /// creates sync connections.
372    ///
373    /// Returns immediately with the most recently used idle connection (LIFO).
374    /// If no idle connections are available and the pool is below max_size, a new
375    /// connection is created.
376    #[cfg(feature = "async")]
377    pub async fn acquire_async(&self) -> Result<PoolGuard, DriverError> {
378        if self.inner.closed.load(Ordering::Acquire) {
379            return Err(DriverError::Pool("pool is closed".into()));
380        }
381
382        // Try to pop an idle connection (fast path).
383        if let Some(guard) = self.try_pop_idle()? {
384            return Ok(guard);
385        }
386
387        // No idle connections — try to claim a slot with a proper CAS loop.
388        loop {
389            let current = self.inner.open_count.load(Ordering::Acquire);
390            if current >= self.inner.max_size {
391                if let Some(timeout) = self.inner.acquire_timeout {
392                    let (lock, cvar) = &self.inner.release_pair;
393                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
394                    let (_guard, result) = cvar
395                        .wait_timeout(guard, timeout)
396                        .unwrap_or_else(|e| e.into_inner());
397                    if result.timed_out() {
398                        return Err(DriverError::Pool(
399                            "pool exhausted: acquire timeout expired".into(),
400                        ));
401                    }
402                    if let Some(guard) = self.try_pop_idle()? {
403                        return Ok(guard);
404                    }
405                    continue;
406                }
407                return Err(DriverError::Pool(
408                    "pool exhausted: all connections in use".into(),
409                ));
410            }
411            if self
412                .inner
413                .open_count
414                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
415                .is_ok()
416            {
417                break;
418            }
419        }
420
421        // Open a new connection — auto-detect UDS vs TCP
422        if self.inner.config.host_is_uds() {
423            // UDS — use sync Connection
424            let conn_result = Connection::connect_arc(self.inner.config.clone());
425            match conn_result {
426                Ok(mut conn) => {
427                    conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
428                    self.warmup_conn(&mut conn);
429                    Ok(PoolGuard {
430                        conn: Some(PoolSlot::Sync(conn)),
431                        pool: self.inner.clone(),
432                        discard: false,
433                    })
434                }
435                Err(e) => {
436                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
437                    Err(e)
438                }
439            }
440        } else {
441            // TCP — use AsyncConnection
442            let conn_result = AsyncConnection::connect_arc(self.inner.config.clone()).await;
443            match conn_result {
444                Ok(mut conn) => {
445                    conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
446                    Ok(PoolGuard {
447                        conn: Some(PoolSlot::Async(conn)),
448                        pool: self.inner.clone(),
449                        discard: false,
450                    })
451                }
452                Err(e) => {
453                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
454                    Err(e)
455                }
456            }
457        }
458    }
459}
460
461impl Clone for Pool {
462    fn clone(&self) -> Self {
463        Pool {
464            inner: self.inner.clone(),
465        }
466    }
467}
468
469// --- PoolStatus ---
470
471/// Pool status metrics.
472#[derive(Debug, Clone, Copy)]
473pub struct PoolStatus {
474    /// Number of idle connections in the pool.
475    pub idle: usize,
476    /// Number of connections currently in use.
477    pub active: usize,
478    /// Total open connections (idle + active).
479    pub open: usize,
480    /// Maximum pool size.
481    pub max_size: usize,
482}
483
484// --- PoolBuilder ---
485
486/// Builder for configuring a connection pool.
487pub struct PoolBuilder {
488    url: Option<String>,
489    max_size: usize,
490    /// Maximum lifetime of a connection.
491    max_lifetime: Option<Duration>,
492    /// Maximum time to wait for a connection when pool is exhausted.
493    acquire_timeout: Option<Duration>,
494    /// Minimum number of idle connections to maintain.
495    min_idle: usize,
496    /// Maximum number of cached prepared statements per connection.
497    max_stmt_cache_size: usize,
498    /// Maximum idle duration before a connection is considered stale.
499    stale_timeout: Duration,
500}
501
502impl PoolBuilder {
503    fn new() -> Self {
504        Self {
505            url: None,
506            max_size: 10,
507            max_lifetime: Some(Duration::from_secs(30 * 60)), // 30 min default
508            acquire_timeout: Some(Duration::from_secs(5)), // 5s default (matches common pool defaults)
509            min_idle: 0,                                   // no minimum by default
510            max_stmt_cache_size: 256,                      // LRU eviction at 256 stmts
511            stale_timeout: Duration::from_secs(30),        // 30s default
512        }
513    }
514
515    /// Set the connection URL.
516    pub fn url(mut self, url: &str) -> Self {
517        self.url = Some(url.to_owned());
518        self
519    }
520
521    /// Set the maximum pool size. Default: 10.
522    ///
523    /// A max_size of 0 means the pool will reject all acquire() calls immediately.
524    pub fn max_size(mut self, size: usize) -> Self {
525        self.max_size = size;
526        self
527    }
528
529    /// Set the maximum lifetime of a connection. Default: 30 minutes.
530    /// Set to None for unlimited lifetime.
531    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
532        self.max_lifetime = lifetime;
533        self
534    }
535
536    /// Set the acquire timeout. Default: 5 seconds.
537    /// Set to None for fail-fast behavior when the pool is exhausted.
538    pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
539        self.acquire_timeout = timeout;
540        self
541    }
542
543    /// Set the minimum number of idle connections. Default: 0.
544    /// When > 0, a background thread maintains this many idle connections.
545    pub fn min_idle(mut self, count: usize) -> Self {
546        self.min_idle = count;
547        self
548    }
549
550    /// Set the maximum number of cached prepared statements per connection.
551    /// Default: 256. When the cache exceeds this size, the least recently
552    /// used statement is evicted (Close sent to PG to free server memory).
553    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
554        self.max_stmt_cache_size = size;
555        self
556    }
557
558    /// Set the maximum idle duration before a connection is considered stale.
559    /// Default: 30 seconds. Connections idle longer than this are dropped on
560    /// acquire instead of being reused.
561    pub fn stale_timeout(mut self, timeout: Duration) -> Self {
562        self.stale_timeout = timeout;
563        self
564    }
565
566    /// Build the pool. Validates the URL but does not open connections.
567    pub fn build(self) -> Result<Pool, DriverError> {
568        let url = self
569            .url
570            .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
571
572        let config = Arc::new(Config::from_url(&url)?);
573
574        let pool = Pool {
575            inner: Arc::new(PoolInner {
576                stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
577                max_size: self.max_size,
578                open_count: AtomicUsize::new(0),
579                config,
580                closed: AtomicBool::new(false),
581                release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
582                max_lifetime: self.max_lifetime,
583                acquire_timeout: self.acquire_timeout,
584                min_idle: self.min_idle,
585                warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
586                max_stmt_cache_size: self.max_stmt_cache_size,
587                stale_timeout: self.stale_timeout,
588            }),
589        };
590
591        if self.min_idle > 0 {
592            let inner = pool.inner.clone();
593            std::thread::spawn(move || {
594                maintain_min_idle(inner);
595            });
596        }
597
598        Ok(pool)
599    }
600}
601
602/// Background thread that maintains min_idle connections.
603fn maintain_min_idle(inner: Arc<PoolInner>) {
604    loop {
605        if inner.closed.load(Ordering::Acquire) {
606            return;
607        }
608
609        let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
610        let needed = inner.min_idle.saturating_sub(idle_count);
611
612        for _ in 0..needed {
613            if inner.closed.load(Ordering::Acquire) {
614                return;
615            }
616            let current = inner.open_count.load(Ordering::Acquire);
617            if current >= inner.max_size {
618                break;
619            }
620            if inner
621                .open_count
622                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
623                .is_err()
624            {
625                continue;
626            }
627
628            match Connection::connect_arc(inner.config.clone()) {
629                Ok(conn) => {
630                    let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
631                    stack.push(PoolSlot::Sync(conn));
632                    let (_, cvar) = &inner.release_pair;
633                    cvar.notify_one();
634                }
635                Err(_) => {
636                    inner.open_count.fetch_sub(1, Ordering::AcqRel);
637                }
638            }
639        }
640
641        // Check every 1 second. Shorter interval ensures the thread exits promptly
642        // when pool.closed is set (worst-case 1s delay instead of 5s).
643        std::thread::sleep(Duration::from_secs(1));
644    }
645}
646
647// --- PoolGuard ---
648
649/// A borrowed connection from the pool. Returns to the pool on drop.
650///
651/// If the connection is in a failed transaction state, broken, or marked for
652/// discard, it is dropped (decrements open_count) instead of returned.
653pub struct PoolGuard {
654    conn: Option<PoolSlot>,
655    pool: Arc<PoolInner>,
656    /// When true, the connection is dropped instead of returned to the pool.
657    discard: bool,
658}
659
660impl PoolGuard {
661    /// Get a reference to the inner sync connection. Panics if the slot
662    /// holds an async connection.
663    #[inline]
664    fn sync_conn(&self) -> Result<&Connection, DriverError> {
665        match self.conn.as_ref() {
666            Some(PoolSlot::Sync(conn)) => Ok(conn),
667            #[cfg(feature = "async")]
668            Some(PoolSlot::Async(_)) => Err(DriverError::Pool(
669                "expected sync connection, got async; use async methods".into(),
670            )),
671            None => Err(DriverError::Pool("connection already taken".into())),
672        }
673    }
674
675    /// Get a mutable reference to the inner sync connection.
676    #[inline]
677    fn sync_conn_mut(&mut self) -> Result<&mut Connection, DriverError> {
678        match self.conn.as_mut() {
679            Some(PoolSlot::Sync(conn)) => Ok(conn),
680            #[cfg(feature = "async")]
681            Some(PoolSlot::Async(_)) => Err(DriverError::Pool(
682                "expected sync connection, got async; use async methods".into(),
683            )),
684            None => Err(DriverError::Pool("connection already taken".into())),
685        }
686    }
687
688    /// Mark this connection for discard — it will NOT be returned to the pool
689    /// on drop. The open_count is decremented and the TCP connection is closed.
690    pub fn mark_discard(&mut self) {
691        self.discard = true;
692    }
693
694    /// Cancel the currently running query on the underlying connection.
695    ///
696    /// Opens a new TCP connection and sends a CancelRequest to PG.
697    /// The cancel connection is closed immediately after.
698    pub fn cancel(&self) -> Result<(), DriverError> {
699        self.sync_conn()?.cancel()
700    }
701
702    // --- Introspection dispatch methods ---
703
704    /// Get the backend process ID for this connection.
705    pub fn pid(&self) -> i32 {
706        match self.conn.as_ref().expect("connection taken") {
707            PoolSlot::Sync(conn) => conn.pid(),
708            #[cfg(feature = "async")]
709            PoolSlot::Async(conn) => conn.pid(),
710        }
711    }
712
713    /// Whether the connection is idle (not in a transaction).
714    pub fn is_idle(&self) -> bool {
715        match self.conn.as_ref().expect("connection taken") {
716            PoolSlot::Sync(conn) => conn.is_idle(),
717            #[cfg(feature = "async")]
718            PoolSlot::Async(conn) => conn.is_idle(),
719        }
720    }
721
722    /// Whether the connection is inside a transaction.
723    pub fn is_in_transaction(&self) -> bool {
724        match self.conn.as_ref().expect("connection taken") {
725            PoolSlot::Sync(conn) => conn.is_in_transaction(),
726            #[cfg(feature = "async")]
727            PoolSlot::Async(conn) => conn.is_in_transaction(),
728        }
729    }
730
731    // --- Sync query dispatch methods ---
732
733    /// Execute a prepared query and return rows.
734    #[inline]
735    pub fn query(
736        &mut self,
737        sql: &str,
738        sql_hash: u64,
739        params: &[&(dyn Encode + Sync)],
740    ) -> Result<QueryResult, DriverError> {
741        self.sync_conn_mut()?.query(sql, sql_hash, params)
742    }
743
744    /// Execute a query without result rows (INSERT/UPDATE/DELETE).
745    #[inline]
746    pub fn execute(
747        &mut self,
748        sql: &str,
749        sql_hash: u64,
750        params: &[&(dyn Encode + Sync)],
751    ) -> Result<u64, DriverError> {
752        self.sync_conn_mut()?.execute(sql, sql_hash, params)
753    }
754
755    /// Execute the same statement N times with different params in one pipeline.
756    ///
757    /// Sends all N Bind+Execute messages + one Sync. One round-trip for N operations.
758    /// Returns the affected row count for each parameter set.
759    pub fn execute_pipeline(
760        &mut self,
761        sql: &str,
762        sql_hash: u64,
763        param_sets: &[&[&(dyn Encode + Sync)]],
764    ) -> Result<Vec<u64>, DriverError> {
765        self.sync_conn_mut()?
766            .execute_pipeline(sql, sql_hash, param_sets)
767    }
768
769    /// Execute a simple (unprepared) query.
770    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
771        self.sync_conn_mut()?.simple_query(sql)
772    }
773
774    /// Execute a simple query and return rows as text.
775    ///
776    /// Uses PostgreSQL's simple query protocol — all values are strings.
777    pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
778        self.sync_conn_mut()?.simple_query_rows(sql)
779    }
780
781    /// Process each row via a closure with zero-copy `PgDataRow`.
782    pub fn for_each<F>(
783        &mut self,
784        sql: &str,
785        sql_hash: u64,
786        params: &[&(dyn Encode + Sync)],
787        f: F,
788    ) -> Result<(), DriverError>
789    where
790        F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
791    {
792        self.sync_conn_mut()?.for_each(sql, sql_hash, params, f)
793    }
794
795    /// Process each DataRow as raw bytes — fastest path.
796    pub fn for_each_raw<F>(
797        &mut self,
798        sql: &str,
799        sql_hash: u64,
800        params: &[&(dyn Encode + Sync)],
801        f: F,
802    ) -> Result<(), DriverError>
803    where
804        F: FnMut(&[u8]) -> Result<(), DriverError>,
805    {
806        self.sync_conn_mut()?.for_each_raw(sql, sql_hash, params, f)
807    }
808
809    // --- Streaming ---
810
811    /// Start a streaming query.
812    pub fn query_streaming_start(
813        &mut self,
814        sql: &str,
815        sql_hash: u64,
816        params: &[&(dyn Encode + Sync)],
817        chunk_size: i32,
818    ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
819        self.sync_conn_mut()?
820            .query_streaming_start(sql, sql_hash, params, chunk_size)
821    }
822
823    /// Send Execute+Flush for a streaming query (2nd+ chunks).
824    pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
825        self.sync_conn_mut()?.streaming_send_execute(chunk_size)
826    }
827
828    /// Read the next chunk of rows from an in-progress streaming query.
829    pub fn streaming_next_chunk(
830        &mut self,
831        arena: &mut Arena,
832        all_col_offsets: &mut Vec<(usize, i32)>,
833    ) -> Result<bool, DriverError> {
834        self.sync_conn_mut()?
835            .streaming_next_chunk(arena, all_col_offsets)
836    }
837
838    // --- COPY protocol ---
839
840    /// Bulk copy data INTO a table from an iterator of text rows.
841    ///
842    /// Each row is a tab-separated string (TSV format). Returns the row count.
843    pub fn copy_in<'a, I>(
844        &mut self,
845        table: &str,
846        columns: &[&str],
847        rows: I,
848    ) -> Result<u64, DriverError>
849    where
850        I: IntoIterator<Item = &'a str>,
851    {
852        self.sync_conn_mut()?.copy_in(table, columns, rows)
853    }
854
855    /// Bulk copy data OUT of a table/query to a writer.
856    ///
857    /// Writes TSV-formatted rows. Returns the row count.
858    pub fn copy_out<W: std::io::Write>(
859        &mut self,
860        query: &str,
861        writer: &mut W,
862    ) -> Result<u64, DriverError> {
863        self.sync_conn_mut()?.copy_out(query, writer)
864    }
865
866    /// Whether this guard holds a sync connection.
867    pub fn is_sync(&self) -> bool {
868        matches!(self.conn.as_ref(), Some(PoolSlot::Sync(_)))
869    }
870
871    /// Whether this guard holds an async connection.
872    #[cfg(feature = "async")]
873    pub fn is_async(&self) -> bool {
874        matches!(self.conn.as_ref(), Some(PoolSlot::Async(_)))
875    }
876
877    // --- Async query dispatch methods ---
878
879    /// Execute a prepared query and return rows (async).
880    ///
881    /// Auto-dispatches: sync connections use blocking I/O, async connections
882    /// use tokio I/O. Returns an error if the guard holds a sync connection
883    /// and this method is called.
884    #[cfg(feature = "async")]
885    pub async fn query_async(
886        &mut self,
887        sql: &str,
888        sql_hash: u64,
889        params: &[&(dyn Encode + Sync)],
890    ) -> Result<QueryResult, DriverError> {
891        match self.conn.as_mut() {
892            Some(PoolSlot::Sync(conn)) => conn.query(sql, sql_hash, params),
893            Some(PoolSlot::Async(conn)) => conn.query(sql, sql_hash, params).await,
894            None => Err(DriverError::Pool("connection already taken".into())),
895        }
896    }
897
898    /// Execute without result rows (async).
899    #[cfg(feature = "async")]
900    pub async fn execute_async(
901        &mut self,
902        sql: &str,
903        sql_hash: u64,
904        params: &[&(dyn Encode + Sync)],
905    ) -> Result<u64, DriverError> {
906        match self.conn.as_mut() {
907            Some(PoolSlot::Sync(conn)) => conn.execute(sql, sql_hash, params),
908            Some(PoolSlot::Async(conn)) => conn.execute(sql, sql_hash, params).await,
909            None => Err(DriverError::Pool("connection already taken".into())),
910        }
911    }
912
913    /// Execute a simple query (async).
914    #[cfg(feature = "async")]
915    pub async fn simple_query_async(&mut self, sql: &str) -> Result<(), DriverError> {
916        match self.conn.as_mut() {
917            Some(PoolSlot::Sync(conn)) => conn.simple_query(sql),
918            Some(PoolSlot::Async(conn)) => conn.simple_query(sql).await,
919            None => Err(DriverError::Pool("connection already taken".into())),
920        }
921    }
922
923    // --- Deferred pipeline support ---
924
925    /// Ensure a statement is prepared and cached.
926    pub(crate) fn ensure_stmt_prepared(
927        &mut self,
928        sql: &str,
929        sql_hash: u64,
930        params: &[&(dyn Encode + Sync)],
931    ) -> Result<[u8; 18], DriverError> {
932        self.sync_conn_mut()?
933            .ensure_stmt_prepared(sql, sql_hash, params)
934    }
935
936    /// Write Bind+Execute bytes for a prepared statement into an external buffer.
937    pub(crate) fn write_deferred_bind_execute(
938        &self,
939        sql: &str,
940        sql_hash: u64,
941        params: &[&(dyn Encode + Sync)],
942        buf: &mut Vec<u8>,
943    ) {
944        let conn = self
945            .sync_conn()
946            .expect("sync_conn failed in write_deferred");
947        conn.write_deferred_bind_execute(sql, sql_hash, params, buf);
948    }
949
950    /// Flush a buffer of deferred Bind+Execute messages as a single pipeline.
951    pub(crate) fn flush_deferred_pipeline(
952        &mut self,
953        buf: &mut Vec<u8>,
954        count: usize,
955    ) -> Result<Vec<u64>, DriverError> {
956        self.sync_conn_mut()?.flush_deferred_pipeline(buf, count)
957    }
958}
959
960impl Drop for PoolGuard {
961    fn drop(&mut self) {
962        if let Some(slot) = self.conn.take() {
963            // Check discard conditions based on slot type.
964            let should_discard = self.discard
965                || self.pool.closed.load(Ordering::Acquire)
966                || match &slot {
967                    PoolSlot::Sync(conn) => {
968                        conn.is_in_failed_transaction()
969                            || conn.is_in_transaction()
970                            || conn.is_streaming()
971                    }
972                    #[cfg(feature = "async")]
973                    PoolSlot::Async(conn) => {
974                        conn.is_in_failed_transaction() || conn.is_in_transaction()
975                    }
976                };
977
978            if should_discard {
979                self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
980                return;
981            }
982
983            // Stamp last-used time for idle connection tracking.
984            // Amortized: only call Instant::now() every 64 returns.
985            let mut slot = slot;
986            match &mut slot {
987                PoolSlot::Sync(conn) => {
988                    if conn.query_counter() & 63 == 0 {
989                        conn.touch();
990                    }
991                }
992                #[cfg(feature = "async")]
993                PoolSlot::Async(conn) => {
994                    if conn.query_counter() & 63 == 0 {
995                        conn.touch();
996                    }
997                }
998            }
999
1000            // Return to pool
1001            {
1002                let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
1003                stack.push(slot);
1004            }
1005
1006            // Notify waiters only if pool was exhausted (someone might be waiting).
1007            if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
1008                let (_, cvar) = &self.pool.release_pair;
1009                cvar.notify_one();
1010            }
1011        }
1012    }
1013}
1014
1015// --- Transaction ---
1016
1017/// A database transaction. Sends ROLLBACK on drop if not committed.
1018///
1019/// # Example
1020///
1021/// ```no_run
1022/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
1023/// # let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
1024/// let mut tx = pool.begin()?;
1025/// tx.simple_query("INSERT INTO t VALUES (1)")?;
1026/// tx.commit()?;
1027/// # Ok(())
1028/// # }
1029/// ```
1030pub struct Transaction {
1031    guard: PoolGuard,
1032    committed: bool,
1033    /// Accumulated Bind+Execute message bytes for deferred operations.
1034    deferred_buf: Vec<u8>,
1035    /// Number of deferred operations buffered.
1036    deferred_count: usize,
1037}
1038
1039impl Transaction {
1040    /// Commit the transaction.
1041    ///
1042    /// Automatically flushes any deferred operations before committing.
1043    pub fn commit(mut self) -> Result<(), DriverError> {
1044        if self.deferred_count > 0 {
1045            self.flush_deferred()?;
1046        }
1047        self.guard.simple_query("COMMIT")?;
1048        self.committed = true;
1049        Ok(())
1050    }
1051
1052    /// Rollback the transaction explicitly.
1053    ///
1054    /// Discards any deferred operations without sending them.
1055    pub fn rollback(mut self) -> Result<(), DriverError> {
1056        self.deferred_buf.clear();
1057        self.deferred_count = 0;
1058        self.guard.simple_query("ROLLBACK")?;
1059        self.committed = true; // prevent double rollback in drop
1060        Ok(())
1061    }
1062
1063    /// Execute a prepared query within the transaction.
1064    ///
1065    /// Automatically flushes any deferred operations before executing the query,
1066    /// ensuring read-your-writes consistency.
1067    pub fn query(
1068        &mut self,
1069        sql: &str,
1070        sql_hash: u64,
1071        params: &[&(dyn Encode + Sync)],
1072    ) -> Result<QueryResult, DriverError> {
1073        if self.deferred_count > 0 {
1074            self.flush_deferred()?;
1075        }
1076        self.guard.query(sql, sql_hash, params)
1077    }
1078
1079    /// Execute without result rows within the transaction.
1080    pub fn execute(
1081        &mut self,
1082        sql: &str,
1083        sql_hash: u64,
1084        params: &[&(dyn Encode + Sync)],
1085    ) -> Result<u64, DriverError> {
1086        self.guard.execute(sql, sql_hash, params)
1087    }
1088
1089    /// Execute the same statement N times with different params in one pipeline.
1090    pub fn execute_pipeline(
1091        &mut self,
1092        sql: &str,
1093        sql_hash: u64,
1094        param_sets: &[&[&(dyn Encode + Sync)]],
1095    ) -> Result<Vec<u64>, DriverError> {
1096        self.guard.execute_pipeline(sql, sql_hash, param_sets)
1097    }
1098
1099    /// Process each row directly from the wire buffer within a transaction.
1100    ///
1101    /// Automatically flushes any deferred operations first.
1102    pub fn for_each<F>(
1103        &mut self,
1104        sql: &str,
1105        sql_hash: u64,
1106        params: &[&(dyn Encode + Sync)],
1107        f: F,
1108    ) -> Result<(), DriverError>
1109    where
1110        F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
1111    {
1112        if self.deferred_count > 0 {
1113            self.flush_deferred()?;
1114        }
1115        self.guard.for_each(sql, sql_hash, params, f)
1116    }
1117
1118    /// Process each DataRow as raw bytes within a transaction.
1119    ///
1120    /// Automatically flushes any deferred operations first.
1121    pub fn for_each_raw<F>(
1122        &mut self,
1123        sql: &str,
1124        sql_hash: u64,
1125        params: &[&(dyn Encode + Sync)],
1126        f: F,
1127    ) -> Result<(), DriverError>
1128    where
1129        F: FnMut(&[u8]) -> Result<(), DriverError>,
1130    {
1131        if self.deferred_count > 0 {
1132            self.flush_deferred()?;
1133        }
1134        self.guard.for_each_raw(sql, sql_hash, params, f)
1135    }
1136
1137    /// Simple query within the transaction.
1138    ///
1139    /// Automatically flushes any deferred operations first.
1140    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
1141        if self.deferred_count > 0 {
1142            self.flush_deferred()?;
1143        }
1144        self.guard.simple_query(sql)
1145    }
1146
1147    // --- Deferred pipeline API ---
1148
1149    /// Buffer an execute for deferred pipeline flush.
1150    ///
1151    /// The operation is not sent to the server immediately. Instead, the
1152    /// Bind+Execute message bytes are buffered internally. The buffered
1153    /// operations are sent as a single pipeline on [`commit()`](Self::commit)
1154    /// or [`flush_deferred()`](Self::flush_deferred).
1155    ///
1156    /// # Example
1157    ///
1158    /// ```no_run
1159    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
1160    /// # let pool = bsql_driver_postgres::Pool::connect("postgres://u:p@localhost/db")?;
1161    /// let mut tx = pool.begin()?;
1162    /// let sql = "INSERT INTO t (v) VALUES ($1)";
1163    /// let hash = bsql_driver_postgres::hash_sql(sql);
1164    ///
1165    /// // These are buffered, not sent:
1166    /// tx.defer_execute(sql, hash, &[&1i32])?;
1167    /// tx.defer_execute(sql, hash, &[&2i32])?;
1168    /// tx.defer_execute(sql, hash, &[&3i32])?;
1169    ///
1170    /// // commit() flushes all 3 as one pipeline + COMMIT = 2 round-trips total
1171    /// tx.commit()?;
1172    /// # Ok(())
1173    /// # }
1174    /// ```
1175    pub fn defer_execute(
1176        &mut self,
1177        sql: &str,
1178        sql_hash: u64,
1179        params: &[&(dyn Encode + Sync)],
1180    ) -> Result<(), DriverError> {
1181        if params.len() > i16::MAX as usize {
1182            return Err(DriverError::Protocol(format!(
1183                "parameter count {} exceeds maximum {}",
1184                params.len(),
1185                i16::MAX
1186            )));
1187        }
1188
1189        // Ensure statement is prepared (may require one round-trip on first call)
1190        self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
1191
1192        // Buffer the Bind+Execute bytes — no I/O
1193        self.guard
1194            .write_deferred_bind_execute(sql, sql_hash, params, &mut self.deferred_buf);
1195        self.deferred_count += 1;
1196        Ok(())
1197    }
1198
1199    /// Flush all deferred operations as a single pipeline.
1200    ///
1201    /// Sends all buffered Bind+Execute messages + one Sync in a single TCP write.
1202    /// Returns the affected row count for each deferred operation.
1203    pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
1204        let count = self.deferred_count;
1205        self.deferred_count = 0;
1206        self.guard
1207            .flush_deferred_pipeline(&mut self.deferred_buf, count)
1208    }
1209
1210    /// Number of operations currently buffered for deferred execution.
1211    pub fn deferred_count(&self) -> usize {
1212        self.deferred_count
1213    }
1214}
1215
1216impl Drop for Transaction {
1217    fn drop(&mut self) {
1218        if !self.committed {
1219            // Connection is in an uncommitted transaction — discard it from the pool.
1220            // Take the connection out of the guard and drop it, decrementing open_count.
1221            if let Some(_slot) = self.guard.conn.take() {
1222                self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
1223                // Connection dropped — PG server will auto-rollback when it sees disconnect
1224            }
1225        }
1226    }
1227}
1228
1229#[cfg(test)]
1230mod tests {
1231    use super::*;
1232
1233    #[test]
1234    fn pool_builder_requires_url() {
1235        let result = PoolBuilder::new().build();
1236        assert!(result.is_err());
1237    }
1238
1239    #[test]
1240    fn pool_builder_validates_url() {
1241        let result = PoolBuilder::new().url("not_a_url").build();
1242        assert!(result.is_err());
1243    }
1244
1245    #[test]
1246    fn pool_builder_accepts_valid_url() {
1247        let pool = PoolBuilder::new()
1248            .url("postgres://user:pass@localhost/db")
1249            .max_size(5)
1250            .build()
1251            .unwrap();
1252        assert_eq!(pool.max_size(), 5);
1253        assert_eq!(pool.open_count(), 0);
1254    }
1255
1256    #[test]
1257    fn pool_connect_validates_url() {
1258        let result = Pool::connect("not_a_url");
1259        assert!(result.is_err());
1260    }
1261
1262    #[test]
1263    fn pool_max_size_zero() {
1264        let pool = PoolBuilder::new()
1265            .url("postgres://user:pass@localhost/db")
1266            .max_size(0)
1267            .build()
1268            .unwrap();
1269
1270        let result = pool.acquire();
1271        assert!(result.is_err());
1272        match result {
1273            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1274            Err(e) => panic!("expected Pool error, got: {e:?}"),
1275            Ok(_) => panic!("expected error, got Ok"),
1276        }
1277    }
1278
1279    #[test]
1280    fn pool_clone_shares_state() {
1281        let pool = PoolBuilder::new()
1282            .url("postgres://user:pass@localhost/db")
1283            .max_size(5)
1284            .build()
1285            .unwrap();
1286
1287        let pool2 = pool.clone();
1288        assert_eq!(pool.max_size(), pool2.max_size());
1289    }
1290
1291    // --- Audit gap tests ---
1292
1293    // #60: max_lifetime is configurable
1294    #[test]
1295    fn pool_builder_max_lifetime() {
1296        let pool = PoolBuilder::new()
1297            .url("postgres://user:pass@localhost/db")
1298            .max_lifetime(Some(Duration::from_secs(60)))
1299            .build()
1300            .unwrap();
1301        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
1302    }
1303
1304    // #60: max_lifetime None
1305    #[test]
1306    fn pool_builder_max_lifetime_none() {
1307        let pool = PoolBuilder::new()
1308            .url("postgres://user:pass@localhost/db")
1309            .max_lifetime(None)
1310            .build()
1311            .unwrap();
1312        assert_eq!(pool.inner.max_lifetime, None);
1313    }
1314
1315    // #62: acquire_timeout set to None (fail-fast)
1316    #[test]
1317    fn pool_builder_acquire_timeout_none() {
1318        let pool = PoolBuilder::new()
1319            .url("postgres://user:pass@localhost/db")
1320            .acquire_timeout(None)
1321            .build()
1322            .unwrap();
1323        assert_eq!(pool.inner.acquire_timeout, None);
1324    }
1325
1326    // #62: acquire_timeout custom value
1327    #[test]
1328    fn pool_builder_acquire_timeout_custom() {
1329        let pool = PoolBuilder::new()
1330            .url("postgres://user:pass@localhost/db")
1331            .acquire_timeout(Some(Duration::from_secs(10)))
1332            .build()
1333            .unwrap();
1334        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
1335    }
1336
1337    // #63: min_idle setting
1338    #[test]
1339    fn pool_builder_min_idle() {
1340        let pool = PoolBuilder::new()
1341            .url("postgres://user:pass@localhost/db")
1342            .min_idle(2)
1343            .build()
1344            .unwrap();
1345        assert_eq!(pool.inner.min_idle, 2);
1346    }
1347
1348    // #64: Pool close marks pool as closed
1349    #[test]
1350    fn pool_close_marks_closed() {
1351        let pool = PoolBuilder::new()
1352            .url("postgres://user:pass@localhost/db")
1353            .max_size(5)
1354            .build()
1355            .unwrap();
1356
1357        assert!(!pool.is_closed());
1358        pool.close();
1359        assert!(pool.is_closed());
1360
1361        // New acquires should fail
1362        let result = pool.acquire();
1363        assert!(result.is_err());
1364        match result {
1365            Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
1366            Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
1367            Ok(_) => panic!("expected error, got Ok"),
1368        }
1369    }
1370
1371    // #67: PoolStatus idle/active counts
1372    #[test]
1373    fn pool_status_initial() {
1374        let pool = PoolBuilder::new()
1375            .url("postgres://user:pass@localhost/db")
1376            .max_size(10)
1377            .build()
1378            .unwrap();
1379
1380        let status = pool.status();
1381        assert_eq!(status.idle, 0);
1382        assert_eq!(status.active, 0);
1383        assert_eq!(status.open, 0);
1384        assert_eq!(status.max_size, 10);
1385    }
1386
1387    // Default pool builder values
1388    #[test]
1389    fn pool_builder_defaults() {
1390        let pool = PoolBuilder::new()
1391            .url("postgres://user:pass@localhost/db")
1392            .build()
1393            .unwrap();
1394
1395        assert_eq!(pool.max_size(), 10);
1396        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
1397        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1398        assert_eq!(pool.inner.min_idle, 0);
1399    }
1400
1401    // Pool open_count starts at 0
1402    #[test]
1403    fn pool_open_count_initial() {
1404        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1405        assert_eq!(pool.open_count(), 0);
1406    }
1407
1408    // --- Task 7: max_stmt_cache_size ---
1409
1410    #[test]
1411    fn pool_builder_max_stmt_cache_size_default() {
1412        let pool = PoolBuilder::new()
1413            .url("postgres://user:pass@localhost/db")
1414            .build()
1415            .unwrap();
1416        assert_eq!(pool.inner.max_stmt_cache_size, 256);
1417    }
1418
1419    #[test]
1420    fn pool_builder_max_stmt_cache_size_custom() {
1421        let pool = PoolBuilder::new()
1422            .url("postgres://user:pass@localhost/db")
1423            .max_stmt_cache_size(512)
1424            .build()
1425            .unwrap();
1426        assert_eq!(pool.inner.max_stmt_cache_size, 512);
1427    }
1428
1429    // --- Auto-UDS detection tests ---
1430
1431    #[test]
1432    fn pool_is_uds_false_for_tcp() {
1433        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1434        assert!(!pool.is_uds());
1435    }
1436
1437    #[cfg(unix)]
1438    #[test]
1439    fn pool_is_uds_true_for_unix_socket() {
1440        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1441        assert!(pool.is_uds());
1442    }
1443
1444    #[cfg(unix)]
1445    #[test]
1446    fn pool_is_uds_true_for_var_run_socket() {
1447        let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
1448        assert!(pool.is_uds());
1449    }
1450
1451    #[test]
1452    fn pool_is_uds_false_for_ip_address() {
1453        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
1454        assert!(!pool.is_uds());
1455    }
1456
1457    #[cfg(unix)]
1458    #[test]
1459    fn pool_slot_sync_created_for_uds_config() {
1460        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1461        assert!(config.host_is_uds());
1462    }
1463
1464    #[test]
1465    fn pool_slot_tcp_config() {
1466        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1467        assert!(!config.host_is_uds());
1468    }
1469
1470    // ===============================================================
1471    // Pool::is_uds — extended tests
1472    // ===============================================================
1473
1474    #[test]
1475    fn pool_is_uds_false_for_hostname() {
1476        let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
1477        assert!(!pool.is_uds());
1478    }
1479
1480    #[cfg(unix)]
1481    #[test]
1482    fn pool_is_uds_true_for_tmp() {
1483        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1484        assert!(pool.is_uds());
1485    }
1486
1487    // ===============================================================
1488    // Pool close semantics
1489    // ===============================================================
1490
1491    #[test]
1492    fn pool_close_then_acquire_fails() {
1493        let pool = PoolBuilder::new()
1494            .url("postgres://user:pass@localhost/db")
1495            .max_size(5)
1496            .build()
1497            .unwrap();
1498        pool.close();
1499        let result = pool.acquire();
1500        assert!(result.is_err());
1501        match result {
1502            Err(DriverError::Pool(msg)) => {
1503                assert!(msg.contains("closed"), "should say closed: {msg}")
1504            }
1505            Err(e) => panic!("expected Pool error, got: {e:?}"),
1506            Ok(_) => panic!("expected error"),
1507        }
1508    }
1509
1510    #[test]
1511    fn pool_is_closed_before_and_after() {
1512        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1513        assert!(!pool.is_closed());
1514        pool.close();
1515        assert!(pool.is_closed());
1516    }
1517
1518    // ===============================================================
1519    // Pool exhaustion (fail-fast without timeout)
1520    // ===============================================================
1521
1522    #[test]
1523    fn pool_exhausted_no_timeout() {
1524        let pool = PoolBuilder::new()
1525            .url("postgres://user:pass@localhost/db")
1526            .max_size(0)
1527            .acquire_timeout(None) // fail-fast
1528            .build()
1529            .unwrap();
1530        let result = pool.acquire();
1531        assert!(result.is_err());
1532        match result {
1533            Err(DriverError::Pool(msg)) => {
1534                assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
1535            }
1536            Err(e) => panic!("expected Pool error, got: {e:?}"),
1537            Ok(_) => panic!("expected error"),
1538        }
1539    }
1540
1541    // ===============================================================
1542    // PoolBuilder validation
1543    // ===============================================================
1544
1545    #[test]
1546    fn pool_builder_no_url_error() {
1547        let result = PoolBuilder::new().max_size(5).build();
1548        assert!(result.is_err());
1549        match result {
1550            Err(DriverError::Pool(msg)) => {
1551                assert!(msg.contains("URL"), "should mention URL: {msg}")
1552            }
1553            Err(e) => panic!("expected Pool error, got: {e:?}"),
1554            Ok(_) => panic!("expected error"),
1555        }
1556    }
1557
1558    #[test]
1559    fn pool_builder_invalid_url_error() {
1560        let result = PoolBuilder::new().url("ftp://something").build();
1561        assert!(result.is_err());
1562    }
1563
1564    #[test]
1565    fn pool_builder_stmt_cache_size_zero() {
1566        let pool = PoolBuilder::new()
1567            .url("postgres://user:pass@localhost/db")
1568            .max_stmt_cache_size(0)
1569            .build()
1570            .unwrap();
1571        assert_eq!(pool.inner.max_stmt_cache_size, 0);
1572    }
1573
1574    // ===============================================================
1575    // PoolStatus
1576    // ===============================================================
1577
1578    #[test]
1579    fn pool_status_reflects_max_size() {
1580        let pool = PoolBuilder::new()
1581            .url("postgres://user:pass@localhost/db")
1582            .max_size(20)
1583            .build()
1584            .unwrap();
1585        let status = pool.status();
1586        assert_eq!(status.max_size, 20);
1587        assert_eq!(status.idle, 0);
1588        assert_eq!(status.active, 0);
1589        assert_eq!(status.open, 0);
1590    }
1591
1592    // ===============================================================
1593    // Pool clone
1594    // ===============================================================
1595
1596    #[test]
1597    fn pool_clone_shares_config() {
1598        let pool = PoolBuilder::new()
1599            .url("postgres://user:pass@localhost/db")
1600            .max_size(7)
1601            .build()
1602            .unwrap();
1603        let p2 = pool.clone();
1604        assert_eq!(pool.max_size(), 7);
1605        assert_eq!(p2.max_size(), 7);
1606        assert_eq!(pool.open_count(), p2.open_count());
1607    }
1608
1609    // ===============================================================
1610    // set_warmup_sqls
1611    // ===============================================================
1612
1613    #[test]
1614    fn pool_set_warmup_sqls_empty() {
1615        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1616        pool.set_warmup_sqls(&[]);
1617        let sqls = pool
1618            .inner
1619            .warmup_sqls
1620            .lock()
1621            .unwrap_or_else(|e| e.into_inner())
1622            .clone();
1623        assert!(sqls.is_empty());
1624    }
1625
1626    #[test]
1627    fn pool_set_warmup_sqls_multiple() {
1628        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1629        pool.set_warmup_sqls(&["SELECT 1", "SELECT 2", "SELECT 3"]);
1630        let sqls = pool
1631            .inner
1632            .warmup_sqls
1633            .lock()
1634            .unwrap_or_else(|e| e.into_inner())
1635            .clone();
1636        assert_eq!(sqls.len(), 3);
1637        assert_eq!(&*sqls[0], "SELECT 1");
1638        assert_eq!(&*sqls[1], "SELECT 2");
1639        assert_eq!(&*sqls[2], "SELECT 3");
1640    }
1641
1642    #[test]
1643    fn pool_set_warmup_sqls_overwrite() {
1644        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1645        pool.set_warmup_sqls(&["SELECT 1"]);
1646        pool.set_warmup_sqls(&["SELECT 99"]);
1647        let sqls = pool
1648            .inner
1649            .warmup_sqls
1650            .lock()
1651            .unwrap_or_else(|e| e.into_inner())
1652            .clone();
1653        assert_eq!(sqls.len(), 1);
1654        assert_eq!(&*sqls[0], "SELECT 99");
1655    }
1656
1657    // ===============================================================
1658    // PoolStatus Debug
1659    // ===============================================================
1660
1661    #[test]
1662    fn pool_status_debug() {
1663        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1664        let status = pool.status();
1665        let dbg = format!("{status:?}");
1666        assert!(dbg.contains("PoolStatus"));
1667        assert!(dbg.contains("idle"));
1668        assert!(dbg.contains("active"));
1669        assert!(dbg.contains("open"));
1670        assert!(dbg.contains("max_size"));
1671    }
1672
1673    // ===============================================================
1674    // Config host_is_uds via pool (structural tests)
1675    // ===============================================================
1676
1677    #[test]
1678    fn config_host_is_uds_returns_true_for_slash() {
1679        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1680        assert!(config.host_is_uds());
1681    }
1682
1683    #[test]
1684    fn config_host_is_uds_returns_false_for_tcp() {
1685        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1686        assert!(!config.host_is_uds());
1687    }
1688
1689    #[test]
1690    fn config_host_is_uds_returns_false_for_ip() {
1691        let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
1692        assert!(!config.host_is_uds());
1693    }
1694
1695    // ===============================================================
1696    // PoolBuilder chaining
1697    // ===============================================================
1698
1699    #[test]
1700    fn pool_builder_full_chain() {
1701        let pool = PoolBuilder::new()
1702            .url("postgres://user:pass@localhost/db")
1703            .max_size(3)
1704            .max_lifetime(Some(Duration::from_secs(600)))
1705            .acquire_timeout(Some(Duration::from_secs(5)))
1706            .min_idle(1)
1707            .max_stmt_cache_size(128)
1708            .build()
1709            .unwrap();
1710        assert_eq!(pool.max_size(), 3);
1711        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
1712        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1713        assert_eq!(pool.inner.min_idle, 1);
1714        assert_eq!(pool.inner.max_stmt_cache_size, 128);
1715    }
1716
1717    // --- Audit: PoolGuard drop discards connections in bad state ---
1718
1719    #[test]
1720    fn pool_max_size_zero_rejects_all_acquires() {
1721        let pool = PoolBuilder::new()
1722            .url("postgres://user:pass@localhost/db")
1723            .max_size(0)
1724            .build()
1725            .unwrap();
1726        let result = pool.acquire();
1727        assert!(result.is_err());
1728        match &result {
1729            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1730            _ => panic!("expected pool exhausted error"),
1731        }
1732    }
1733
1734    // --- Audit: URL parsing edge cases ---
1735
1736    #[test]
1737    fn url_parse_unknown_sslmode_returns_error() {
1738        let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
1739        assert!(result.is_err());
1740        let msg = format!("{}", result.unwrap_err());
1741        assert!(msg.contains("unknown sslmode"));
1742    }
1743
1744    #[test]
1745    fn url_parse_invalid_port_returns_error() {
1746        let result = Config::from_url("postgres://u:p@h:abc/d");
1747        assert!(result.is_err());
1748        let msg = format!("{}", result.unwrap_err());
1749        assert!(msg.contains("invalid port"));
1750    }
1751
1752    #[test]
1753    fn url_parse_missing_at_sign_returns_error() {
1754        let result = Config::from_url("postgres://u:plocalhost/d");
1755        assert!(result.is_err());
1756        let msg = format!("{}", result.unwrap_err());
1757        assert!(msg.contains("missing @"));
1758    }
1759
1760    #[test]
1761    fn url_parse_empty_host_returns_error() {
1762        let result = Config::from_url("postgres://u:p@/d");
1763        assert!(result.is_err());
1764    }
1765
1766    #[test]
1767    fn url_parse_empty_user_returns_error() {
1768        let result = Config::from_url("postgres://:p@h/d");
1769        assert!(result.is_err());
1770    }
1771
1772    #[test]
1773    fn url_parse_statement_timeout_invalid_uses_default() {
1774        let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
1775        assert_eq!(config.statement_timeout_secs, 30);
1776    }
1777
1778    #[test]
1779    fn url_parse_malformed_percent_encoding() {
1780        let result = Config::from_url("postgres://u%:p@h/d");
1781        assert!(result.is_err());
1782    }
1783
1784    #[test]
1785    fn url_parse_invalid_hex_in_percent_encoding() {
1786        let result = Config::from_url("postgres://u%ZZ:p@h/d");
1787        assert!(result.is_err());
1788    }
1789}