Skip to main content

bsql_driver_postgres/
pool.rs

1//! Connection pool — LIFO ordering, Condvar-based waiting.
2//!
3//! The pool maintains a stack of idle connections. `acquire()` pops the top
4//! (most recently used = warmest caches). On drop, the guard pushes the
5//! connection back. If the pool is exhausted, callers wait on a `Condvar`
6//! up to `acquire_timeout` (default: 5 seconds). Set `acquire_timeout` to
7//! `None` for fail-fast behavior (immediate error when exhausted).
8
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use crate::arena::Arena;
14use crate::codec::Encode;
15use crate::conn::Connection;
16use crate::types::{Config, PgDataRow, QueryResult, SimpleRow};
17use crate::DriverError;
18
19#[cfg(feature = "async")]
20use crate::async_conn::AsyncConnection;
21
22// --- PoolSlot ---
23
24/// A connection slot — either sync (UDS/TCP) or async (TCP only).
25///
26/// The pool auto-detects: UDS hosts get sync `Connection`, TCP hosts get
27/// `AsyncConnection` (when the `async` feature is enabled). When `async`
28/// is disabled, all connections are sync.
29pub(crate) enum PoolSlot {
30    /// Sync connection (UDS or TCP without async feature).
31    Sync(Connection),
32    /// Async TCP connection (requires async feature + tokio runtime).
33    #[cfg(feature = "async")]
34    Async(AsyncConnection),
35}
36
37// --- N+1 Detection ---
38
39/// Tracks sequential repeats of the same `sql_hash` on a single connection
40/// checkout. When the same hash fires more than `threshold` times in a row,
41/// a warning is emitted. Fully `cfg`-gated — zero cost when disabled.
42#[cfg(feature = "detect-n-plus-one")]
43pub(crate) struct NPlusOneDetector {
44    last_query_hash: u64,
45    repeat_count: u16,
46    threshold: u16,
47}
48
49#[cfg(feature = "detect-n-plus-one")]
50impl NPlusOneDetector {
51    /// Create a new detector with the given warning threshold.
52    pub(crate) fn new(threshold: u16) -> Self {
53        Self {
54            last_query_hash: 0,
55            repeat_count: 0,
56            threshold,
57        }
58    }
59
60    /// Track a query execution. Call this at the start of every query method.
61    #[inline]
62    pub(crate) fn track(&mut self, sql_hash: u64) {
63        if sql_hash == self.last_query_hash {
64            self.repeat_count = self.repeat_count.saturating_add(1);
65        } else {
66            // Check previous run before resetting
67            self.emit_warning();
68            self.last_query_hash = sql_hash;
69            self.repeat_count = 1;
70        }
71    }
72
73    /// Check the final sequence on drop / connection return.
74    /// Returns `Some((hash, count))` if a warning should be emitted.
75    pub(crate) fn check_final(&self) -> Option<(u64, u16)> {
76        if self.repeat_count > self.threshold && self.last_query_hash != 0 {
77            Some((self.last_query_hash, self.repeat_count))
78        } else {
79            None
80        }
81    }
82
83    /// Emit a log warning if the current run exceeds the threshold.
84    #[cold]
85    #[inline(never)]
86    fn emit_warning(&self) {
87        if let Some((hash, count)) = self.check_final() {
88            log::warn!(
89                "[bsql] potential N+1 detected: sql_hash={:#018x} repeated {} times (threshold: {})",
90                hash,
91                count,
92                self.threshold,
93            );
94        }
95    }
96
97    /// Emit the final warning (called on drop).
98    #[cold]
99    #[inline(never)]
100    pub(crate) fn emit_final_warning(&self) {
101        self.emit_warning();
102    }
103}
104
105// --- Pool ---
106
107/// A connection pool with LIFO ordering and fail-fast semantics.
108///
109/// # Example
110///
111/// ```no_run
112/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
113/// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
114/// let mut conn = pool.acquire()?;
115/// conn.simple_query("SELECT 1")?;
116/// // conn is returned to pool on drop
117/// # Ok(())
118/// # }
119/// ```
120pub struct Pool {
121    inner: Arc<PoolInner>,
122}
123
124struct PoolInner {
125    /// Idle connections. Uses std::sync::Mutex because the critical section is
126    /// trivial (push/pop — no I/O). This lets PoolGuard::Drop return connections
127    /// synchronously.
128    stack: std::sync::Mutex<Vec<PoolSlot>>,
129    max_size: usize,
130    open_count: AtomicUsize,
131    config: Arc<Config>,
132    /// When true, no new acquires are accepted.
133    closed: AtomicBool,
134    /// Condvar pair for release notification. Waiters block on the Condvar
135    /// when the pool is exhausted and `acquire_timeout` is set.
136    release_pair: (std::sync::Mutex<()>, std::sync::Condvar),
137    /// Maximum lifetime of a connection. Connections older than this
138    /// are discarded when popped from the pool. Default: 30 minutes.
139    max_lifetime: Option<Duration>,
140    /// Maximum time to wait for a connection. Default: None (fail-fast).
141    acquire_timeout: Option<Duration>,
142    /// Minimum number of idle connections to maintain. Default: 0.
143    min_idle: usize,
144    /// SQL statements to PREPARE on new connections (warmup).
145    warmup_sqls: std::sync::Mutex<Arc<Vec<Box<str>>>>,
146    /// Maximum number of cached prepared statements per connection.
147    max_stmt_cache_size: usize,
148    /// Maximum idle duration before a connection is considered stale and discarded.
149    /// Connections idle longer than this are dropped on acquire. Default: 30 seconds.
150    stale_timeout: Duration,
151    /// Threshold for N+1 detection. When the same sql_hash fires more than
152    /// this many times sequentially on a single checkout, a warning is logged.
153    #[cfg(feature = "detect-n-plus-one")]
154    n_plus_one_threshold: u16,
155}
156
157impl Pool {
158    /// Create a pool from a connection URL with default settings (max_size = 10).
159    ///
160    /// Validates the URL but does not open any connections yet (lazy initialization).
161    pub fn connect(url: &str) -> Result<Self, DriverError> {
162        PoolBuilder::new().url(url).build()
163    }
164
165    /// Create a pool builder for custom configuration.
166    pub fn builder() -> PoolBuilder {
167        PoolBuilder::new()
168    }
169
170    /// Acquire a connection from the pool.
171    ///
172    /// Returns immediately with the most recently used idle connection (LIFO).
173    /// If no idle connections are available and the pool is below max_size, a new
174    /// connection is created. If the pool is at max_size and no `acquire_timeout`
175    /// is set, returns `DriverError::Pool` immediately. If `acquire_timeout` is
176    /// set, blocks until a connection is returned or the timeout expires.
177    #[inline]
178    pub fn acquire(&self) -> Result<PoolGuard, DriverError> {
179        if self.inner.closed.load(Ordering::Acquire) {
180            return Err(DriverError::Pool("pool is closed".into()));
181        }
182
183        // Try to pop an idle connection (fast path).
184        if let Some(guard) = self.try_pop_idle()? {
185            return Ok(guard);
186        }
187
188        // No idle connections — try to claim a slot with a proper CAS loop.
189        loop {
190            let current = self.inner.open_count.load(Ordering::Acquire);
191            if current >= self.inner.max_size {
192                if let Some(timeout) = self.inner.acquire_timeout {
193                    let (lock, cvar) = &self.inner.release_pair;
194                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
195                    let (_guard, result) = cvar
196                        .wait_timeout(guard, timeout)
197                        .unwrap_or_else(|e| e.into_inner());
198                    if result.timed_out() {
199                        return Err(DriverError::Pool(
200                            "pool exhausted: acquire timeout expired".into(),
201                        ));
202                    }
203                    // A connection was returned — try again
204                    if let Some(guard) = self.try_pop_idle()? {
205                        return Ok(guard);
206                    }
207                    // Popped nothing — retry CAS
208                    continue;
209                }
210                return Err(DriverError::Pool(
211                    "pool exhausted: all connections in use".into(),
212                ));
213            }
214            if self
215                .inner
216                .open_count
217                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
218                .is_ok()
219            {
220                break;
221            }
222            // CAS failed — another thread incremented. Retry.
223        }
224
225        // Open a new connection
226        let conn_result = Connection::connect_arc(self.inner.config.clone());
227        match conn_result {
228            Ok(mut conn) => {
229                // Configure statement cache size
230                conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
231                // Warmup: pre-PREPARE frequently used statements
232                self.warmup_conn(&mut conn);
233
234                Ok(PoolGuard {
235                    conn: Some(PoolSlot::Sync(conn)),
236                    pool: self.inner.clone(),
237                    discard: false,
238                    #[cfg(feature = "detect-n-plus-one")]
239                    detector: NPlusOneDetector::new(self.inner.n_plus_one_threshold),
240                })
241            }
242            Err(e) => {
243                // Give back the slot
244                self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
245                Err(e)
246            }
247        }
248    }
249
250    /// Try to pop a valid idle connection from the stack.
251    ///
252    /// Performs lifetime and stale checks. For connections idle > 5 seconds
253    /// (but within the stale timeout), sends an empty query as a health check
254    /// to verify the connection is still alive before returning it.
255    #[inline]
256    fn try_pop_idle(&self) -> Result<Option<PoolGuard>, DriverError> {
257        // Pop a candidate slot under the lock, performing only non-I/O checks
258        // (lifetime, stale timeout). The health check (network round-trip) happens
259        // AFTER the lock is released so other threads aren't blocked.
260        loop {
261            let (mut slot, needs_health_check) = {
262                let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
263                loop {
264                    let Some(slot) = stack.pop() else {
265                        return Ok(None);
266                    };
267                    let (created_at, idle_dur) = match &slot {
268                        PoolSlot::Sync(conn) => (conn.created_at(), conn.idle_duration()),
269                        #[cfg(feature = "async")]
270                        PoolSlot::Async(conn) => (conn.created_at(), conn.idle_duration()),
271                    };
272                    if let Some(max_lifetime) = self.inner.max_lifetime {
273                        if created_at.elapsed() >= max_lifetime {
274                            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
275                            continue;
276                        }
277                    }
278                    if idle_dur >= self.inner.stale_timeout {
279                        // Stale connection — drop it, free the slot
280                        self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
281                        continue;
282                    }
283                    break (slot, idle_dur > Duration::from_secs(5));
284                }
285            };
286            // Lock is now released — health check happens outside the critical section.
287            // Sends an empty query — PG returns EmptyQueryResponse + ReadyForQuery.
288            // Fast: one round-trip, ~15us on UDS. Skip for hot connections.
289            if needs_health_check {
290                let alive = match &mut slot {
291                    PoolSlot::Sync(conn) => conn.simple_query("").is_ok(),
292                    #[cfg(feature = "async")]
293                    PoolSlot::Async(_) => true, // async connections are checked at I/O time
294                };
295                if !alive {
296                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
297                    continue; // retry — re-acquire lock and pop next slot
298                }
299            }
300            return Ok(Some(PoolGuard {
301                conn: Some(slot),
302                pool: self.inner.clone(),
303                discard: false,
304                #[cfg(feature = "detect-n-plus-one")]
305                detector: NPlusOneDetector::new(self.inner.n_plus_one_threshold),
306            }));
307        }
308    }
309
310    /// Whether this pool uses UDS connections.
311    ///
312    /// Returns `true` when the pool URL points to a Unix domain socket.
313    /// On non-Unix platforms, always returns `false`.
314    pub fn is_uds(&self) -> bool {
315        #[cfg(unix)]
316        {
317            self.inner.config.host_is_uds()
318        }
319        #[cfg(not(unix))]
320        {
321            false
322        }
323    }
324
325    /// Begin a transaction. Acquires a connection and sends BEGIN.
326    pub fn begin(&self) -> Result<Transaction, DriverError> {
327        let mut guard = self.acquire()?;
328        guard.simple_query("BEGIN")?;
329        Ok(Transaction {
330            guard,
331            committed: false,
332            deferred_buf: Vec::new(),
333            deferred_count: 0,
334        })
335    }
336
337    /// Current number of open connections (idle + in-use).
338    pub fn open_count(&self) -> usize {
339        self.inner.open_count.load(Ordering::Relaxed)
340    }
341
342    /// Maximum pool size.
343    pub fn max_size(&self) -> usize {
344        self.inner.max_size
345    }
346
347    /// Pool status metrics.
348    pub fn status(&self) -> PoolStatus {
349        let idle = self
350            .inner
351            .stack
352            .lock()
353            .unwrap_or_else(|e| e.into_inner())
354            .len();
355        let open = self.inner.open_count.load(Ordering::Relaxed);
356        let active = open.saturating_sub(idle);
357        PoolStatus {
358            idle,
359            active,
360            open,
361            max_size: self.inner.max_size,
362        }
363    }
364
365    /// Pre-PREPARE warmup statements on a new connection.
366    ///
367    /// Uses `prepare_batch()` to pipeline N × (Parse+Describe) + 1 × Sync
368    /// in a single round-trip, instead of N separate round-trips.
369    ///
370    /// Best-effort: errors are silently ignored.
371    /// The connection remains usable even if warmup fails.
372    fn warmup_conn(&self, conn: &mut Connection) {
373        let sqls = self
374            .inner
375            .warmup_sqls
376            .lock()
377            .unwrap_or_else(|e| e.into_inner())
378            .clone();
379
380        if sqls.is_empty() {
381            return;
382        }
383
384        let batch: Vec<(&str, u64)> = sqls
385            .iter()
386            .map(|sql| (sql.as_ref(), crate::types::hash_sql(sql)))
387            .collect();
388
389        let _ = conn.prepare_batch(&batch);
390    }
391
392    /// Set the SQL statements to pre-PREPARE on new connections.
393    ///
394    /// Each SQL string is PREPAREd (Parse+Describe+Sync) on new connections
395    /// before they are returned from `acquire()`. This eliminates the first-use
396    /// Parse overhead for frequently executed queries.
397    ///
398    /// Warmup errors are silently ignored — a bad warmup SQL must not prevent
399    /// the connection from being usable.
400    ///
401    /// # Example
402    ///
403    /// ```no_run
404    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
405    /// let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
406    /// pool.set_warmup_sqls([
407    ///     "SELECT id, name FROM users WHERE id = $1::int4",
408    ///     "SELECT id, title FROM tickets WHERE status = ANY($1::text[])",
409    /// ]);
410    /// # Ok(())
411    /// # }
412    /// ```
413    /// Set SQL statements to pre-PREPARE on new connections.
414    ///
415    /// Accepts any iterator of items convertible to `Box<str>`:
416    /// - `["SELECT 1", "SELECT 2"]` — static &str, copied into Box
417    /// - `[format!("SET search_path TO {}", name)]` — String, zero-copy move
418    pub fn set_warmup_sqls<S: Into<Box<str>>>(&self, sqls: impl IntoIterator<Item = S>) {
419        let boxed: Arc<Vec<Box<str>>> = Arc::new(sqls.into_iter().map(Into::into).collect());
420        *self
421            .inner
422            .warmup_sqls
423            .lock()
424            .unwrap_or_else(|e| e.into_inner()) = boxed;
425    }
426
427    /// Close the pool. No new acquires are accepted. All idle connections
428    /// are sent Terminate and dropped.
429    pub fn close(&self) {
430        self.inner.closed.store(true, Ordering::Release);
431        // Drain and close all idle connections
432        let slots: Vec<PoolSlot> = {
433            let mut stack = self.inner.stack.lock().unwrap_or_else(|e| e.into_inner());
434            std::mem::take(&mut *stack)
435        };
436        for slot in slots {
437            self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
438            match slot {
439                PoolSlot::Sync(conn) => {
440                    let _ = conn.close();
441                }
442                #[cfg(feature = "async")]
443                PoolSlot::Async(_conn) => {
444                    // AsyncConnection::close() is async — we can't await in sync close().
445                    // Drop will close the TCP socket, PG auto-cleans up.
446                }
447            }
448        }
449        // Notify any waiters so they get the "pool is closed" error
450        let (_, cvar) = &self.inner.release_pair;
451        cvar.notify_all();
452    }
453
454    /// Whether the pool has been closed.
455    pub fn is_closed(&self) -> bool {
456        self.inner.closed.load(Ordering::Acquire)
457    }
458
459    /// Acquire a connection from the pool (async).
460    ///
461    /// Auto-detects transport: UDS hosts get a sync `Connection`, TCP hosts
462    /// get an `AsyncConnection`. If the `async` feature is disabled, always
463    /// creates sync connections.
464    ///
465    /// Returns immediately with the most recently used idle connection (LIFO).
466    /// If no idle connections are available and the pool is below max_size, a new
467    /// connection is created.
468    #[cfg(feature = "async")]
469    pub async fn acquire_async(&self) -> Result<PoolGuard, DriverError> {
470        if self.inner.closed.load(Ordering::Acquire) {
471            return Err(DriverError::Pool("pool is closed".into()));
472        }
473
474        // Try to pop an idle connection (fast path).
475        if let Some(guard) = self.try_pop_idle()? {
476            return Ok(guard);
477        }
478
479        // No idle connections — try to claim a slot with a proper CAS loop.
480        loop {
481            let current = self.inner.open_count.load(Ordering::Acquire);
482            if current >= self.inner.max_size {
483                if let Some(timeout) = self.inner.acquire_timeout {
484                    let (lock, cvar) = &self.inner.release_pair;
485                    let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
486                    let (_guard, result) = cvar
487                        .wait_timeout(guard, timeout)
488                        .unwrap_or_else(|e| e.into_inner());
489                    if result.timed_out() {
490                        return Err(DriverError::Pool(
491                            "pool exhausted: acquire timeout expired".into(),
492                        ));
493                    }
494                    if let Some(guard) = self.try_pop_idle()? {
495                        return Ok(guard);
496                    }
497                    continue;
498                }
499                return Err(DriverError::Pool(
500                    "pool exhausted: all connections in use".into(),
501                ));
502            }
503            if self
504                .inner
505                .open_count
506                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
507                .is_ok()
508            {
509                break;
510            }
511        }
512
513        // Open a new connection — auto-detect UDS vs TCP
514        if self.inner.config.host_is_uds() {
515            // UDS — use sync Connection
516            let conn_result = Connection::connect_arc(self.inner.config.clone());
517            match conn_result {
518                Ok(mut conn) => {
519                    conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
520                    self.warmup_conn(&mut conn);
521                    Ok(PoolGuard {
522                        conn: Some(PoolSlot::Sync(conn)),
523                        pool: self.inner.clone(),
524                        discard: false,
525                        #[cfg(feature = "detect-n-plus-one")]
526                        detector: NPlusOneDetector::new(self.inner.n_plus_one_threshold),
527                    })
528                }
529                Err(e) => {
530                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
531                    Err(e)
532                }
533            }
534        } else {
535            // TCP — use AsyncConnection
536            let conn_result = AsyncConnection::connect_arc(self.inner.config.clone()).await;
537            match conn_result {
538                Ok(mut conn) => {
539                    conn.set_max_stmt_cache_size(self.inner.max_stmt_cache_size);
540                    Ok(PoolGuard {
541                        conn: Some(PoolSlot::Async(conn)),
542                        pool: self.inner.clone(),
543                        discard: false,
544                        #[cfg(feature = "detect-n-plus-one")]
545                        detector: NPlusOneDetector::new(self.inner.n_plus_one_threshold),
546                    })
547                }
548                Err(e) => {
549                    self.inner.open_count.fetch_sub(1, Ordering::AcqRel);
550                    Err(e)
551                }
552            }
553        }
554    }
555}
556
557impl Clone for Pool {
558    fn clone(&self) -> Self {
559        Pool {
560            inner: self.inner.clone(),
561        }
562    }
563}
564
565// --- PoolStatus ---
566
567/// Pool status metrics.
568#[derive(Debug, Clone, Copy)]
569pub struct PoolStatus {
570    /// Number of idle connections in the pool.
571    pub idle: usize,
572    /// Number of connections currently in use.
573    pub active: usize,
574    /// Total open connections (idle + active).
575    pub open: usize,
576    /// Maximum pool size.
577    pub max_size: usize,
578}
579
580// --- PoolBuilder ---
581
582/// Builder for configuring a connection pool.
583pub struct PoolBuilder {
584    url: Option<String>,
585    max_size: usize,
586    /// Maximum lifetime of a connection.
587    max_lifetime: Option<Duration>,
588    /// Maximum time to wait for a connection when pool is exhausted.
589    acquire_timeout: Option<Duration>,
590    /// Minimum number of idle connections to maintain.
591    min_idle: usize,
592    /// Maximum number of cached prepared statements per connection.
593    max_stmt_cache_size: usize,
594    /// Maximum idle duration before a connection is considered stale.
595    stale_timeout: Duration,
596    /// Threshold for N+1 detection warnings.
597    #[cfg(feature = "detect-n-plus-one")]
598    n_plus_one_threshold: Option<u16>,
599}
600
601impl PoolBuilder {
602    fn new() -> Self {
603        Self {
604            url: None,
605            max_size: 10,
606            max_lifetime: Some(Duration::from_secs(30 * 60)), // 30 min default
607            acquire_timeout: Some(Duration::from_secs(5)), // 5s default (matches common pool defaults)
608            min_idle: 0,                                   // no minimum by default
609            max_stmt_cache_size: 256,                      // LRU eviction at 256 stmts
610            stale_timeout: Duration::from_secs(30),        // 30s default
611            #[cfg(feature = "detect-n-plus-one")]
612            n_plus_one_threshold: None,
613        }
614    }
615
616    /// Set the connection URL.
617    pub fn url(mut self, url: &str) -> Self {
618        self.url = Some(url.to_owned());
619        self
620    }
621
622    /// Set the maximum pool size. Default: 10.
623    ///
624    /// A max_size of 0 means the pool will reject all acquire() calls immediately.
625    pub fn max_size(mut self, size: usize) -> Self {
626        self.max_size = size;
627        self
628    }
629
630    /// Set the maximum lifetime of a connection. Default: 30 minutes.
631    /// Set to None for unlimited lifetime.
632    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
633        self.max_lifetime = lifetime;
634        self
635    }
636
637    /// Set the acquire timeout. Default: 5 seconds.
638    /// Set to None for fail-fast behavior when the pool is exhausted.
639    pub fn acquire_timeout(mut self, timeout: Option<Duration>) -> Self {
640        self.acquire_timeout = timeout;
641        self
642    }
643
644    /// Set the minimum number of idle connections. Default: 0.
645    /// When > 0, a background thread maintains this many idle connections.
646    pub fn min_idle(mut self, count: usize) -> Self {
647        self.min_idle = count;
648        self
649    }
650
651    /// Set the maximum number of cached prepared statements per connection.
652    /// Default: 256. When the cache exceeds this size, the least recently
653    /// used statement is evicted (Close sent to PG to free server memory).
654    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
655        self.max_stmt_cache_size = size;
656        self
657    }
658
659    /// Set the maximum idle duration before a connection is considered stale.
660    /// Default: 30 seconds. Connections idle longer than this are dropped on
661    /// acquire instead of being reused.
662    pub fn stale_timeout(mut self, timeout: Duration) -> Self {
663        self.stale_timeout = timeout;
664        self
665    }
666
667    /// Set the threshold for N+1 detection warnings.
668    ///
669    /// When the same `sql_hash` fires more than this many times sequentially
670    /// on a single connection checkout, a warning is logged. Default: 10.
671    #[cfg(feature = "detect-n-plus-one")]
672    pub fn n_plus_one_threshold(mut self, n: u16) -> Self {
673        self.n_plus_one_threshold = Some(n);
674        self
675    }
676
677    /// Build the pool. Validates the URL but does not open connections.
678    pub fn build(self) -> Result<Pool, DriverError> {
679        let url = self
680            .url
681            .ok_or_else(|| DriverError::Pool("pool builder requires a URL".into()))?;
682
683        let config = Arc::new(Config::from_url(&url)?);
684
685        let pool = Pool {
686            inner: Arc::new(PoolInner {
687                stack: std::sync::Mutex::new(Vec::with_capacity(self.max_size)),
688                max_size: self.max_size,
689                open_count: AtomicUsize::new(0),
690                config,
691                closed: AtomicBool::new(false),
692                release_pair: (std::sync::Mutex::new(()), std::sync::Condvar::new()),
693                max_lifetime: self.max_lifetime,
694                acquire_timeout: self.acquire_timeout,
695                min_idle: self.min_idle,
696                warmup_sqls: std::sync::Mutex::new(Arc::new(Vec::new())),
697                max_stmt_cache_size: self.max_stmt_cache_size,
698                stale_timeout: self.stale_timeout,
699                #[cfg(feature = "detect-n-plus-one")]
700                n_plus_one_threshold: self.n_plus_one_threshold.unwrap_or(10),
701            }),
702        };
703
704        if self.min_idle > 0 {
705            let inner = pool.inner.clone();
706            std::thread::spawn(move || {
707                maintain_min_idle(inner);
708            });
709        }
710
711        Ok(pool)
712    }
713}
714
715/// Background thread that maintains min_idle connections.
716fn maintain_min_idle(inner: Arc<PoolInner>) {
717    loop {
718        if inner.closed.load(Ordering::Acquire) {
719            return;
720        }
721
722        let idle_count = inner.stack.lock().unwrap_or_else(|e| e.into_inner()).len();
723        let needed = inner.min_idle.saturating_sub(idle_count);
724
725        for _ in 0..needed {
726            if inner.closed.load(Ordering::Acquire) {
727                return;
728            }
729            let current = inner.open_count.load(Ordering::Acquire);
730            if current >= inner.max_size {
731                break;
732            }
733            if inner
734                .open_count
735                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
736                .is_err()
737            {
738                continue;
739            }
740
741            match Connection::connect_arc(inner.config.clone()) {
742                Ok(conn) => {
743                    let mut stack = inner.stack.lock().unwrap_or_else(|e| e.into_inner());
744                    stack.push(PoolSlot::Sync(conn));
745                    let (_, cvar) = &inner.release_pair;
746                    cvar.notify_one();
747                }
748                Err(_) => {
749                    inner.open_count.fetch_sub(1, Ordering::AcqRel);
750                }
751            }
752        }
753
754        // Check every 1 second. Shorter interval ensures the thread exits promptly
755        // when pool.closed is set (worst-case 1s delay instead of 5s).
756        std::thread::sleep(Duration::from_secs(1));
757    }
758}
759
760// --- PoolGuard ---
761
762/// A borrowed connection from the pool. Returns to the pool on drop.
763///
764/// If the connection is in a failed transaction state, broken, or marked for
765/// discard, it is dropped (decrements open_count) instead of returned.
766pub struct PoolGuard {
767    conn: Option<PoolSlot>,
768    pool: Arc<PoolInner>,
769    /// When true, the connection is dropped instead of returned to the pool.
770    discard: bool,
771    /// Tracks sequential repeats of the same sql_hash for N+1 detection.
772    #[cfg(feature = "detect-n-plus-one")]
773    detector: NPlusOneDetector,
774}
775
776impl PoolGuard {
777    /// Get a reference to the inner sync connection. Panics if the slot
778    /// holds an async connection.
779    #[inline]
780    fn sync_conn(&self) -> Result<&Connection, DriverError> {
781        match self.conn.as_ref() {
782            Some(PoolSlot::Sync(conn)) => Ok(conn),
783            #[cfg(feature = "async")]
784            Some(PoolSlot::Async(_)) => Err(DriverError::Pool(
785                "expected sync connection, got async; use async methods".into(),
786            )),
787            None => Err(DriverError::Pool("connection already taken".into())),
788        }
789    }
790
791    /// Get a mutable reference to the inner sync connection.
792    #[inline]
793    fn sync_conn_mut(&mut self) -> Result<&mut Connection, DriverError> {
794        match self.conn.as_mut() {
795            Some(PoolSlot::Sync(conn)) => Ok(conn),
796            #[cfg(feature = "async")]
797            Some(PoolSlot::Async(_)) => Err(DriverError::Pool(
798                "expected sync connection, got async; use async methods".into(),
799            )),
800            None => Err(DriverError::Pool("connection already taken".into())),
801        }
802    }
803
804    /// Mark this connection for discard — it will NOT be returned to the pool
805    /// on drop. The open_count is decremented and the TCP connection is closed.
806    pub fn mark_discard(&mut self) {
807        self.discard = true;
808    }
809
810    /// Cancel the currently running query on the underlying connection.
811    ///
812    /// Opens a new TCP connection and sends a CancelRequest to PG.
813    /// The cancel connection is closed immediately after.
814    pub fn cancel(&self) -> Result<(), DriverError> {
815        self.sync_conn()?.cancel()
816    }
817
818    // --- Introspection dispatch methods ---
819
820    /// Get the backend process ID for this connection.
821    ///
822    /// # Panics
823    ///
824    /// Panics if the connection has already been returned to the pool (Drop ran).
825    /// This cannot happen in safe code because `PoolGuard` owns the connection.
826    pub fn pid(&self) -> i32 {
827        match self.conn.as_ref().expect("connection returned to pool") {
828            PoolSlot::Sync(conn) => conn.pid(),
829            #[cfg(feature = "async")]
830            PoolSlot::Async(conn) => conn.pid(),
831        }
832    }
833
834    /// Whether the connection is idle (not in a transaction).
835    ///
836    /// # Panics
837    ///
838    /// Panics if the connection has already been returned to the pool (Drop ran).
839    /// This cannot happen in safe code because `PoolGuard` owns the connection.
840    pub fn is_idle(&self) -> bool {
841        match self.conn.as_ref().expect("connection returned to pool") {
842            PoolSlot::Sync(conn) => conn.is_idle(),
843            #[cfg(feature = "async")]
844            PoolSlot::Async(conn) => conn.is_idle(),
845        }
846    }
847
848    /// Whether the connection is inside a transaction.
849    ///
850    /// # Panics
851    ///
852    /// Panics if the connection has already been returned to the pool (Drop ran).
853    /// This cannot happen in safe code because `PoolGuard` owns the connection.
854    pub fn is_in_transaction(&self) -> bool {
855        match self.conn.as_ref().expect("connection returned to pool") {
856            PoolSlot::Sync(conn) => conn.is_in_transaction(),
857            #[cfg(feature = "async")]
858            PoolSlot::Async(conn) => conn.is_in_transaction(),
859        }
860    }
861
862    // --- Sync query dispatch methods ---
863
864    /// Execute a prepared query and return rows.
865    #[inline]
866    pub fn query(
867        &mut self,
868        sql: &str,
869        sql_hash: u64,
870        params: &[&(dyn Encode + Sync)],
871    ) -> Result<QueryResult, DriverError> {
872        #[cfg(feature = "detect-n-plus-one")]
873        self.detector.track(sql_hash);
874        self.sync_conn_mut()?.query(sql, sql_hash, params)
875    }
876
877    /// Execute a query without result rows (INSERT/UPDATE/DELETE).
878    #[inline]
879    pub fn execute(
880        &mut self,
881        sql: &str,
882        sql_hash: u64,
883        params: &[&(dyn Encode + Sync)],
884    ) -> Result<u64, DriverError> {
885        #[cfg(feature = "detect-n-plus-one")]
886        self.detector.track(sql_hash);
887        self.sync_conn_mut()?.execute(sql, sql_hash, params)
888    }
889
890    /// Execute the same statement N times with different params in one pipeline.
891    ///
892    /// Sends all N Bind+Execute messages + one Sync. One round-trip for N operations.
893    /// Returns the affected row count for each parameter set.
894    pub fn execute_pipeline(
895        &mut self,
896        sql: &str,
897        sql_hash: u64,
898        param_sets: &[&[&(dyn Encode + Sync)]],
899    ) -> Result<Vec<u64>, DriverError> {
900        #[cfg(feature = "detect-n-plus-one")]
901        self.detector.track(sql_hash);
902        self.sync_conn_mut()?
903            .execute_pipeline(sql, sql_hash, param_sets)
904    }
905
906    /// Execute a simple (unprepared) query.
907    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
908        self.sync_conn_mut()?.simple_query(sql)
909    }
910
911    /// Execute a simple query and return rows as text.
912    ///
913    /// Uses PostgreSQL's simple query protocol — all values are strings.
914    pub fn simple_query_rows(&mut self, sql: &str) -> Result<Vec<SimpleRow>, DriverError> {
915        self.sync_conn_mut()?.simple_query_rows(sql)
916    }
917
918    /// Process each row via a closure with zero-copy `PgDataRow`.
919    pub fn for_each<F>(
920        &mut self,
921        sql: &str,
922        sql_hash: u64,
923        params: &[&(dyn Encode + Sync)],
924        f: F,
925    ) -> Result<(), DriverError>
926    where
927        F: FnMut(PgDataRow<'_>) -> Result<(), DriverError>,
928    {
929        #[cfg(feature = "detect-n-plus-one")]
930        self.detector.track(sql_hash);
931        self.sync_conn_mut()?.for_each(sql, sql_hash, params, f)
932    }
933
934    /// Process each DataRow as raw bytes — fastest path.
935    pub fn for_each_raw<F>(
936        &mut self,
937        sql: &str,
938        sql_hash: u64,
939        params: &[&(dyn Encode + Sync)],
940        f: F,
941    ) -> Result<(), DriverError>
942    where
943        F: FnMut(&[u8]) -> Result<(), DriverError>,
944    {
945        #[cfg(feature = "detect-n-plus-one")]
946        self.detector.track(sql_hash);
947        self.sync_conn_mut()?.for_each_raw(sql, sql_hash, params, f)
948    }
949
950    // --- Streaming ---
951
952    /// Start a streaming query.
953    pub fn query_streaming_start(
954        &mut self,
955        sql: &str,
956        sql_hash: u64,
957        params: &[&(dyn Encode + Sync)],
958        chunk_size: i32,
959    ) -> Result<(std::sync::Arc<[crate::types::ColumnDesc]>, bool), DriverError> {
960        #[cfg(feature = "detect-n-plus-one")]
961        self.detector.track(sql_hash);
962        self.sync_conn_mut()?
963            .query_streaming_start(sql, sql_hash, params, chunk_size)
964    }
965
966    /// Send Execute+Flush for a streaming query (2nd+ chunks).
967    pub fn streaming_send_execute(&mut self, chunk_size: i32) -> Result<(), DriverError> {
968        self.sync_conn_mut()?.streaming_send_execute(chunk_size)
969    }
970
971    /// Read the next chunk of rows from an in-progress streaming query.
972    pub fn streaming_next_chunk(
973        &mut self,
974        arena: &mut Arena,
975        all_col_offsets: &mut Vec<(usize, i32)>,
976    ) -> Result<bool, DriverError> {
977        self.sync_conn_mut()?
978            .streaming_next_chunk(arena, all_col_offsets)
979    }
980
981    // --- COPY protocol ---
982
983    /// Bulk copy data INTO a table from an iterator of text rows.
984    ///
985    /// Each row is a tab-separated string (TSV format). Returns the row count.
986    pub fn copy_in<'a, I>(
987        &mut self,
988        table: &str,
989        columns: &[&str],
990        rows: I,
991    ) -> Result<u64, DriverError>
992    where
993        I: IntoIterator<Item = &'a str>,
994    {
995        self.sync_conn_mut()?.copy_in(table, columns, rows)
996    }
997
998    /// Bulk copy data OUT of a table/query to a writer.
999    ///
1000    /// Writes TSV-formatted rows. Returns the row count.
1001    pub fn copy_out<W: std::io::Write>(
1002        &mut self,
1003        query: &str,
1004        writer: &mut W,
1005    ) -> Result<u64, DriverError> {
1006        self.sync_conn_mut()?.copy_out(query, writer)
1007    }
1008
1009    /// Whether this guard holds a sync connection.
1010    pub fn is_sync(&self) -> bool {
1011        matches!(self.conn.as_ref(), Some(PoolSlot::Sync(_)))
1012    }
1013
1014    /// Whether this guard holds an async connection.
1015    #[cfg(feature = "async")]
1016    pub fn is_async(&self) -> bool {
1017        matches!(self.conn.as_ref(), Some(PoolSlot::Async(_)))
1018    }
1019
1020    // --- Async query dispatch methods ---
1021
1022    /// Execute a prepared query and return rows (async).
1023    ///
1024    /// Auto-dispatches: sync connections use blocking I/O, async connections
1025    /// use tokio I/O. Returns an error if the guard holds a sync connection
1026    /// and this method is called.
1027    #[cfg(feature = "async")]
1028    pub async fn query_async(
1029        &mut self,
1030        sql: &str,
1031        sql_hash: u64,
1032        params: &[&(dyn Encode + Sync)],
1033    ) -> Result<QueryResult, DriverError> {
1034        #[cfg(feature = "detect-n-plus-one")]
1035        self.detector.track(sql_hash);
1036        match self.conn.as_mut() {
1037            Some(PoolSlot::Sync(conn)) => conn.query(sql, sql_hash, params),
1038            Some(PoolSlot::Async(conn)) => conn.query(sql, sql_hash, params).await,
1039            None => Err(DriverError::Pool("connection already taken".into())),
1040        }
1041    }
1042
1043    /// Execute without result rows (async).
1044    #[cfg(feature = "async")]
1045    pub async fn execute_async(
1046        &mut self,
1047        sql: &str,
1048        sql_hash: u64,
1049        params: &[&(dyn Encode + Sync)],
1050    ) -> Result<u64, DriverError> {
1051        #[cfg(feature = "detect-n-plus-one")]
1052        self.detector.track(sql_hash);
1053        match self.conn.as_mut() {
1054            Some(PoolSlot::Sync(conn)) => conn.execute(sql, sql_hash, params),
1055            Some(PoolSlot::Async(conn)) => conn.execute(sql, sql_hash, params).await,
1056            None => Err(DriverError::Pool("connection already taken".into())),
1057        }
1058    }
1059
1060    /// Execute a simple query (async).
1061    #[cfg(feature = "async")]
1062    pub async fn simple_query_async(&mut self, sql: &str) -> Result<(), DriverError> {
1063        match self.conn.as_mut() {
1064            Some(PoolSlot::Sync(conn)) => conn.simple_query(sql),
1065            Some(PoolSlot::Async(conn)) => conn.simple_query(sql).await,
1066            None => Err(DriverError::Pool("connection already taken".into())),
1067        }
1068    }
1069
1070    // --- Deferred pipeline support ---
1071
1072    /// Ensure a statement is prepared and cached.
1073    pub(crate) fn ensure_stmt_prepared(
1074        &mut self,
1075        sql: &str,
1076        sql_hash: u64,
1077        params: &[&(dyn Encode + Sync)],
1078    ) -> Result<[u8; 18], DriverError> {
1079        self.sync_conn_mut()?
1080            .ensure_stmt_prepared(sql, sql_hash, params)
1081    }
1082
1083    /// Write Bind+Execute bytes for a prepared statement into an external buffer.
1084    pub(crate) fn write_deferred_bind_execute(
1085        &self,
1086        sql: &str,
1087        sql_hash: u64,
1088        params: &[&(dyn Encode + Sync)],
1089        buf: &mut Vec<u8>,
1090    ) -> Result<(), DriverError> {
1091        let conn = self.sync_conn()?;
1092        conn.write_deferred_bind_execute(sql, sql_hash, params, buf)
1093    }
1094
1095    /// Flush a buffer of deferred Bind+Execute messages as a single pipeline.
1096    pub(crate) fn flush_deferred_pipeline(
1097        &mut self,
1098        buf: &mut Vec<u8>,
1099        count: usize,
1100    ) -> Result<Vec<u64>, DriverError> {
1101        self.sync_conn_mut()?.flush_deferred_pipeline(buf, count)
1102    }
1103}
1104
1105impl Drop for PoolGuard {
1106    fn drop(&mut self) {
1107        #[cfg(feature = "detect-n-plus-one")]
1108        self.detector.emit_final_warning();
1109
1110        if let Some(slot) = self.conn.take() {
1111            // Check discard conditions based on slot type.
1112            let should_discard = self.discard
1113                || self.pool.closed.load(Ordering::Acquire)
1114                || match &slot {
1115                    PoolSlot::Sync(conn) => {
1116                        conn.is_in_failed_transaction()
1117                            || conn.is_in_transaction()
1118                            || conn.is_streaming()
1119                    }
1120                    #[cfg(feature = "async")]
1121                    PoolSlot::Async(conn) => {
1122                        conn.is_in_failed_transaction() || conn.is_in_transaction()
1123                    }
1124                };
1125
1126            if should_discard {
1127                self.pool.open_count.fetch_sub(1, Ordering::AcqRel);
1128                return;
1129            }
1130
1131            // Stamp last-used time for idle connection tracking.
1132            // Amortized: only call Instant::now() every 64 returns.
1133            let mut slot = slot;
1134            match &mut slot {
1135                PoolSlot::Sync(conn) => {
1136                    if conn.query_counter() & 63 == 0 {
1137                        conn.touch();
1138                    }
1139                }
1140                #[cfg(feature = "async")]
1141                PoolSlot::Async(conn) => {
1142                    if conn.query_counter() & 63 == 0 {
1143                        conn.touch();
1144                    }
1145                }
1146            }
1147
1148            // Return to pool
1149            {
1150                let mut stack = self.pool.stack.lock().unwrap_or_else(|e| e.into_inner());
1151                stack.push(slot);
1152            }
1153
1154            // Notify waiters only if pool was exhausted (someone might be waiting).
1155            if self.pool.open_count.load(Ordering::Relaxed) >= self.pool.max_size {
1156                let (_, cvar) = &self.pool.release_pair;
1157                cvar.notify_one();
1158            }
1159        }
1160    }
1161}
1162
1163// --- Transaction ---
1164
1165/// A database transaction. Sends ROLLBACK on drop if not committed.
1166///
1167/// # Example
1168///
1169/// ```no_run
1170/// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
1171/// # let pool = bsql_driver_postgres::Pool::connect("postgres://user:pass@localhost/db")?;
1172/// let mut tx = pool.begin()?;
1173/// tx.simple_query("INSERT INTO t VALUES (1)")?;
1174/// tx.commit()?;
1175/// # Ok(())
1176/// # }
1177/// ```
1178pub struct Transaction {
1179    guard: PoolGuard,
1180    committed: bool,
1181    /// Accumulated Bind+Execute message bytes for deferred operations.
1182    deferred_buf: Vec<u8>,
1183    /// Number of deferred operations buffered.
1184    deferred_count: usize,
1185}
1186
1187impl Transaction {
1188    /// Commit the transaction.
1189    ///
1190    /// Automatically flushes any deferred operations before committing.
1191    pub fn commit(mut self) -> Result<(), DriverError> {
1192        if self.deferred_count > 0 {
1193            self.flush_deferred()?;
1194        }
1195        self.guard.simple_query("COMMIT")?;
1196        self.committed = true;
1197        Ok(())
1198    }
1199
1200    /// Rollback the transaction explicitly.
1201    ///
1202    /// Discards any deferred operations without sending them.
1203    pub fn rollback(mut self) -> Result<(), DriverError> {
1204        self.deferred_buf.clear();
1205        self.deferred_count = 0;
1206        self.guard.simple_query("ROLLBACK")?;
1207        self.committed = true; // prevent double rollback in drop
1208        Ok(())
1209    }
1210
1211    /// Execute a prepared query within the transaction.
1212    ///
1213    /// Automatically flushes any deferred operations before executing the query,
1214    /// ensuring read-your-writes consistency.
1215    pub fn query(
1216        &mut self,
1217        sql: &str,
1218        sql_hash: u64,
1219        params: &[&(dyn Encode + Sync)],
1220    ) -> Result<QueryResult, DriverError> {
1221        if self.deferred_count > 0 {
1222            self.flush_deferred()?;
1223        }
1224        self.guard.query(sql, sql_hash, params)
1225    }
1226
1227    /// Execute without result rows within the transaction.
1228    pub fn execute(
1229        &mut self,
1230        sql: &str,
1231        sql_hash: u64,
1232        params: &[&(dyn Encode + Sync)],
1233    ) -> Result<u64, DriverError> {
1234        self.guard.execute(sql, sql_hash, params)
1235    }
1236
1237    /// Execute the same statement N times with different params in one pipeline.
1238    pub fn execute_pipeline(
1239        &mut self,
1240        sql: &str,
1241        sql_hash: u64,
1242        param_sets: &[&[&(dyn Encode + Sync)]],
1243    ) -> Result<Vec<u64>, DriverError> {
1244        self.guard.execute_pipeline(sql, sql_hash, param_sets)
1245    }
1246
1247    /// Process each row directly from the wire buffer within a transaction.
1248    ///
1249    /// Automatically flushes any deferred operations first.
1250    pub fn for_each<F>(
1251        &mut self,
1252        sql: &str,
1253        sql_hash: u64,
1254        params: &[&(dyn Encode + Sync)],
1255        f: F,
1256    ) -> Result<(), DriverError>
1257    where
1258        F: FnMut(crate::types::PgDataRow<'_>) -> Result<(), DriverError>,
1259    {
1260        if self.deferred_count > 0 {
1261            self.flush_deferred()?;
1262        }
1263        self.guard.for_each(sql, sql_hash, params, f)
1264    }
1265
1266    /// Process each DataRow as raw bytes within a transaction.
1267    ///
1268    /// Automatically flushes any deferred operations first.
1269    pub fn for_each_raw<F>(
1270        &mut self,
1271        sql: &str,
1272        sql_hash: u64,
1273        params: &[&(dyn Encode + Sync)],
1274        f: F,
1275    ) -> Result<(), DriverError>
1276    where
1277        F: FnMut(&[u8]) -> Result<(), DriverError>,
1278    {
1279        if self.deferred_count > 0 {
1280            self.flush_deferred()?;
1281        }
1282        self.guard.for_each_raw(sql, sql_hash, params, f)
1283    }
1284
1285    /// Simple query within the transaction.
1286    ///
1287    /// Automatically flushes any deferred operations first.
1288    pub fn simple_query(&mut self, sql: &str) -> Result<(), DriverError> {
1289        if self.deferred_count > 0 {
1290            self.flush_deferred()?;
1291        }
1292        self.guard.simple_query(sql)
1293    }
1294
1295    // --- Deferred pipeline API ---
1296
1297    /// Buffer an execute for deferred pipeline flush.
1298    ///
1299    /// The operation is not sent to the server immediately. Instead, the
1300    /// Bind+Execute message bytes are buffered internally. The buffered
1301    /// operations are sent as a single pipeline on [`commit()`](Self::commit)
1302    /// or [`flush_deferred()`](Self::flush_deferred).
1303    ///
1304    /// # Example
1305    ///
1306    /// ```no_run
1307    /// # fn example() -> Result<(), bsql_driver_postgres::DriverError> {
1308    /// # let pool = bsql_driver_postgres::Pool::connect("postgres://u:p@localhost/db")?;
1309    /// let mut tx = pool.begin()?;
1310    /// let sql = "INSERT INTO t (v) VALUES ($1)";
1311    /// let hash = bsql_driver_postgres::hash_sql(sql);
1312    ///
1313    /// // These are buffered, not sent:
1314    /// tx.defer_execute(sql, hash, &[&1i32])?;
1315    /// tx.defer_execute(sql, hash, &[&2i32])?;
1316    /// tx.defer_execute(sql, hash, &[&3i32])?;
1317    ///
1318    /// // commit() flushes all 3 as one pipeline + COMMIT = 2 round-trips total
1319    /// tx.commit()?;
1320    /// # Ok(())
1321    /// # }
1322    /// ```
1323    pub fn defer_execute(
1324        &mut self,
1325        sql: &str,
1326        sql_hash: u64,
1327        params: &[&(dyn Encode + Sync)],
1328    ) -> Result<(), DriverError> {
1329        if params.len() > i16::MAX as usize {
1330            return Err(DriverError::Protocol(format!(
1331                "parameter count {} exceeds maximum {}",
1332                params.len(),
1333                i16::MAX
1334            )));
1335        }
1336
1337        // Ensure statement is prepared (may require one round-trip on first call)
1338        self.guard.ensure_stmt_prepared(sql, sql_hash, params)?;
1339
1340        // Buffer the Bind+Execute bytes — no I/O
1341        self.guard
1342            .write_deferred_bind_execute(sql, sql_hash, params, &mut self.deferred_buf)?;
1343        self.deferred_count += 1;
1344        Ok(())
1345    }
1346
1347    /// Flush all deferred operations as a single pipeline.
1348    ///
1349    /// Sends all buffered Bind+Execute messages + one Sync in a single TCP write.
1350    /// Returns the affected row count for each deferred operation.
1351    pub fn flush_deferred(&mut self) -> Result<Vec<u64>, DriverError> {
1352        let count = self.deferred_count;
1353        self.deferred_count = 0;
1354        self.guard
1355            .flush_deferred_pipeline(&mut self.deferred_buf, count)
1356    }
1357
1358    /// Number of operations currently buffered for deferred execution.
1359    pub fn deferred_count(&self) -> usize {
1360        self.deferred_count
1361    }
1362}
1363
1364impl Drop for Transaction {
1365    fn drop(&mut self) {
1366        if !self.committed {
1367            // Connection is in an uncommitted transaction — discard it from the pool.
1368            // Take the connection out of the guard and drop it, decrementing open_count.
1369            if let Some(_slot) = self.guard.conn.take() {
1370                self.guard.pool.open_count.fetch_sub(1, Ordering::AcqRel);
1371                // Connection dropped — PG server will auto-rollback when it sees disconnect
1372            }
1373        }
1374    }
1375}
1376
1377#[cfg(test)]
1378mod tests {
1379    use super::*;
1380
1381    #[test]
1382    fn pool_builder_requires_url() {
1383        let result = PoolBuilder::new().build();
1384        assert!(result.is_err());
1385    }
1386
1387    #[test]
1388    fn pool_builder_validates_url() {
1389        let result = PoolBuilder::new().url("not_a_url").build();
1390        assert!(result.is_err());
1391    }
1392
1393    #[test]
1394    fn pool_builder_accepts_valid_url() {
1395        let pool = PoolBuilder::new()
1396            .url("postgres://user:pass@localhost/db")
1397            .max_size(5)
1398            .build()
1399            .unwrap();
1400        assert_eq!(pool.max_size(), 5);
1401        assert_eq!(pool.open_count(), 0);
1402    }
1403
1404    #[test]
1405    fn pool_connect_validates_url() {
1406        let result = Pool::connect("not_a_url");
1407        assert!(result.is_err());
1408    }
1409
1410    #[test]
1411    fn pool_max_size_zero() {
1412        let pool = PoolBuilder::new()
1413            .url("postgres://user:pass@localhost/db")
1414            .max_size(0)
1415            .build()
1416            .unwrap();
1417
1418        let result = pool.acquire();
1419        assert!(result.is_err());
1420        match result {
1421            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
1422            Err(e) => panic!("expected Pool error, got: {e:?}"),
1423            Ok(_) => panic!("expected error, got Ok"),
1424        }
1425    }
1426
1427    #[test]
1428    fn pool_clone_shares_state() {
1429        let pool = PoolBuilder::new()
1430            .url("postgres://user:pass@localhost/db")
1431            .max_size(5)
1432            .build()
1433            .unwrap();
1434
1435        let pool2 = pool.clone();
1436        assert_eq!(pool.max_size(), pool2.max_size());
1437    }
1438
1439    // --- Audit gap tests ---
1440
1441    // #60: max_lifetime is configurable
1442    #[test]
1443    fn pool_builder_max_lifetime() {
1444        let pool = PoolBuilder::new()
1445            .url("postgres://user:pass@localhost/db")
1446            .max_lifetime(Some(Duration::from_secs(60)))
1447            .build()
1448            .unwrap();
1449        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(60)));
1450    }
1451
1452    // #60: max_lifetime None
1453    #[test]
1454    fn pool_builder_max_lifetime_none() {
1455        let pool = PoolBuilder::new()
1456            .url("postgres://user:pass@localhost/db")
1457            .max_lifetime(None)
1458            .build()
1459            .unwrap();
1460        assert_eq!(pool.inner.max_lifetime, None);
1461    }
1462
1463    // #62: acquire_timeout set to None (fail-fast)
1464    #[test]
1465    fn pool_builder_acquire_timeout_none() {
1466        let pool = PoolBuilder::new()
1467            .url("postgres://user:pass@localhost/db")
1468            .acquire_timeout(None)
1469            .build()
1470            .unwrap();
1471        assert_eq!(pool.inner.acquire_timeout, None);
1472    }
1473
1474    // #62: acquire_timeout custom value
1475    #[test]
1476    fn pool_builder_acquire_timeout_custom() {
1477        let pool = PoolBuilder::new()
1478            .url("postgres://user:pass@localhost/db")
1479            .acquire_timeout(Some(Duration::from_secs(10)))
1480            .build()
1481            .unwrap();
1482        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(10)));
1483    }
1484
1485    // #63: min_idle setting
1486    #[test]
1487    fn pool_builder_min_idle() {
1488        let pool = PoolBuilder::new()
1489            .url("postgres://user:pass@localhost/db")
1490            .min_idle(2)
1491            .build()
1492            .unwrap();
1493        assert_eq!(pool.inner.min_idle, 2);
1494    }
1495
1496    // #64: Pool close marks pool as closed
1497    #[test]
1498    fn pool_close_marks_closed() {
1499        let pool = PoolBuilder::new()
1500            .url("postgres://user:pass@localhost/db")
1501            .max_size(5)
1502            .build()
1503            .unwrap();
1504
1505        assert!(!pool.is_closed());
1506        pool.close();
1507        assert!(pool.is_closed());
1508
1509        // New acquires should fail
1510        let result = pool.acquire();
1511        assert!(result.is_err());
1512        match result {
1513            Err(DriverError::Pool(msg)) => assert!(msg.contains("closed")),
1514            Err(e) => panic!("expected Pool(closed) error, got: {e:?}"),
1515            Ok(_) => panic!("expected error, got Ok"),
1516        }
1517    }
1518
1519    // #67: PoolStatus idle/active counts
1520    #[test]
1521    fn pool_status_initial() {
1522        let pool = PoolBuilder::new()
1523            .url("postgres://user:pass@localhost/db")
1524            .max_size(10)
1525            .build()
1526            .unwrap();
1527
1528        let status = pool.status();
1529        assert_eq!(status.idle, 0);
1530        assert_eq!(status.active, 0);
1531        assert_eq!(status.open, 0);
1532        assert_eq!(status.max_size, 10);
1533    }
1534
1535    // Default pool builder values
1536    #[test]
1537    fn pool_builder_defaults() {
1538        let pool = PoolBuilder::new()
1539            .url("postgres://user:pass@localhost/db")
1540            .build()
1541            .unwrap();
1542
1543        assert_eq!(pool.max_size(), 10);
1544        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(30 * 60)));
1545        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
1546        assert_eq!(pool.inner.min_idle, 0);
1547    }
1548
1549    // Pool open_count starts at 0
1550    #[test]
1551    fn pool_open_count_initial() {
1552        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1553        assert_eq!(pool.open_count(), 0);
1554    }
1555
1556    // --- Task 7: max_stmt_cache_size ---
1557
1558    #[test]
1559    fn pool_builder_max_stmt_cache_size_default() {
1560        let pool = PoolBuilder::new()
1561            .url("postgres://user:pass@localhost/db")
1562            .build()
1563            .unwrap();
1564        assert_eq!(pool.inner.max_stmt_cache_size, 256);
1565    }
1566
1567    #[test]
1568    fn pool_builder_max_stmt_cache_size_custom() {
1569        let pool = PoolBuilder::new()
1570            .url("postgres://user:pass@localhost/db")
1571            .max_stmt_cache_size(512)
1572            .build()
1573            .unwrap();
1574        assert_eq!(pool.inner.max_stmt_cache_size, 512);
1575    }
1576
1577    // --- Auto-UDS detection tests ---
1578
1579    #[test]
1580    fn pool_is_uds_false_for_tcp() {
1581        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1582        assert!(!pool.is_uds());
1583    }
1584
1585    #[cfg(unix)]
1586    #[test]
1587    fn pool_is_uds_true_for_unix_socket() {
1588        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1589        assert!(pool.is_uds());
1590    }
1591
1592    #[cfg(unix)]
1593    #[test]
1594    fn pool_is_uds_true_for_var_run_socket() {
1595        let pool = Pool::connect("postgres://user@localhost/db?host=/var/run/postgresql").unwrap();
1596        assert!(pool.is_uds());
1597    }
1598
1599    #[test]
1600    fn pool_is_uds_false_for_ip_address() {
1601        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db").unwrap();
1602        assert!(!pool.is_uds());
1603    }
1604
1605    #[cfg(unix)]
1606    #[test]
1607    fn pool_slot_sync_created_for_uds_config() {
1608        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1609        assert!(config.host_is_uds());
1610    }
1611
1612    #[test]
1613    fn pool_slot_tcp_config() {
1614        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1615        assert!(!config.host_is_uds());
1616    }
1617
1618    // ===============================================================
1619    // Pool::is_uds — extended tests
1620    // ===============================================================
1621
1622    #[test]
1623    fn pool_is_uds_false_for_hostname() {
1624        let pool = Pool::connect("postgres://user:pass@db.example.com/db").unwrap();
1625        assert!(!pool.is_uds());
1626    }
1627
1628    #[cfg(unix)]
1629    #[test]
1630    fn pool_is_uds_true_for_tmp() {
1631        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp").unwrap();
1632        assert!(pool.is_uds());
1633    }
1634
1635    // ===============================================================
1636    // Pool close semantics
1637    // ===============================================================
1638
1639    #[test]
1640    fn pool_close_then_acquire_fails() {
1641        let pool = PoolBuilder::new()
1642            .url("postgres://user:pass@localhost/db")
1643            .max_size(5)
1644            .build()
1645            .unwrap();
1646        pool.close();
1647        let result = pool.acquire();
1648        assert!(result.is_err());
1649        match result {
1650            Err(DriverError::Pool(msg)) => {
1651                assert!(msg.contains("closed"), "should say closed: {msg}")
1652            }
1653            Err(e) => panic!("expected Pool error, got: {e:?}"),
1654            Ok(_) => panic!("expected error"),
1655        }
1656    }
1657
1658    #[test]
1659    fn pool_is_closed_before_and_after() {
1660        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1661        assert!(!pool.is_closed());
1662        pool.close();
1663        assert!(pool.is_closed());
1664    }
1665
1666    // ===============================================================
1667    // Pool exhaustion (fail-fast without timeout)
1668    // ===============================================================
1669
1670    #[test]
1671    fn pool_exhausted_no_timeout() {
1672        let pool = PoolBuilder::new()
1673            .url("postgres://user:pass@localhost/db")
1674            .max_size(0)
1675            .acquire_timeout(None) // fail-fast
1676            .build()
1677            .unwrap();
1678        let result = pool.acquire();
1679        assert!(result.is_err());
1680        match result {
1681            Err(DriverError::Pool(msg)) => {
1682                assert!(msg.contains("exhausted"), "should say exhausted: {msg}")
1683            }
1684            Err(e) => panic!("expected Pool error, got: {e:?}"),
1685            Ok(_) => panic!("expected error"),
1686        }
1687    }
1688
1689    // ===============================================================
1690    // PoolBuilder validation
1691    // ===============================================================
1692
1693    #[test]
1694    fn pool_builder_no_url_error() {
1695        let result = PoolBuilder::new().max_size(5).build();
1696        assert!(result.is_err());
1697        match result {
1698            Err(DriverError::Pool(msg)) => {
1699                assert!(msg.contains("URL"), "should mention URL: {msg}")
1700            }
1701            Err(e) => panic!("expected Pool error, got: {e:?}"),
1702            Ok(_) => panic!("expected error"),
1703        }
1704    }
1705
1706    #[test]
1707    fn pool_builder_invalid_url_error() {
1708        let result = PoolBuilder::new().url("ftp://something").build();
1709        assert!(result.is_err());
1710    }
1711
1712    #[test]
1713    fn pool_builder_stmt_cache_size_zero() {
1714        let pool = PoolBuilder::new()
1715            .url("postgres://user:pass@localhost/db")
1716            .max_stmt_cache_size(0)
1717            .build()
1718            .unwrap();
1719        assert_eq!(pool.inner.max_stmt_cache_size, 0);
1720    }
1721
1722    // --- Gap: stale_timeout builder config ---
1723
1724    #[test]
1725    fn pool_builder_stale_timeout_default() {
1726        let pool = PoolBuilder::new()
1727            .url("postgres://user:pass@localhost/db")
1728            .build()
1729            .unwrap();
1730        assert_eq!(pool.inner.stale_timeout, Duration::from_secs(30));
1731    }
1732
1733    #[test]
1734    fn pool_builder_stale_timeout_custom() {
1735        let pool = PoolBuilder::new()
1736            .url("postgres://user:pass@localhost/db")
1737            .stale_timeout(Duration::from_secs(60))
1738            .build()
1739            .unwrap();
1740        assert_eq!(pool.inner.stale_timeout, Duration::from_secs(60));
1741    }
1742
1743    #[test]
1744    fn pool_builder_stale_timeout_zero() {
1745        let pool = PoolBuilder::new()
1746            .url("postgres://user:pass@localhost/db")
1747            .stale_timeout(Duration::from_secs(0))
1748            .build()
1749            .unwrap();
1750        assert_eq!(pool.inner.stale_timeout, Duration::from_secs(0));
1751    }
1752
1753    // ===============================================================
1754    // PoolStatus
1755    // ===============================================================
1756
1757    #[test]
1758    fn pool_status_reflects_max_size() {
1759        let pool = PoolBuilder::new()
1760            .url("postgres://user:pass@localhost/db")
1761            .max_size(20)
1762            .build()
1763            .unwrap();
1764        let status = pool.status();
1765        assert_eq!(status.max_size, 20);
1766        assert_eq!(status.idle, 0);
1767        assert_eq!(status.active, 0);
1768        assert_eq!(status.open, 0);
1769    }
1770
1771    // ===============================================================
1772    // Pool clone
1773    // ===============================================================
1774
1775    #[test]
1776    fn pool_clone_shares_config() {
1777        let pool = PoolBuilder::new()
1778            .url("postgres://user:pass@localhost/db")
1779            .max_size(7)
1780            .build()
1781            .unwrap();
1782        let p2 = pool.clone();
1783        assert_eq!(pool.max_size(), 7);
1784        assert_eq!(p2.max_size(), 7);
1785        assert_eq!(pool.open_count(), p2.open_count());
1786    }
1787
1788    // ===============================================================
1789    // set_warmup_sqls
1790    // ===============================================================
1791
1792    #[test]
1793    fn pool_set_warmup_sqls_empty() {
1794        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1795        pool.set_warmup_sqls([] as [&str; 0]);
1796        let sqls = pool
1797            .inner
1798            .warmup_sqls
1799            .lock()
1800            .unwrap_or_else(|e| e.into_inner())
1801            .clone();
1802        assert!(sqls.is_empty());
1803    }
1804
1805    #[test]
1806    fn pool_set_warmup_sqls_multiple() {
1807        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1808        pool.set_warmup_sqls(["SELECT 1", "SELECT 2", "SELECT 3"]);
1809        let sqls = pool
1810            .inner
1811            .warmup_sqls
1812            .lock()
1813            .unwrap_or_else(|e| e.into_inner())
1814            .clone();
1815        assert_eq!(sqls.len(), 3);
1816        assert_eq!(&*sqls[0], "SELECT 1");
1817        assert_eq!(&*sqls[1], "SELECT 2");
1818        assert_eq!(&*sqls[2], "SELECT 3");
1819    }
1820
1821    #[test]
1822    fn pool_set_warmup_sqls_overwrite() {
1823        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1824        pool.set_warmup_sqls(["SELECT 1"]);
1825        pool.set_warmup_sqls(["SELECT 99"]);
1826        let sqls = pool
1827            .inner
1828            .warmup_sqls
1829            .lock()
1830            .unwrap_or_else(|e| e.into_inner())
1831            .clone();
1832        assert_eq!(sqls.len(), 1);
1833        assert_eq!(&*sqls[0], "SELECT 99");
1834    }
1835
1836    #[test]
1837    fn pool_set_warmup_sqls_with_iter_empty() {
1838        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1839        pool.set_warmup_sqls(std::iter::empty::<&str>());
1840        let sqls = pool
1841            .inner
1842            .warmup_sqls
1843            .lock()
1844            .unwrap_or_else(|e| e.into_inner())
1845            .clone();
1846        assert!(sqls.is_empty());
1847    }
1848
1849    #[test]
1850    fn pool_set_warmup_sqls_with_owned_string() {
1851        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1852        let dynamic = format!("SET search_path TO test_{}", 42);
1853        pool.set_warmup_sqls([dynamic]);
1854        let sqls = pool
1855            .inner
1856            .warmup_sqls
1857            .lock()
1858            .unwrap_or_else(|e| e.into_inner())
1859            .clone();
1860        assert_eq!(sqls.len(), 1);
1861        assert_eq!(&*sqls[0], "SET search_path TO test_42");
1862    }
1863
1864    #[test]
1865    fn pool_set_warmup_sqls_with_vec_of_strings() {
1866        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1867        let sqls_owned: Vec<String> = vec!["SELECT 1".to_owned(), "SELECT 2".to_owned()];
1868        pool.set_warmup_sqls(sqls_owned);
1869        let sqls = pool
1870            .inner
1871            .warmup_sqls
1872            .lock()
1873            .unwrap_or_else(|e| e.into_inner())
1874            .clone();
1875        assert_eq!(sqls.len(), 2);
1876        assert_eq!(&*sqls[0], "SELECT 1");
1877    }
1878
1879    #[test]
1880    fn pool_set_warmup_sqls_with_boxed_str() {
1881        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1882        let b: Box<str> = "SELECT 1".into();
1883        pool.set_warmup_sqls([b]);
1884        let sqls = pool
1885            .inner
1886            .warmup_sqls
1887            .lock()
1888            .unwrap_or_else(|e| e.into_inner())
1889            .clone();
1890        assert_eq!(&*sqls[0], "SELECT 1");
1891    }
1892
1893    #[test]
1894    fn pool_set_warmup_sqls_single_static_str() {
1895        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1896        pool.set_warmup_sqls(["SET statement_timeout = '30s'"]);
1897        let sqls = pool
1898            .inner
1899            .warmup_sqls
1900            .lock()
1901            .unwrap_or_else(|e| e.into_inner())
1902            .clone();
1903        assert_eq!(sqls.len(), 1);
1904    }
1905
1906    #[test]
1907    fn pool_set_warmup_sqls_preserves_order() {
1908        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1909        pool.set_warmup_sqls(["first", "second", "third"]);
1910        let sqls = pool
1911            .inner
1912            .warmup_sqls
1913            .lock()
1914            .unwrap_or_else(|e| e.into_inner())
1915            .clone();
1916        assert_eq!(&*sqls[0], "first");
1917        assert_eq!(&*sqls[1], "second");
1918        assert_eq!(&*sqls[2], "third");
1919    }
1920
1921    #[test]
1922    fn pool_set_warmup_sqls_unicode() {
1923        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1924        pool.set_warmup_sqls(["SET client_encoding TO 'UTF8'", "SELECT '日本語'"]);
1925        let sqls = pool
1926            .inner
1927            .warmup_sqls
1928            .lock()
1929            .unwrap_or_else(|e| e.into_inner())
1930            .clone();
1931        assert_eq!(&*sqls[1], "SELECT '日本語'");
1932    }
1933
1934    #[test]
1935    fn pool_set_warmup_sqls_empty_string() {
1936        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1937        pool.set_warmup_sqls([""]);
1938        let sqls = pool
1939            .inner
1940            .warmup_sqls
1941            .lock()
1942            .unwrap_or_else(|e| e.into_inner())
1943            .clone();
1944        assert_eq!(sqls.len(), 1);
1945        assert_eq!(&*sqls[0], "");
1946    }
1947
1948    #[test]
1949    fn pool_set_warmup_sqls_long_sql() {
1950        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1951        let long = "SELECT ".to_owned() + &"x, ".repeat(1000) + "1";
1952        pool.set_warmup_sqls([long]);
1953        let sqls = pool
1954            .inner
1955            .warmup_sqls
1956            .lock()
1957            .unwrap_or_else(|e| e.into_inner())
1958            .clone();
1959        assert!(sqls[0].len() > 3000);
1960    }
1961
1962    // ===============================================================
1963    // PoolStatus Debug
1964    // ===============================================================
1965
1966    #[test]
1967    fn pool_status_debug() {
1968        let pool = Pool::connect("postgres://user:pass@localhost/db").unwrap();
1969        let status = pool.status();
1970        let dbg = format!("{status:?}");
1971        assert!(dbg.contains("PoolStatus"));
1972        assert!(dbg.contains("idle"));
1973        assert!(dbg.contains("active"));
1974        assert!(dbg.contains("open"));
1975        assert!(dbg.contains("max_size"));
1976    }
1977
1978    // ===============================================================
1979    // Config host_is_uds via pool (structural tests)
1980    // ===============================================================
1981
1982    #[test]
1983    fn config_host_is_uds_returns_true_for_slash() {
1984        let config = Config::from_url("postgres://user@localhost/db?host=/tmp").unwrap();
1985        assert!(config.host_is_uds());
1986    }
1987
1988    #[test]
1989    fn config_host_is_uds_returns_false_for_tcp() {
1990        let config = Config::from_url("postgres://user:pass@localhost/db").unwrap();
1991        assert!(!config.host_is_uds());
1992    }
1993
1994    #[test]
1995    fn config_host_is_uds_returns_false_for_ip() {
1996        let config = Config::from_url("postgres://user:pass@192.168.1.1/db").unwrap();
1997        assert!(!config.host_is_uds());
1998    }
1999
2000    // ===============================================================
2001    // PoolBuilder chaining
2002    // ===============================================================
2003
2004    #[test]
2005    fn pool_builder_full_chain() {
2006        let pool = PoolBuilder::new()
2007            .url("postgres://user:pass@localhost/db")
2008            .max_size(3)
2009            .max_lifetime(Some(Duration::from_secs(600)))
2010            .acquire_timeout(Some(Duration::from_secs(5)))
2011            .min_idle(1)
2012            .max_stmt_cache_size(128)
2013            .build()
2014            .unwrap();
2015        assert_eq!(pool.max_size(), 3);
2016        assert_eq!(pool.inner.max_lifetime, Some(Duration::from_secs(600)));
2017        assert_eq!(pool.inner.acquire_timeout, Some(Duration::from_secs(5)));
2018        assert_eq!(pool.inner.min_idle, 1);
2019        assert_eq!(pool.inner.max_stmt_cache_size, 128);
2020    }
2021
2022    // --- Audit: PoolGuard drop discards connections in bad state ---
2023
2024    #[test]
2025    fn pool_max_size_zero_rejects_all_acquires() {
2026        let pool = PoolBuilder::new()
2027            .url("postgres://user:pass@localhost/db")
2028            .max_size(0)
2029            .build()
2030            .unwrap();
2031        let result = pool.acquire();
2032        assert!(result.is_err());
2033        match &result {
2034            Err(DriverError::Pool(msg)) => assert!(msg.contains("exhausted")),
2035            _ => panic!("expected pool exhausted error"),
2036        }
2037    }
2038
2039    // --- Audit: URL parsing edge cases ---
2040
2041    #[test]
2042    fn url_parse_unknown_sslmode_returns_error() {
2043        let result = Config::from_url("postgres://u:p@h/d?sslmode=bogus");
2044        assert!(result.is_err());
2045        let msg = format!("{}", result.unwrap_err());
2046        assert!(msg.contains("unknown sslmode"));
2047    }
2048
2049    #[test]
2050    fn url_parse_invalid_port_returns_error() {
2051        let result = Config::from_url("postgres://u:p@h:abc/d");
2052        assert!(result.is_err());
2053        let msg = format!("{}", result.unwrap_err());
2054        assert!(msg.contains("invalid port"));
2055    }
2056
2057    #[test]
2058    fn url_parse_missing_at_sign_returns_error() {
2059        let result = Config::from_url("postgres://u:plocalhost/d");
2060        assert!(result.is_err());
2061        let msg = format!("{}", result.unwrap_err());
2062        assert!(msg.contains("missing @"));
2063    }
2064
2065    #[test]
2066    fn url_parse_empty_host_returns_error() {
2067        let result = Config::from_url("postgres://u:p@/d");
2068        assert!(result.is_err());
2069    }
2070
2071    #[test]
2072    fn url_parse_empty_user_returns_error() {
2073        let result = Config::from_url("postgres://:p@h/d");
2074        assert!(result.is_err());
2075    }
2076
2077    #[test]
2078    fn url_parse_statement_timeout_invalid_uses_default() {
2079        let config = Config::from_url("postgres://u:p@h/d?statement_timeout=notnum").unwrap();
2080        assert_eq!(config.statement_timeout_secs, 30);
2081    }
2082
2083    #[test]
2084    fn url_parse_malformed_percent_encoding() {
2085        let result = Config::from_url("postgres://u%:p@h/d");
2086        assert!(result.is_err());
2087    }
2088
2089    #[test]
2090    fn url_parse_invalid_hex_in_percent_encoding() {
2091        let result = Config::from_url("postgres://u%ZZ:p@h/d");
2092        assert!(result.is_err());
2093    }
2094}
2095
2096// --- N+1 detector tests ---
2097
2098#[cfg(all(test, feature = "detect-n-plus-one"))]
2099mod n_plus_one_tests {
2100    use super::NPlusOneDetector;
2101
2102    #[test]
2103    fn below_threshold_no_warning() {
2104        let mut d = NPlusOneDetector::new(10);
2105        for _ in 0..10 {
2106            d.track(42);
2107        }
2108        assert!(d.check_final().is_none());
2109    }
2110
2111    #[test]
2112    fn above_threshold_warns() {
2113        let mut d = NPlusOneDetector::new(10);
2114        for _ in 0..11 {
2115            d.track(42);
2116        }
2117        let w = d.check_final().unwrap();
2118        assert_eq!(w, (42, 11));
2119    }
2120
2121    #[test]
2122    fn exact_threshold_no_warning() {
2123        let mut d = NPlusOneDetector::new(5);
2124        for _ in 0..5 {
2125            d.track(99);
2126        }
2127        assert!(d.check_final().is_none(), "> not >=");
2128    }
2129
2130    #[test]
2131    fn threshold_plus_one_warns() {
2132        let mut d = NPlusOneDetector::new(5);
2133        for _ in 0..6 {
2134            d.track(99);
2135        }
2136        assert_eq!(d.check_final(), Some((99, 6)));
2137    }
2138
2139    #[test]
2140    fn alternating_hashes_no_warning() {
2141        let mut d = NPlusOneDetector::new(2);
2142        for i in 0..100 {
2143            d.track(if i % 2 == 0 { 1 } else { 2 });
2144        }
2145        assert!(d.check_final().is_none());
2146    }
2147
2148    #[test]
2149    fn single_query_no_warning() {
2150        let mut d = NPlusOneDetector::new(10);
2151        d.track(42);
2152        assert!(d.check_final().is_none());
2153    }
2154
2155    #[test]
2156    fn no_queries_no_warning() {
2157        let d = NPlusOneDetector::new(10);
2158        assert!(d.check_final().is_none());
2159    }
2160
2161    #[test]
2162    fn threshold_zero_warns_on_second() {
2163        let mut d = NPlusOneDetector::new(0);
2164        d.track(42);
2165        // count=1, threshold=0 -> 1 > 0 -> warn
2166        assert_eq!(d.check_final(), Some((42, 1)));
2167    }
2168
2169    #[test]
2170    fn threshold_max_never_warns() {
2171        let mut d = NPlusOneDetector::new(u16::MAX);
2172        for _ in 0..1000 {
2173            d.track(42);
2174        }
2175        assert!(d.check_final().is_none());
2176    }
2177
2178    #[test]
2179    fn saturating_add_no_overflow() {
2180        let mut d = NPlusOneDetector::new(10);
2181        d.last_query_hash = 42;
2182        d.repeat_count = u16::MAX - 1;
2183        d.track(42); // saturating_add -> MAX
2184        d.track(42); // saturating_add -> still MAX
2185        assert_eq!(d.repeat_count, u16::MAX);
2186    }
2187
2188    #[test]
2189    fn different_hash_resets() {
2190        let mut d = NPlusOneDetector::new(100);
2191        for _ in 0..50 {
2192            d.track(1);
2193        }
2194        d.track(2); // resets
2195        assert_eq!(d.repeat_count, 1);
2196        assert_eq!(d.last_query_hash, 2);
2197    }
2198
2199    #[test]
2200    fn multiple_n_plus_one_sequences() {
2201        let mut d = NPlusOneDetector::new(3);
2202        // First sequence: hash=1, 5 times (>3 -> warning on switch)
2203        for _ in 0..5 {
2204            d.track(1);
2205        }
2206        // Switch triggers warning for hash=1
2207        // Second sequence: hash=2, 4 times (>3 -> check_final catches it)
2208        for _ in 0..4 {
2209            d.track(2);
2210        }
2211        // check_final sees hash=2, count=4 > 3
2212        assert_eq!(d.check_final(), Some((2, 4)));
2213    }
2214
2215    #[test]
2216    fn warning_emitted_on_hash_switch() {
2217        let mut d = NPlusOneDetector::new(2);
2218        d.track(10);
2219        d.track(10);
2220        d.track(10); // count=3 > 2
2221                     // Switch hash — this internally calls emit_warning for hash=10
2222        d.track(20);
2223        // Now tracking hash=20, count=1
2224        assert_eq!(d.last_query_hash, 20);
2225        assert_eq!(d.repeat_count, 1);
2226    }
2227
2228    #[test]
2229    fn hash_zero_treated_normally() {
2230        let mut d = NPlusOneDetector::new(2);
2231        d.track(0);
2232        d.track(0);
2233        d.track(0);
2234        // hash=0 but check_final requires hash != 0 — no warning
2235        assert!(d.check_final().is_none());
2236    }
2237
2238    #[test]
2239    fn long_sequence_correct_count() {
2240        let mut d = NPlusOneDetector::new(10);
2241        for _ in 0..500 {
2242            d.track(42);
2243        }
2244        assert_eq!(d.check_final(), Some((42, 500)));
2245    }
2246
2247    #[test]
2248    fn two_queries_below_threshold() {
2249        let mut d = NPlusOneDetector::new(10);
2250        d.track(1);
2251        d.track(1);
2252        assert!(d.check_final().is_none());
2253    }
2254
2255    #[test]
2256    fn interleaved_then_burst() {
2257        let mut d = NPlusOneDetector::new(3);
2258        // Interleaved: no trigger
2259        d.track(1);
2260        d.track(2);
2261        d.track(1);
2262        d.track(2);
2263        // Burst: hash=5, 5 times
2264        for _ in 0..5 {
2265            d.track(5);
2266        }
2267        assert_eq!(d.check_final(), Some((5, 5)));
2268    }
2269
2270    // --- Builder threshold wiring ---
2271
2272    #[test]
2273    fn pool_builder_n_plus_one_threshold_default() {
2274        let pool = super::PoolBuilder::new()
2275            .url("postgres://user:pass@localhost/db")
2276            .build()
2277            .unwrap();
2278        assert_eq!(pool.inner.n_plus_one_threshold, 10);
2279    }
2280
2281    #[test]
2282    fn pool_builder_n_plus_one_threshold_custom() {
2283        let pool = super::PoolBuilder::new()
2284            .url("postgres://user:pass@localhost/db")
2285            .n_plus_one_threshold(5)
2286            .build()
2287            .unwrap();
2288        assert_eq!(pool.inner.n_plus_one_threshold, 5);
2289    }
2290
2291    #[test]
2292    fn pool_builder_n_plus_one_threshold_zero() {
2293        let pool = super::PoolBuilder::new()
2294            .url("postgres://user:pass@localhost/db")
2295            .n_plus_one_threshold(0)
2296            .build()
2297            .unwrap();
2298        assert_eq!(pool.inner.n_plus_one_threshold, 0);
2299    }
2300
2301    #[test]
2302    fn pool_builder_n_plus_one_threshold_max() {
2303        let pool = super::PoolBuilder::new()
2304            .url("postgres://user:pass@localhost/db")
2305            .n_plus_one_threshold(u16::MAX)
2306            .build()
2307            .unwrap();
2308        assert_eq!(pool.inner.n_plus_one_threshold, u16::MAX);
2309    }
2310
2311    #[test]
2312    fn one_then_different_no_warning() {
2313        let mut d = NPlusOneDetector::new(10);
2314        d.track(1);
2315        d.track(2);
2316        // hash=1 had count=1 (below 10), hash=2 has count=1 (below 10)
2317        assert!(d.check_final().is_none());
2318    }
2319
2320    #[test]
2321    fn nonzero_hash_after_zero_init() {
2322        // First call with nonzero hash: else branch (0 != hash),
2323        // emit_warning for old (hash=0, count=0) - nothing.
2324        // Set last=hash, count=1.
2325        let mut d = NPlusOneDetector::new(0);
2326        d.track(42);
2327        let w = d.check_final().unwrap();
2328        assert_eq!(w, (42, 1));
2329    }
2330
2331    #[test]
2332    fn independent_detectors_dont_interfere() {
2333        // Each PoolGuard has its own detector -- verify independence
2334        let mut d1 = NPlusOneDetector::new(5);
2335        let mut d2 = NPlusOneDetector::new(5);
2336
2337        // d1 gets N+1 pattern
2338        for _ in 0..10 {
2339            d1.track(42);
2340        }
2341        // d2 gets different pattern
2342        d2.track(1);
2343        d2.track(2);
2344        d2.track(3);
2345
2346        // d1 should warn, d2 should not
2347        assert!(d1.check_final().is_some());
2348        assert!(d2.check_final().is_none());
2349    }
2350
2351    #[test]
2352    fn rapid_hash_changes_dont_false_positive() {
2353        // Rapid switching between many different hashes should never trigger
2354        let mut d = NPlusOneDetector::new(2);
2355        for i in 0u64..1000 {
2356            d.track(i);
2357        }
2358        // Final hash (999) was only tracked once
2359        assert!(d.check_final().is_none());
2360    }
2361
2362    #[test]
2363    fn detector_reset_state_after_warning() {
2364        // After a sequence triggers, the next sequence starts fresh
2365        let mut d = NPlusOneDetector::new(2);
2366        d.track(1);
2367        d.track(1);
2368        d.track(1); // count=3 > 2, would warn on switch
2369        d.track(2); // switch triggers warning for hash=1, resets to hash=2, count=1
2370        d.track(2); // count=2, not > 2
2371        assert!(d.check_final().is_none()); // hash=2, count=2, not > threshold=2
2372    }
2373
2374    #[test]
2375    fn detector_with_realistic_orm_pattern() {
2376        // Simulate: fetch users, then for each user fetch orders (N+1)
2377        let mut d = NPlusOneDetector::new(5);
2378        d.track(100); // SELECT * FROM users
2379                      // N+1 pattern: same query per user
2380        for _ in 0..20 {
2381            d.track(200); // SELECT * FROM orders WHERE user_id = ?
2382        }
2383        // Should detect the orders query
2384        assert_eq!(d.check_final(), Some((200, 20)));
2385    }
2386
2387    #[test]
2388    fn detector_with_legitimate_batch_pattern() {
2389        // Legitimate: different params but same prepared statement hash
2390        // This IS an N+1 and SHOULD be detected
2391        let mut d = NPlusOneDetector::new(10);
2392        for _ in 0..15 {
2393            d.track(300); // same sql_hash, different params (detector doesn't see params)
2394        }
2395        assert!(d.check_final().is_some());
2396    }
2397
2398    #[test]
2399    fn detector_exactly_at_boundaries() {
2400        for threshold in [0u16, 1, 2, 5, 10, 100] {
2401            let mut d = NPlusOneDetector::new(threshold);
2402            for _ in 0..=threshold {
2403                d.track(42);
2404            }
2405            // count == threshold + 1, should warn (> not >=)
2406            assert!(
2407                d.check_final().is_some(),
2408                "threshold={threshold} should warn at count={}",
2409                threshold + 1
2410            );
2411        }
2412    }
2413
2414    #[test]
2415    fn detector_with_deterministic_random_sequences() {
2416        // Deterministic "random" hash sequences
2417        let mut d = NPlusOneDetector::new(5);
2418        let hashes: Vec<u64> = (0..100).map(|i| ((i * 7 + 3) % 4) as u64).collect();
2419        for &h in &hashes {
2420            d.track(h);
2421        }
2422        // Should not panic, result depends on sequence
2423        let _ = d.check_final();
2424    }
2425
2426    mod proptest_fuzz {
2427        use super::*;
2428        use proptest::prelude::*;
2429
2430        proptest! {
2431            #[test]
2432            fn detector_never_panics(
2433                hashes in proptest::collection::vec(0u64..100, 0..500),
2434                threshold in 0u16..100,
2435            ) {
2436                let mut d = NPlusOneDetector::new(threshold);
2437                for h in &hashes {
2438                    d.track(*h);
2439                }
2440                let _ = d.check_final();
2441            }
2442
2443            #[test]
2444            fn sequential_repeats_always_detected(
2445                hash in 1u64..u64::MAX,
2446                count in 2u16..1000,
2447                threshold in 0u16..100,
2448            ) {
2449                let mut d = NPlusOneDetector::new(threshold);
2450                for _ in 0..count {
2451                    d.track(hash);
2452                }
2453                if count > threshold {
2454                    assert!(d.check_final().is_some(),
2455                        "count={count} > threshold={threshold} should trigger");
2456                }
2457            }
2458        }
2459    }
2460}