Skip to main content

sqlmodel_pool/
lib.rs

1//! Connection pooling for SQLModel Rust using asupersync.
2//!
3//! `sqlmodel-pool` is the **connection lifecycle layer**. It provides a generic,
4//! budget-aware pool that integrates with structured concurrency and can wrap any
5//! `Connection` implementation.
6//!
7//! # Role In The Architecture
8//!
9//! - **Shared connection management**: reuse connections across tasks safely.
10//! - **Budget-aware acquisition**: respects `Cx` timeouts and cancellation.
11//! - **Health checks**: validates connections before handing them out.
12//! - **Metrics**: exposes stats for pool sizing and tuning.
13//!
14//! # Features
15//!
16//! - Generic over any `Connection` type
17//! - RAII-based connection return (connections returned on drop)
18//! - Timeout support via `Cx` context
19//! - Connection health validation
20//! - Idle and max lifetime tracking
21//! - Pool statistics
22//!
23//! # Example
24//!
25//! ```rust,ignore
26//! use sqlmodel_pool::{Pool, PoolConfig};
27//!
28//! // Create a pool
29//! let config = PoolConfig::new(10)
30//!     .min_connections(2)
31//!     .acquire_timeout(5000);
32//!
33//! let pool = Pool::new(config, || async {
34//!     // Factory function to create new connections
35//!     PgConnection::connect(&cx, &pg_config).await
36//! });
37//!
38//! // Acquire a connection
39//! let conn = pool.acquire(&cx).await?;
40//!
41//! // Use the connection (automatically returned to pool on drop)
42//! conn.query(&cx, "SELECT 1", &[]).await?;
43//! ```
44
45pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
61/// Connection pool configuration.
62#[derive(Debug, Clone)]
63pub struct PoolConfig {
64    /// Minimum number of connections to maintain
65    pub min_connections: usize,
66    /// Maximum number of connections allowed
67    pub max_connections: usize,
68    /// Connection idle timeout in milliseconds
69    pub idle_timeout_ms: u64,
70    /// Maximum time to wait for a connection in milliseconds
71    pub acquire_timeout_ms: u64,
72    /// Maximum lifetime of a connection in milliseconds
73    pub max_lifetime_ms: u64,
74    /// Test connections before giving them out
75    pub test_on_checkout: bool,
76    /// Test connections when returning them to the pool
77    pub test_on_return: bool,
78}
79
80impl Default for PoolConfig {
81    fn default() -> Self {
82        Self {
83            min_connections: 1,
84            max_connections: 10,
85            idle_timeout_ms: 600_000,   // 10 minutes
86            acquire_timeout_ms: 30_000, // 30 seconds
87            max_lifetime_ms: 1_800_000, // 30 minutes
88            test_on_checkout: true,
89            test_on_return: false,
90        }
91    }
92}
93
94impl PoolConfig {
95    /// Create a new pool configuration with the given max connections.
96    #[must_use]
97    pub fn new(max_connections: usize) -> Self {
98        Self {
99            max_connections,
100            ..Default::default()
101        }
102    }
103
104    /// Set minimum connections.
105    #[must_use]
106    pub fn min_connections(mut self, n: usize) -> Self {
107        self.min_connections = n;
108        self
109    }
110
111    /// Set idle timeout in milliseconds.
112    #[must_use]
113    pub fn idle_timeout(mut self, ms: u64) -> Self {
114        self.idle_timeout_ms = ms;
115        self
116    }
117
118    /// Set acquire timeout in milliseconds.
119    #[must_use]
120    pub fn acquire_timeout(mut self, ms: u64) -> Self {
121        self.acquire_timeout_ms = ms;
122        self
123    }
124
125    /// Set max lifetime in milliseconds.
126    #[must_use]
127    pub fn max_lifetime(mut self, ms: u64) -> Self {
128        self.max_lifetime_ms = ms;
129        self
130    }
131
132    /// Enable/disable test on checkout.
133    #[must_use]
134    pub fn test_on_checkout(mut self, enabled: bool) -> Self {
135        self.test_on_checkout = enabled;
136        self
137    }
138
139    /// Enable/disable test on return.
140    #[must_use]
141    pub fn test_on_return(mut self, enabled: bool) -> Self {
142        self.test_on_return = enabled;
143        self
144    }
145}
146
147/// Pool statistics.
148#[derive(Debug, Clone, Default)]
149pub struct PoolStats {
150    /// Total number of connections (active + idle)
151    pub total_connections: usize,
152    /// Number of idle connections
153    pub idle_connections: usize,
154    /// Number of active connections (currently in use)
155    pub active_connections: usize,
156    /// Number of pending acquire requests
157    pub pending_requests: usize,
158    /// Total number of connections created
159    pub connections_created: u64,
160    /// Total number of connections closed
161    pub connections_closed: u64,
162    /// Total number of successful acquires
163    pub acquires: u64,
164    /// Total number of acquire timeouts
165    pub timeouts: u64,
166}
167
168/// Metadata about a pooled connection.
169#[derive(Debug)]
170struct ConnectionMeta<C> {
171    /// The actual connection
172    conn: C,
173    /// When this connection was created
174    created_at: Instant,
175    /// When this connection was last used
176    last_used: Instant,
177}
178
179impl<C> ConnectionMeta<C> {
180    fn new(conn: C) -> Self {
181        let now = Instant::now();
182        Self {
183            conn,
184            created_at: now,
185            last_used: now,
186        }
187    }
188
189    fn touch(&mut self) {
190        self.last_used = Instant::now();
191    }
192
193    fn age(&self) -> Duration {
194        self.created_at.elapsed()
195    }
196
197    fn idle_time(&self) -> Duration {
198        self.last_used.elapsed()
199    }
200}
201
202/// Internal pool state shared between pool and connections.
203struct PoolInner<C> {
204    /// Pool configuration
205    config: PoolConfig,
206    /// Idle connections available for use
207    idle: VecDeque<ConnectionMeta<C>>,
208    /// Number of connections currently checked out
209    active_count: usize,
210    /// Total number of connections (idle + active)
211    total_count: usize,
212    /// Number of waiters in the queue
213    waiter_count: usize,
214    /// Whether the pool has been closed
215    closed: bool,
216}
217
218impl<C> PoolInner<C> {
219    fn new(config: PoolConfig) -> Self {
220        Self {
221            config,
222            idle: VecDeque::new(),
223            active_count: 0,
224            total_count: 0,
225            waiter_count: 0,
226            closed: false,
227        }
228    }
229
230    fn can_create_new(&self) -> bool {
231        !self.closed && self.total_count < self.config.max_connections
232    }
233
234    fn stats(&self) -> PoolStats {
235        PoolStats {
236            total_connections: self.total_count,
237            idle_connections: self.idle.len(),
238            active_connections: self.active_count,
239            pending_requests: self.waiter_count,
240            ..Default::default()
241        }
242    }
243}
244
245/// Shared state wrapper with condition variable for notification.
246struct PoolShared<C> {
247    /// Protected pool state
248    inner: Mutex<PoolInner<C>>,
249    /// Notifies waiters when connections become available
250    conn_available: Condvar,
251    /// Statistics counters (atomic for lock-free reads)
252    connections_created: AtomicU64,
253    connections_closed: AtomicU64,
254    acquires: AtomicU64,
255    timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259    fn new(config: PoolConfig) -> Self {
260        Self {
261            inner: Mutex::new(PoolInner::new(config)),
262            conn_available: Condvar::new(),
263            connections_created: AtomicU64::new(0),
264            connections_closed: AtomicU64::new(0),
265            acquires: AtomicU64::new(0),
266            timeouts: AtomicU64::new(0),
267        }
268    }
269
270    /// Lock the inner mutex, recovering from poisoning for read-only access.
271    ///
272    /// A poisoned mutex occurs when a thread panicked while holding the lock.
273    /// The data inside is still valid for reading, so we recover by logging
274    /// and using `into_inner()` to get the guard.
275    ///
276    /// This should only be used for read-only operations where the data is
277    /// always valid regardless of whether a previous operation completed.
278    fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279        self.inner.lock().unwrap_or_else(|poisoned| {
280            tracing::error!(
281                "Pool mutex poisoned; recovering for read-only access. \
282                 A thread panicked while holding the lock."
283            );
284            poisoned.into_inner()
285        })
286    }
287
288    /// Lock the inner mutex, returning an error if poisoned.
289    ///
290    /// Use this for mutation operations where the pool state may be inconsistent
291    /// after a panic. Unlike `lock_or_recover()`, this propagates the error
292    /// to the caller.
293    #[allow(clippy::result_large_err)] // Error type is large by design for rich diagnostics
294    fn lock_or_error(
295        &self,
296        operation: &'static str,
297    ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298        self.inner
299            .lock()
300            .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301    }
302}
303
304/// A connection pool for database connections.
305///
306/// The pool manages a collection of connections, reusing them across
307/// requests to avoid the overhead of establishing new connections.
308///
309/// # Type Parameters
310///
311/// - `C`: The connection type, must implement `Connection`
312///
313/// # Cancellation
314///
315/// Pool operations respect cancellation via the `Cx` context:
316/// - `acquire` will return early if cancellation is requested
317/// - Connections are properly cleaned up on cancellation
318pub struct Pool<C: Connection> {
319    shared: Arc<PoolShared<C>>,
320}
321
322impl<C: Connection> Pool<C> {
323    /// Create a new connection pool with the given configuration.
324    #[must_use]
325    pub fn new(config: PoolConfig) -> Self {
326        Self {
327            shared: Arc::new(PoolShared::new(config)),
328        }
329    }
330
331    /// Get the pool configuration.
332    #[must_use]
333    pub fn config(&self) -> PoolConfig {
334        let inner = self.shared.lock_or_recover();
335        inner.config.clone()
336    }
337
338    /// Get the current pool statistics.
339    #[must_use]
340    pub fn stats(&self) -> PoolStats {
341        let inner = self.shared.lock_or_recover();
342        let mut stats = inner.stats();
343        stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344        stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345        stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346        stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347        stats
348    }
349
350    /// Check if the pool is at capacity.
351    #[must_use]
352    pub fn at_capacity(&self) -> bool {
353        let inner = self.shared.lock_or_recover();
354        inner.total_count >= inner.config.max_connections
355    }
356
357    /// Check if the pool has been closed.
358    #[must_use]
359    pub fn is_closed(&self) -> bool {
360        let inner = self.shared.lock_or_recover();
361        inner.closed
362    }
363
364    /// Acquire a connection from the pool.
365    ///
366    /// This method will:
367    /// 1. Return an idle connection if one is available
368    /// 2. Create a new connection if below capacity
369    /// 3. Wait for a connection to become available (up to timeout)
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if:
374    /// - The pool is closed
375    /// - The acquire timeout is exceeded
376    /// - Cancellation is requested via the `Cx` context
377    /// - Connection validation fails (if `test_on_checkout` is enabled)
378    pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379    where
380        F: Fn() -> Fut,
381        Fut: Future<Output = Outcome<C, Error>>,
382    {
383        let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384        let test_on_checkout = self.config().test_on_checkout;
385        let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386        let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388        loop {
389            // Check cancellation
390            if cx.is_cancel_requested() {
391                return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392            }
393
394            // Check timeout
395            if Instant::now() >= deadline {
396                self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397                return Outcome::Err(Error::Pool(PoolError {
398                    kind: PoolErrorKind::Timeout,
399                    message: "acquire timeout: no connections available".to_string(),
400                    source: None,
401                }));
402            }
403
404            // Try to get an idle connection or determine if we can create new
405            let action = {
406                let mut inner = match self.shared.lock_or_error("acquire") {
407                    Ok(guard) => guard,
408                    Err(e) => return Outcome::Err(e),
409                };
410
411                if inner.closed {
412                    AcquireAction::PoolClosed
413                } else {
414                    // Try to get an idle connection
415                    let mut found_conn = None;
416                    while let Some(mut meta) = inner.idle.pop_front() {
417                        // Check if connection is too old
418                        if meta.age() > max_lifetime {
419                            inner.total_count -= 1;
420                            self.shared
421                                .connections_closed
422                                .fetch_add(1, Ordering::Relaxed);
423                            continue;
424                        }
425
426                        // Check if connection has been idle too long
427                        if meta.idle_time() > idle_timeout {
428                            inner.total_count -= 1;
429                            self.shared
430                                .connections_closed
431                                .fetch_add(1, Ordering::Relaxed);
432                            continue;
433                        }
434
435                        // Found a valid connection
436                        meta.touch();
437                        inner.active_count += 1;
438                        found_conn = Some(meta);
439                        break;
440                    }
441
442                    if let Some(meta) = found_conn {
443                        AcquireAction::ValidateExisting(meta)
444                    } else if inner.can_create_new() {
445                        // No idle connections, can we create new?
446                        inner.total_count += 1;
447                        inner.active_count += 1;
448                        AcquireAction::CreateNew
449                    } else {
450                        // Must wait
451                        inner.waiter_count += 1;
452                        AcquireAction::Wait
453                    }
454                }
455            };
456
457            match action {
458                AcquireAction::PoolClosed => {
459                    return Outcome::Err(Error::Pool(PoolError {
460                        kind: PoolErrorKind::Closed,
461                        message: "pool has been closed".to_string(),
462                        source: None,
463                    }));
464                }
465                AcquireAction::ValidateExisting(meta) => {
466                    // Validate and wrap the connection (lock is released)
467                    return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468                }
469                AcquireAction::CreateNew => {
470                    // Create new connection outside of lock
471                    match factory().await {
472                        Outcome::Ok(conn) => {
473                            self.shared
474                                .connections_created
475                                .fetch_add(1, Ordering::Relaxed);
476                            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477                            let meta = ConnectionMeta::new(conn);
478                            return Outcome::Ok(PooledConnection::new(
479                                meta,
480                                Arc::downgrade(&self.shared),
481                            ));
482                        }
483                        Outcome::Err(e) => {
484                            // Failed to create, decrement counts
485                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486                                inner.total_count -= 1;
487                                inner.active_count -= 1;
488                            }
489                            // Even if we can't decrement counts, still return the original error
490                            return Outcome::Err(e);
491                        }
492                        Outcome::Cancelled(reason) => {
493                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494                                inner.total_count -= 1;
495                                inner.active_count -= 1;
496                            }
497                            return Outcome::Cancelled(reason);
498                        }
499                        Outcome::Panicked(info) => {
500                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501                                inner.total_count -= 1;
502                                inner.active_count -= 1;
503                            }
504                            return Outcome::Panicked(info);
505                        }
506                    }
507                }
508                AcquireAction::Wait => {
509                    // Wait for a connection to become available
510                    let remaining = deadline.saturating_duration_since(Instant::now());
511                    if remaining.is_zero() {
512                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513                            inner.waiter_count -= 1;
514                        }
515                        self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516                        return Outcome::Err(Error::Pool(PoolError {
517                            kind: PoolErrorKind::Timeout,
518                            message: "acquire timeout: no connections available".to_string(),
519                            source: None,
520                        }));
521                    }
522
523                    // Wait with timeout (use shorter interval for cancellation checks)
524                    let wait_time = remaining.min(Duration::from_millis(100));
525                    {
526                        let inner = match self.shared.lock_or_error("acquire_wait") {
527                            Ok(guard) => guard,
528                            Err(e) => return Outcome::Err(e),
529                        };
530                        // wait_timeout can also return a poisoned error, handle it
531                        let _ = self
532                            .shared
533                            .conn_available
534                            .wait_timeout(inner, wait_time)
535                            .map_err(|_| {
536                                tracing::error!("Pool mutex poisoned during wait_timeout");
537                            });
538                    }
539
540                    // Decrement waiter count after waking
541                    {
542                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543                            inner.waiter_count = inner.waiter_count.saturating_sub(1);
544                        }
545                    }
546
547                    // Loop back to try again
548                }
549            }
550        }
551    }
552
553    /// Validate a connection and wrap it in a PooledConnection.
554    async fn validate_and_wrap(
555        &self,
556        cx: &Cx,
557        meta: ConnectionMeta<C>,
558        test_on_checkout: bool,
559    ) -> Outcome<PooledConnection<C>, Error> {
560        if test_on_checkout {
561            // Validate the connection
562            match meta.conn.ping(cx).await {
563                Outcome::Ok(()) => {
564                    self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565                    Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566                }
567                Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568                    // Connection is invalid, decrement counts and try again
569                    {
570                        if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571                            inner.total_count -= 1;
572                            inner.active_count -= 1;
573                        }
574                    }
575                    self.shared
576                        .connections_closed
577                        .fetch_add(1, Ordering::Relaxed);
578                    // Return error - caller should retry
579                    Outcome::Err(Error::Connection(ConnectionError {
580                        kind: ConnectionErrorKind::Disconnected,
581                        message: "connection validation failed".to_string(),
582                        source: None,
583                    }))
584                }
585            }
586        } else {
587            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588            Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589        }
590    }
591
592    /// Close the pool, preventing new connections and closing all idle connections.
593    ///
594    /// If the pool mutex is poisoned, this logs an error but still wakes waiters.
595    pub fn close(&self) {
596        match self.shared.inner.lock() {
597            Ok(mut inner) => {
598                inner.closed = true;
599
600                // Close all idle connections
601                let idle_count = inner.idle.len();
602                inner.idle.clear();
603                inner.total_count -= idle_count;
604                self.shared
605                    .connections_closed
606                    .fetch_add(idle_count as u64, Ordering::Relaxed);
607                drop(inner);
608            }
609            Err(poisoned) => {
610                // Recover from poisoning - we still want to mark the pool as closed
611                // and wake waiters even if counts may be inconsistent.
612                tracing::error!(
613                    "Pool mutex poisoned during close; attempting recovery. \
614                     Pool state may be inconsistent."
615                );
616                let mut inner = poisoned.into_inner();
617                inner.closed = true;
618                let idle_count = inner.idle.len();
619                inner.idle.clear();
620                inner.total_count -= idle_count;
621                self.shared
622                    .connections_closed
623                    .fetch_add(idle_count as u64, Ordering::Relaxed);
624            }
625        }
626
627        // Wake all waiters so they see the pool is closed
628        self.shared.conn_available.notify_all();
629    }
630
631    /// Get the number of idle connections.
632    #[must_use]
633    pub fn idle_count(&self) -> usize {
634        let inner = self.shared.lock_or_recover();
635        inner.idle.len()
636    }
637
638    /// Get the number of active connections.
639    #[must_use]
640    pub fn active_count(&self) -> usize {
641        let inner = self.shared.lock_or_recover();
642        inner.active_count
643    }
644
645    /// Get the total number of connections.
646    #[must_use]
647    pub fn total_count(&self) -> usize {
648        let inner = self.shared.lock_or_recover();
649        inner.total_count
650    }
651}
652
653/// Action to take when acquiring a connection.
654enum AcquireAction<C> {
655    /// Pool is closed
656    PoolClosed,
657    /// Found an existing connection to validate
658    ValidateExisting(ConnectionMeta<C>),
659    /// Create a new connection
660    CreateNew,
661    /// Wait for a connection to become available
662    Wait,
663}
664
665/// A connection borrowed from the pool.
666///
667/// When dropped, the connection is automatically returned to the pool.
668/// The connection can be used via `Deref` and `DerefMut`.
669pub struct PooledConnection<C: Connection> {
670    /// The connection metadata (Some while held, None after return)
671    meta: Option<ConnectionMeta<C>>,
672    /// Weak reference to pool for returning
673    pool: Weak<PoolShared<C>>,
674}
675
676impl<C: Connection> PooledConnection<C> {
677    fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
678        Self {
679            meta: Some(meta),
680            pool,
681        }
682    }
683
684    /// Detach this connection from the pool.
685    ///
686    /// The connection will not be returned to the pool when dropped.
687    /// This is useful when you need to close a connection explicitly.
688    pub fn detach(mut self) -> C {
689        if let Some(pool) = self.pool.upgrade() {
690            // Try to update pool counters, but don't panic if mutex is poisoned.
691            // The connection is being detached anyway, so counts being off is acceptable.
692            match pool.inner.lock() {
693                Ok(mut inner) => {
694                    inner.total_count -= 1;
695                    inner.active_count -= 1;
696                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
697                }
698                Err(_poisoned) => {
699                    tracing::error!(
700                        "Pool mutex poisoned during detach; pool counters will be inconsistent"
701                    );
702                    // Still increment the atomic counter for tracking
703                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
704                }
705            }
706        }
707        self.meta.take().expect("connection already detached").conn
708    }
709
710    /// Get the age of this connection (time since creation).
711    #[must_use]
712    pub fn age(&self) -> Duration {
713        self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
714    }
715
716    /// Get the idle time of this connection (time since last use).
717    #[must_use]
718    pub fn idle_time(&self) -> Duration {
719        self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
720    }
721}
722
723impl<C: Connection> std::ops::Deref for PooledConnection<C> {
724    type Target = C;
725
726    fn deref(&self) -> &Self::Target {
727        &self
728            .meta
729            .as_ref()
730            .expect("connection already returned to pool")
731            .conn
732    }
733}
734
735impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
736    fn deref_mut(&mut self) -> &mut Self::Target {
737        &mut self
738            .meta
739            .as_mut()
740            .expect("connection already returned to pool")
741            .conn
742    }
743}
744
745impl<C: Connection> Drop for PooledConnection<C> {
746    fn drop(&mut self) {
747        if let Some(mut meta) = self.meta.take() {
748            meta.touch(); // Update last used time
749            if let Some(pool) = self.pool.upgrade() {
750                // Return to pool - but if mutex is poisoned, we must not panic in Drop.
751                // Instead, log the error and leak the connection.
752                let mut inner = match pool.inner.lock() {
753                    Ok(guard) => guard,
754                    Err(_poisoned) => {
755                        tracing::error!(
756                            "Pool mutex poisoned during connection return; \
757                             connection will be leaked. A thread panicked while holding the lock."
758                        );
759                        // Connection is leaked - we can't safely return it or update counts.
760                        // The pool is likely in a bad state anyway.
761                        return;
762                    }
763                };
764
765                if inner.closed {
766                    inner.total_count -= 1;
767                    inner.active_count -= 1;
768                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
769                    return;
770                }
771
772                // Check max lifetime
773                let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
774                if meta.age() > max_lifetime {
775                    inner.total_count -= 1;
776                    inner.active_count -= 1;
777                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
778                    return;
779                }
780
781                inner.active_count -= 1;
782                inner.idle.push_back(meta);
783
784                drop(inner);
785                pool.conn_available.notify_one();
786            }
787            // If pool is gone, connection is just dropped
788        }
789    }
790}
791
792impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
793    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
794        f.debug_struct("PooledConnection")
795            .field("conn", &self.meta.as_ref().map(|m| &m.conn))
796            .field("age", &self.age())
797            .field("idle_time", &self.idle_time())
798            .finish_non_exhaustive()
799    }
800}
801
802#[cfg(test)]
803mod tests {
804    use super::*;
805    use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
806    use sqlmodel_core::{Row, Value};
807    use std::sync::atomic::AtomicBool;
808
809    /// A mock connection for testing pool behavior.
810    #[derive(Debug)]
811    struct MockConnection {
812        id: u32,
813        ping_should_fail: Arc<AtomicBool>,
814    }
815
816    impl MockConnection {
817        fn new(id: u32) -> Self {
818            Self {
819                id,
820                ping_should_fail: Arc::new(AtomicBool::new(false)),
821            }
822        }
823
824        #[allow(dead_code)]
825        fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
826            Self {
827                id,
828                ping_should_fail: should_fail,
829            }
830        }
831    }
832
833    /// Mock transaction for MockConnection.
834    struct MockTx;
835
836    impl TransactionOps for MockTx {
837        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
838            Outcome::Ok(vec![])
839        }
840
841        async fn query_one(
842            &self,
843            _cx: &Cx,
844            _sql: &str,
845            _params: &[Value],
846        ) -> Outcome<Option<Row>, Error> {
847            Outcome::Ok(None)
848        }
849
850        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
851            Outcome::Ok(0)
852        }
853
854        async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
855            Outcome::Ok(())
856        }
857
858        async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
859            Outcome::Ok(())
860        }
861
862        async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
863            Outcome::Ok(())
864        }
865
866        async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
867            Outcome::Ok(())
868        }
869
870        async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
871            Outcome::Ok(())
872        }
873    }
874
875    impl Connection for MockConnection {
876        type Tx<'conn> = MockTx;
877
878        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
879            Outcome::Ok(vec![])
880        }
881
882        async fn query_one(
883            &self,
884            _cx: &Cx,
885            _sql: &str,
886            _params: &[Value],
887        ) -> Outcome<Option<Row>, Error> {
888            Outcome::Ok(None)
889        }
890
891        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
892            Outcome::Ok(0)
893        }
894
895        async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
896            Outcome::Ok(0)
897        }
898
899        async fn batch(
900            &self,
901            _cx: &Cx,
902            _statements: &[(String, Vec<Value>)],
903        ) -> Outcome<Vec<u64>, Error> {
904            Outcome::Ok(vec![])
905        }
906
907        async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
908            Outcome::Ok(MockTx)
909        }
910
911        async fn begin_with(
912            &self,
913            _cx: &Cx,
914            _isolation: IsolationLevel,
915        ) -> Outcome<Self::Tx<'_>, Error> {
916            Outcome::Ok(MockTx)
917        }
918
919        async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
920            Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
921        }
922
923        async fn query_prepared(
924            &self,
925            _cx: &Cx,
926            _stmt: &PreparedStatement,
927            _params: &[Value],
928        ) -> Outcome<Vec<Row>, Error> {
929            Outcome::Ok(vec![])
930        }
931
932        async fn execute_prepared(
933            &self,
934            _cx: &Cx,
935            _stmt: &PreparedStatement,
936            _params: &[Value],
937        ) -> Outcome<u64, Error> {
938            Outcome::Ok(0)
939        }
940
941        async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
942            if self.ping_should_fail.load(Ordering::Relaxed) {
943                Outcome::Err(Error::Connection(ConnectionError {
944                    kind: ConnectionErrorKind::Disconnected,
945                    message: "mock ping failed".to_string(),
946                    source: None,
947                }))
948            } else {
949                Outcome::Ok(())
950            }
951        }
952
953        async fn close(self, _cx: &Cx) -> Result<(), Error> {
954            Ok(())
955        }
956    }
957
958    #[test]
959    fn test_config_default() {
960        let config = PoolConfig::default();
961        assert_eq!(config.min_connections, 1);
962        assert_eq!(config.max_connections, 10);
963        assert_eq!(config.idle_timeout_ms, 600_000);
964        assert_eq!(config.acquire_timeout_ms, 30_000);
965        assert_eq!(config.max_lifetime_ms, 1_800_000);
966        assert!(config.test_on_checkout);
967        assert!(!config.test_on_return);
968    }
969
970    #[test]
971    fn test_config_builder() {
972        let config = PoolConfig::new(20)
973            .min_connections(5)
974            .idle_timeout(60_000)
975            .acquire_timeout(5_000)
976            .max_lifetime(300_000)
977            .test_on_checkout(false)
978            .test_on_return(true);
979
980        assert_eq!(config.min_connections, 5);
981        assert_eq!(config.max_connections, 20);
982        assert_eq!(config.idle_timeout_ms, 60_000);
983        assert_eq!(config.acquire_timeout_ms, 5_000);
984        assert_eq!(config.max_lifetime_ms, 300_000);
985        assert!(!config.test_on_checkout);
986        assert!(config.test_on_return);
987    }
988
989    #[test]
990    fn test_config_clone() {
991        let config = PoolConfig::new(15).min_connections(3);
992        let cloned = config.clone();
993        assert_eq!(config.max_connections, cloned.max_connections);
994        assert_eq!(config.min_connections, cloned.min_connections);
995    }
996
997    #[test]
998    fn test_stats_default() {
999        let stats = PoolStats::default();
1000        assert_eq!(stats.total_connections, 0);
1001        assert_eq!(stats.idle_connections, 0);
1002        assert_eq!(stats.active_connections, 0);
1003        assert_eq!(stats.pending_requests, 0);
1004        assert_eq!(stats.connections_created, 0);
1005        assert_eq!(stats.connections_closed, 0);
1006        assert_eq!(stats.acquires, 0);
1007        assert_eq!(stats.timeouts, 0);
1008    }
1009
1010    #[test]
1011    fn test_stats_clone() {
1012        let stats = PoolStats {
1013            total_connections: 5,
1014            acquires: 100,
1015            ..Default::default()
1016        };
1017        let cloned = stats.clone();
1018        assert_eq!(stats.total_connections, cloned.total_connections);
1019        assert_eq!(stats.acquires, cloned.acquires);
1020    }
1021
1022    #[test]
1023    fn test_connection_meta_timing() {
1024        use std::thread;
1025
1026        // Create a dummy type for testing
1027        struct DummyConn;
1028
1029        let meta = ConnectionMeta::new(DummyConn);
1030        let initial_age = meta.age();
1031
1032        // Small sleep to ensure time passes
1033        thread::sleep(Duration::from_millis(10));
1034
1035        // Age should have increased
1036        assert!(meta.age() > initial_age);
1037        assert!(meta.idle_time() > Duration::ZERO);
1038    }
1039
1040    #[test]
1041    fn test_connection_meta_touch() {
1042        use std::thread;
1043
1044        struct DummyConn;
1045
1046        let mut meta = ConnectionMeta::new(DummyConn);
1047
1048        // Small sleep to build up some idle time
1049        thread::sleep(Duration::from_millis(10));
1050        let idle_before_touch = meta.idle_time();
1051        assert!(idle_before_touch > Duration::ZERO);
1052
1053        // Touch should reset idle time
1054        meta.touch();
1055        let idle_after_touch = meta.idle_time();
1056
1057        // After touch, idle time should be very small (less than before)
1058        assert!(idle_after_touch < idle_before_touch);
1059    }
1060
1061    #[test]
1062    fn test_pool_new() {
1063        let config = PoolConfig::new(5);
1064        let pool: Pool<MockConnection> = Pool::new(config);
1065
1066        // New pool should be empty
1067        assert_eq!(pool.idle_count(), 0);
1068        assert_eq!(pool.active_count(), 0);
1069        assert_eq!(pool.total_count(), 0);
1070        assert!(!pool.is_closed());
1071        assert!(!pool.at_capacity());
1072    }
1073
1074    #[test]
1075    fn test_pool_config() {
1076        let config = PoolConfig::new(7).min_connections(2);
1077        let pool: Pool<MockConnection> = Pool::new(config);
1078
1079        let retrieved_config = pool.config();
1080        assert_eq!(retrieved_config.max_connections, 7);
1081        assert_eq!(retrieved_config.min_connections, 2);
1082    }
1083
1084    #[test]
1085    fn test_pool_stats_initial() {
1086        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1087
1088        let stats = pool.stats();
1089        assert_eq!(stats.total_connections, 0);
1090        assert_eq!(stats.idle_connections, 0);
1091        assert_eq!(stats.active_connections, 0);
1092        assert_eq!(stats.pending_requests, 0);
1093        assert_eq!(stats.connections_created, 0);
1094        assert_eq!(stats.connections_closed, 0);
1095        assert_eq!(stats.acquires, 0);
1096        assert_eq!(stats.timeouts, 0);
1097    }
1098
1099    #[test]
1100    fn test_pool_close() {
1101        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1102
1103        assert!(!pool.is_closed());
1104        pool.close();
1105        assert!(pool.is_closed());
1106    }
1107
1108    #[test]
1109    fn test_pool_inner_can_create_new() {
1110        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1111
1112        // Initially can create new
1113        assert!(inner.can_create_new());
1114
1115        // At capacity
1116        inner.total_count = 3;
1117        assert!(!inner.can_create_new());
1118
1119        // Below capacity again
1120        inner.total_count = 2;
1121        assert!(inner.can_create_new());
1122
1123        // Closed pool
1124        inner.closed = true;
1125        assert!(!inner.can_create_new());
1126    }
1127
1128    #[test]
1129    fn test_pool_inner_stats() {
1130        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1131
1132        inner.total_count = 5;
1133        inner.active_count = 3;
1134        inner.waiter_count = 2;
1135        inner
1136            .idle
1137            .push_back(ConnectionMeta::new(MockConnection::new(1)));
1138        inner
1139            .idle
1140            .push_back(ConnectionMeta::new(MockConnection::new(2)));
1141
1142        let stats = inner.stats();
1143        assert_eq!(stats.total_connections, 5);
1144        assert_eq!(stats.idle_connections, 2);
1145        assert_eq!(stats.active_connections, 3);
1146        assert_eq!(stats.pending_requests, 2);
1147    }
1148
1149    #[test]
1150    fn test_pooled_connection_age_and_idle_time() {
1151        use std::thread;
1152
1153        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1154
1155        // Properly initialize pool state as if acquire happened
1156        {
1157            let mut inner = pool.shared.inner.lock().unwrap();
1158            inner.total_count = 1;
1159            inner.active_count = 1;
1160        }
1161
1162        let meta = ConnectionMeta::new(MockConnection::new(1));
1163        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1164
1165        // Should have some small positive age
1166        assert!(pooled.age() >= Duration::ZERO);
1167
1168        thread::sleep(Duration::from_millis(5));
1169        assert!(pooled.age() > Duration::ZERO);
1170    }
1171
1172    #[test]
1173    fn test_pooled_connection_detach() {
1174        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1175
1176        // Manually add a connection to simulate acquire
1177        {
1178            let mut inner = pool.shared.inner.lock().unwrap();
1179            inner.total_count = 1;
1180            inner.active_count = 1;
1181        }
1182
1183        let meta = ConnectionMeta::new(MockConnection::new(42));
1184        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1185
1186        // Verify counts before detach
1187        assert_eq!(pool.total_count(), 1);
1188        assert_eq!(pool.active_count(), 1);
1189
1190        // Detach returns the connection
1191        let conn = pooled.detach();
1192        assert_eq!(conn.id, 42);
1193
1194        // After detach, counts should be decremented
1195        assert_eq!(pool.total_count(), 0);
1196        assert_eq!(pool.active_count(), 0);
1197
1198        // connections_closed should be incremented
1199        let stats = pool.stats();
1200        assert_eq!(stats.connections_closed, 1);
1201    }
1202
1203    #[test]
1204    fn test_pooled_connection_drop_returns_to_pool() {
1205        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1206
1207        // Manually set up pool state as if we acquired a connection
1208        {
1209            let mut inner = pool.shared.inner.lock().unwrap();
1210            inner.total_count = 1;
1211            inner.active_count = 1;
1212        }
1213
1214        let meta = ConnectionMeta::new(MockConnection::new(1));
1215        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1216
1217        // While held, active=1, idle=0
1218        assert_eq!(pool.active_count(), 1);
1219        assert_eq!(pool.idle_count(), 0);
1220
1221        // Drop the connection
1222        drop(pooled);
1223
1224        // After drop, active=0, idle=1 (returned to pool)
1225        assert_eq!(pool.active_count(), 0);
1226        assert_eq!(pool.idle_count(), 1);
1227        assert_eq!(pool.total_count(), 1); // Total unchanged
1228    }
1229
1230    #[test]
1231    fn test_pooled_connection_drop_when_pool_closed() {
1232        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1233
1234        // Set up pool state
1235        {
1236            let mut inner = pool.shared.inner.lock().unwrap();
1237            inner.total_count = 1;
1238            inner.active_count = 1;
1239        }
1240
1241        let meta = ConnectionMeta::new(MockConnection::new(1));
1242        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1243
1244        // Close the pool while connection is out
1245        pool.close();
1246
1247        // Drop the connection
1248        drop(pooled);
1249
1250        // Connection should not be returned to idle (pool is closed)
1251        assert_eq!(pool.idle_count(), 0);
1252        assert_eq!(pool.active_count(), 0);
1253        assert_eq!(pool.total_count(), 0);
1254
1255        // Connection was closed
1256        assert_eq!(pool.stats().connections_closed, 1);
1257    }
1258
1259    #[test]
1260    fn test_pooled_connection_deref() {
1261        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1262
1263        // Properly initialize pool state as if acquire happened
1264        {
1265            let mut inner = pool.shared.inner.lock().unwrap();
1266            inner.total_count = 1;
1267            inner.active_count = 1;
1268        }
1269
1270        let meta = ConnectionMeta::new(MockConnection::new(99));
1271        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1272
1273        // Deref should give access to the connection's id
1274        assert_eq!(pooled.id, 99);
1275    }
1276
1277    #[test]
1278    fn test_pooled_connection_deref_mut() {
1279        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1280
1281        // Properly initialize pool state as if acquire happened
1282        {
1283            let mut inner = pool.shared.inner.lock().unwrap();
1284            inner.total_count = 1;
1285            inner.active_count = 1;
1286        }
1287
1288        let meta = ConnectionMeta::new(MockConnection::new(1));
1289        let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1290
1291        // DerefMut should allow mutation
1292        pooled.id = 50;
1293        assert_eq!(pooled.id, 50);
1294    }
1295
1296    #[test]
1297    fn test_pooled_connection_debug() {
1298        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1299
1300        // Properly initialize pool state as if acquire happened
1301        {
1302            let mut inner = pool.shared.inner.lock().unwrap();
1303            inner.total_count = 1;
1304            inner.active_count = 1;
1305        }
1306
1307        let meta = ConnectionMeta::new(MockConnection::new(1));
1308        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1309
1310        let debug_str = format!("{:?}", pooled);
1311        assert!(debug_str.contains("PooledConnection"));
1312        assert!(debug_str.contains("age"));
1313    }
1314
1315    #[test]
1316    fn test_pool_at_capacity() {
1317        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1318
1319        assert!(!pool.at_capacity());
1320
1321        // Simulate connections being created
1322        {
1323            let mut inner = pool.shared.inner.lock().unwrap();
1324            inner.total_count = 1;
1325        }
1326        assert!(!pool.at_capacity());
1327
1328        {
1329            let mut inner = pool.shared.inner.lock().unwrap();
1330            inner.total_count = 2;
1331        }
1332        assert!(pool.at_capacity());
1333    }
1334
1335    #[test]
1336    fn test_acquire_action_enum() {
1337        // Verify the enum variants exist and can be pattern-matched
1338        let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1339        assert!(matches!(closed, AcquireAction::PoolClosed));
1340
1341        let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1342        assert!(matches!(create, AcquireAction::CreateNew));
1343
1344        let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1345        assert!(matches!(wait, AcquireAction::Wait));
1346
1347        let meta = ConnectionMeta::new(MockConnection::new(1));
1348        let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1349        assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1350    }
1351
1352    #[test]
1353    fn test_pool_shared_atomic_counters() {
1354        let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1355
1356        // Initial values should be 0
1357        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1358        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1359        assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1360        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1361
1362        // Test incrementing
1363        shared.connections_created.fetch_add(1, Ordering::Relaxed);
1364        shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1365        shared.acquires.fetch_add(10, Ordering::Relaxed);
1366        shared.timeouts.fetch_add(3, Ordering::Relaxed);
1367
1368        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1369        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1370        assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1371        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1372    }
1373
1374    #[test]
1375    fn test_pool_close_clears_idle() {
1376        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1377
1378        // Add some idle connections
1379        {
1380            let mut inner = pool.shared.inner.lock().unwrap();
1381            inner.total_count = 3;
1382            inner
1383                .idle
1384                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1385            inner
1386                .idle
1387                .push_back(ConnectionMeta::new(MockConnection::new(2)));
1388            inner
1389                .idle
1390                .push_back(ConnectionMeta::new(MockConnection::new(3)));
1391        }
1392
1393        assert_eq!(pool.idle_count(), 3);
1394        assert_eq!(pool.total_count(), 3);
1395
1396        pool.close();
1397
1398        // After close, idle connections should be cleared
1399        assert_eq!(pool.idle_count(), 0);
1400        assert_eq!(pool.total_count(), 0);
1401        assert!(pool.is_closed());
1402
1403        // connections_closed should reflect the 3 idle connections
1404        assert_eq!(pool.stats().connections_closed, 3);
1405    }
1406
1407    // ==================== Lock Poisoning Safety Tests ====================
1408    //
1409    // These tests verify that the pool correctly handles mutex poisoning,
1410    // which occurs when a thread panics while holding the lock.
1411    //
1412    // Tier 1 (mutations): Return Error if poisoned
1413    // Tier 2 (read-only): Recover and return valid data
1414    // Tier 3 (Drop): Log error and leak connection (don't panic)
1415
1416    /// Helper to poison a pool's mutex by panicking while holding the lock.
1417    ///
1418    /// Returns the pool with a poisoned mutex.
1419    fn poison_pool_mutex() -> Pool<MockConnection> {
1420        use std::panic;
1421        use std::thread;
1422
1423        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1424
1425        // Set up some valid state before poisoning
1426        {
1427            let mut inner = pool.shared.inner.lock().unwrap();
1428            inner.total_count = 2;
1429            inner.active_count = 1;
1430            inner
1431                .idle
1432                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1433        }
1434
1435        // Spawn a thread that will panic while holding the lock
1436        let shared_clone = Arc::clone(&pool.shared);
1437        let handle = thread::spawn(move || {
1438            let _guard = shared_clone.inner.lock().unwrap();
1439            // Panic while holding the lock - this poisons the mutex
1440            panic!("intentional panic to poison mutex");
1441        });
1442
1443        // Wait for the thread to panic (ignore the panic result)
1444        let _ = handle.join();
1445
1446        // Verify the mutex is now poisoned
1447        assert!(pool.shared.inner.lock().is_err());
1448
1449        pool
1450    }
1451
1452    // -------------------- Tier 2: Read-Only Methods --------------------
1453
1454    #[test]
1455    fn test_config_after_poisoning_returns_valid_data() {
1456        let pool = poison_pool_mutex();
1457
1458        // config() should recover and return the configuration
1459        let config = pool.config();
1460        assert_eq!(config.max_connections, 5);
1461    }
1462
1463    #[test]
1464    fn test_stats_after_poisoning_returns_valid_data() {
1465        let pool = poison_pool_mutex();
1466
1467        // stats() should recover and return valid statistics
1468        let stats = pool.stats();
1469        // The state before poisoning was: total=2, active=1, idle=1
1470        assert_eq!(stats.total_connections, 2);
1471        assert_eq!(stats.active_connections, 1);
1472        assert_eq!(stats.idle_connections, 1);
1473    }
1474
1475    #[test]
1476    fn test_at_capacity_after_poisoning() {
1477        let pool = poison_pool_mutex();
1478
1479        // at_capacity() should recover and return correct value
1480        // Pool has 2 connections, max is 5, so not at capacity
1481        assert!(!pool.at_capacity());
1482    }
1483
1484    #[test]
1485    fn test_is_closed_after_poisoning() {
1486        let pool = poison_pool_mutex();
1487
1488        // is_closed() should recover and return correct value
1489        assert!(!pool.is_closed());
1490    }
1491
1492    #[test]
1493    fn test_idle_count_after_poisoning() {
1494        let pool = poison_pool_mutex();
1495
1496        // idle_count() should recover and return correct value
1497        assert_eq!(pool.idle_count(), 1);
1498    }
1499
1500    #[test]
1501    fn test_active_count_after_poisoning() {
1502        let pool = poison_pool_mutex();
1503
1504        // active_count() should recover and return correct value
1505        assert_eq!(pool.active_count(), 1);
1506    }
1507
1508    #[test]
1509    fn test_total_count_after_poisoning() {
1510        let pool = poison_pool_mutex();
1511
1512        // total_count() should recover and return correct value
1513        assert_eq!(pool.total_count(), 2);
1514    }
1515
1516    // -------------------- Tier 1: Mutation Methods --------------------
1517
1518    #[test]
1519    fn test_lock_or_error_returns_error_when_poisoned() {
1520        use std::thread;
1521
1522        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1523
1524        // Poison the mutex
1525        let shared_clone = Arc::clone(&shared);
1526        let handle = thread::spawn(move || {
1527            let _guard = shared_clone.inner.lock().unwrap();
1528            panic!("intentional panic to poison mutex");
1529        });
1530        let _ = handle.join();
1531
1532        // lock_or_error should return an error
1533        let result = shared.lock_or_error("test_operation");
1534
1535        // Verify it's a pool poisoning error
1536        match result {
1537            Err(Error::Pool(pool_err)) => {
1538                assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
1539                assert!(pool_err.message.contains("poisoned"));
1540            }
1541            Err(other) => panic!("Expected Pool error, got: {:?}", other),
1542            Ok(_) => panic!("Expected error, got Ok"),
1543        }
1544    }
1545
1546    #[test]
1547    fn test_lock_or_recover_succeeds_when_poisoned() {
1548        use std::thread;
1549
1550        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1551
1552        // Set up some state
1553        {
1554            let mut inner = shared.inner.lock().unwrap();
1555            inner.total_count = 42;
1556        }
1557
1558        // Poison the mutex
1559        let shared_clone = Arc::clone(&shared);
1560        let handle = thread::spawn(move || {
1561            let _guard = shared_clone.inner.lock().unwrap();
1562            panic!("intentional panic to poison mutex");
1563        });
1564        let _ = handle.join();
1565
1566        // Verify mutex is poisoned
1567        assert!(shared.inner.lock().is_err());
1568
1569        // lock_or_recover should still succeed and provide access to data
1570        let inner = shared.lock_or_recover();
1571        assert_eq!(inner.total_count, 42);
1572    }
1573
1574    #[test]
1575    fn test_close_after_poisoning_recovers_and_closes() {
1576        let pool = poison_pool_mutex();
1577
1578        // close() should recover from poisoning and still close the pool
1579        pool.close();
1580
1581        // After close, the pool should be marked as closed
1582        assert!(pool.is_closed());
1583
1584        // Idle connections should be cleared
1585        assert_eq!(pool.idle_count(), 0);
1586    }
1587
1588    // -------------------- Tier 3: Drop Safety --------------------
1589
1590    #[test]
1591    fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
1592        use std::panic;
1593        use std::thread;
1594
1595        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1596
1597        // Set up a connection that's "checked out"
1598        {
1599            let mut inner = pool.shared.inner.lock().unwrap();
1600            inner.total_count = 1;
1601            inner.active_count = 1;
1602        }
1603
1604        // Create a pooled connection
1605        let meta = ConnectionMeta::new(MockConnection::new(1));
1606        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1607
1608        // Poison the mutex by panicking in another thread
1609        let shared_clone = Arc::clone(&pool.shared);
1610        let handle = thread::spawn(move || {
1611            let _guard = shared_clone.inner.lock().unwrap();
1612            panic!("intentional panic to poison mutex");
1613        });
1614        let _ = handle.join();
1615
1616        // Verify mutex is poisoned
1617        assert!(pool.shared.inner.lock().is_err());
1618
1619        // Drop the pooled connection - should NOT panic
1620        // The connection will be leaked, but that's the correct behavior
1621        let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1622            drop(pooled);
1623        }));
1624
1625        // Dropping should not panic
1626        assert!(
1627            drop_result.is_ok(),
1628            "Dropping PooledConnection after mutex poisoning should not panic"
1629        );
1630    }
1631
1632    #[test]
1633    fn test_detach_after_poisoning_does_not_panic() {
1634        use std::panic;
1635        use std::thread;
1636
1637        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1638
1639        // Set up a connection that's "checked out"
1640        {
1641            let mut inner = pool.shared.inner.lock().unwrap();
1642            inner.total_count = 1;
1643            inner.active_count = 1;
1644        }
1645
1646        // Create a pooled connection
1647        let meta = ConnectionMeta::new(MockConnection::new(42));
1648        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1649
1650        // Poison the mutex
1651        let shared_clone = Arc::clone(&pool.shared);
1652        let handle = thread::spawn(move || {
1653            let _guard = shared_clone.inner.lock().unwrap();
1654            panic!("intentional panic to poison mutex");
1655        });
1656        let _ = handle.join();
1657
1658        // Verify mutex is poisoned
1659        assert!(pool.shared.inner.lock().is_err());
1660
1661        // Detach should not panic, even though it can't update counters
1662        let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
1663
1664        assert!(
1665            detach_result.is_ok(),
1666            "detach() after mutex poisoning should not panic"
1667        );
1668
1669        // Should still get the connection back
1670        let conn = detach_result.unwrap();
1671        assert_eq!(conn.id, 42);
1672    }
1673
1674    // -------------------- Integration: Pool Survives Thread Panic --------------------
1675
1676    #[test]
1677    fn test_pool_survives_thread_panic_during_acquire() {
1678        use std::thread;
1679
1680        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1681        let pool_arc = Arc::new(pool);
1682
1683        // Simulate a thread that acquires, does work, then panics
1684        // The connection should be leaked but pool should remain usable for reads
1685        let pool_clone = Arc::clone(&pool_arc);
1686        let handle = thread::spawn(move || {
1687            // Manually simulate having acquired a connection
1688            {
1689                let mut inner = pool_clone.shared.inner.lock().unwrap();
1690                inner.total_count = 1;
1691                inner.active_count = 1;
1692            }
1693
1694            // Now panic while "using" the connection
1695            // (In real code, this would be while holding a PooledConnection)
1696            let _guard = pool_clone.shared.inner.lock().unwrap();
1697            panic!("simulated panic during database operation");
1698        });
1699
1700        // Wait for thread to panic
1701        let _ = handle.join();
1702
1703        // Pool's mutex is now poisoned, but read-only methods should still work
1704        assert_eq!(pool_arc.total_count(), 1);
1705        assert_eq!(pool_arc.config().max_connections, 5);
1706
1707        // Stats should be recoverable
1708        let stats = pool_arc.stats();
1709        assert_eq!(stats.total_connections, 1);
1710    }
1711
1712    #[test]
1713    fn test_pool_close_after_thread_panic() {
1714        use std::thread;
1715
1716        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1717
1718        // Add some idle connections
1719        {
1720            let mut inner = pool.shared.inner.lock().unwrap();
1721            inner.total_count = 2;
1722            inner
1723                .idle
1724                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1725            inner
1726                .idle
1727                .push_back(ConnectionMeta::new(MockConnection::new(2)));
1728        }
1729
1730        // Poison the mutex
1731        let shared_clone = Arc::clone(&pool.shared);
1732        let handle = thread::spawn(move || {
1733            let _guard = shared_clone.inner.lock().unwrap();
1734            panic!("intentional panic");
1735        });
1736        let _ = handle.join();
1737
1738        // close() should recover and still work
1739        pool.close();
1740
1741        // Pool should be closed and idle connections cleared
1742        assert!(pool.is_closed());
1743        assert_eq!(pool.idle_count(), 0);
1744    }
1745
1746    #[test]
1747    fn test_multiple_reads_after_poisoning() {
1748        let pool = poison_pool_mutex();
1749
1750        // Multiple read operations should all succeed
1751        for _ in 0..10 {
1752            let _ = pool.config();
1753            let _ = pool.stats();
1754            let _ = pool.at_capacity();
1755            let _ = pool.is_closed();
1756            let _ = pool.idle_count();
1757            let _ = pool.active_count();
1758            let _ = pool.total_count();
1759        }
1760
1761        // All reads should have recovered successfully
1762        assert_eq!(pool.total_count(), 2);
1763    }
1764
1765    #[test]
1766    fn test_waiters_count_after_poisoning() {
1767        use std::thread;
1768
1769        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1770
1771        // Set up waiter count
1772        {
1773            let mut inner = pool.shared.inner.lock().unwrap();
1774            inner.waiter_count = 3;
1775        }
1776
1777        // Poison the mutex
1778        let shared_clone = Arc::clone(&pool.shared);
1779        let handle = thread::spawn(move || {
1780            let _guard = shared_clone.inner.lock().unwrap();
1781            panic!("intentional panic");
1782        });
1783        let _ = handle.join();
1784
1785        // stats() should recover and show correct waiter count
1786        let stats = pool.stats();
1787        assert_eq!(stats.pending_requests, 3);
1788    }
1789}