Skip to main content

sqlmodel_pool/
lib.rs

1//! Connection pooling for SQLModel Rust using asupersync.
2//!
3//! `sqlmodel-pool` is the **connection lifecycle layer**. It provides a generic,
4//! budget-aware pool that integrates with structured concurrency and can wrap any
5//! `Connection` implementation.
6//!
7//! # Role In The Architecture
8//!
9//! - **Shared connection management**: reuse connections across tasks safely.
10//! - **Budget-aware acquisition**: respects `Cx` timeouts and cancellation.
11//! - **Health checks**: validates connections before handing them out.
12//! - **Metrics**: exposes stats for pool sizing and tuning.
13//!
14//! # Features
15//!
16//! - Generic over any `Connection` type
17//! - RAII-based connection return (connections returned on drop)
18//! - Timeout support via `Cx` context
19//! - Connection health validation
20//! - Idle and max lifetime tracking
21//! - Pool statistics
22//!
23//! # Example
24//!
25//! ```rust,ignore
26//! use sqlmodel_pool::{Pool, PoolConfig};
27//!
28//! // Create a pool
29//! let config = PoolConfig::new(10)
30//!     .min_connections(2)
31//!     .acquire_timeout(5000);
32//!
//! let pool = Pool::new(config);
//!
//! // Acquire a connection; the factory is only called when the pool needs
//! // to open a new connection
//! let conn = pool
//!     .acquire(&cx, || async { PgConnection::connect(&cx, &pg_config).await })
//!     .await?;
40//!
41//! // Use the connection (automatically returned to pool on drop)
42//! conn.query(&cx, "SELECT 1", &[]).await?;
43//! ```
44
45pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
/// Settings that govern pool sizing and connection ageing.
///
/// All durations are expressed in milliseconds. Start from
/// [`PoolConfig::new`] or [`Default::default`] and refine the value with the
/// chainable setters, each of which consumes and returns the configuration.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Minimum number of connections to maintain
    pub min_connections: usize,
    /// Upper bound on the number of connections (idle + active)
    pub max_connections: usize,
    /// How long a connection may sit idle before it is discarded (ms)
    pub idle_timeout_ms: u64,
    /// How long an acquire may wait for a connection (ms)
    pub acquire_timeout_ms: u64,
    /// How long a connection may live after creation (ms)
    pub max_lifetime_ms: u64,
    /// Whether connections are validated before being handed out
    pub test_on_checkout: bool,
    /// Whether connections are validated when they come back to the pool
    // NOTE(review): no code in this file appears to read this flag - confirm
    // whether return-time validation is implemented elsewhere.
    pub test_on_return: bool,
}

impl Default for PoolConfig {
    /// One to ten connections, 10 minute idle timeout, 30 second acquire
    /// timeout, 30 minute lifetime, validation on checkout only.
    fn default() -> Self {
        const SECOND_MS: u64 = 1_000;
        const MINUTE_MS: u64 = 60 * SECOND_MS;

        Self {
            min_connections: 1,
            max_connections: 10,
            idle_timeout_ms: 10 * MINUTE_MS,
            acquire_timeout_ms: 30 * SECOND_MS,
            max_lifetime_ms: 30 * MINUTE_MS,
            test_on_checkout: true,
            test_on_return: false,
        }
    }
}

impl PoolConfig {
    /// Default configuration with `max_connections` overridden.
    #[must_use]
    pub fn new(max_connections: usize) -> Self {
        Self {
            max_connections,
            ..Self::default()
        }
    }

    /// Replace the minimum connection count.
    #[must_use]
    pub fn min_connections(self, n: usize) -> Self {
        Self {
            min_connections: n,
            ..self
        }
    }

    /// Replace the idle timeout (milliseconds).
    #[must_use]
    pub fn idle_timeout(self, ms: u64) -> Self {
        Self {
            idle_timeout_ms: ms,
            ..self
        }
    }

    /// Replace the acquire timeout (milliseconds).
    #[must_use]
    pub fn acquire_timeout(self, ms: u64) -> Self {
        Self {
            acquire_timeout_ms: ms,
            ..self
        }
    }

    /// Replace the maximum connection lifetime (milliseconds).
    #[must_use]
    pub fn max_lifetime(self, ms: u64) -> Self {
        Self {
            max_lifetime_ms: ms,
            ..self
        }
    }

    /// Turn validation on checkout on or off.
    #[must_use]
    pub fn test_on_checkout(self, enabled: bool) -> Self {
        Self {
            test_on_checkout: enabled,
            ..self
        }
    }

    /// Turn validation on return on or off.
    #[must_use]
    pub fn test_on_return(self, enabled: bool) -> Self {
        Self {
            test_on_return: enabled,
            ..self
        }
    }
}
146
/// Pool statistics.
///
/// A snapshot combining the pool's current gauges (connection and waiter
/// counts) with its lifetime counters (created, closed, acquires, timeouts).
/// `Default` yields a snapshot with every value set to zero.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Total number of connections (active + idle)
    pub total_connections: usize,
    /// Number of idle connections
    pub idle_connections: usize,
    /// Number of active connections (currently in use)
    pub active_connections: usize,
    /// Number of pending acquire requests
    pub pending_requests: usize,
    /// Total number of connections created
    pub connections_created: u64,
    /// Total number of connections closed
    pub connections_closed: u64,
    /// Total number of successful acquires
    pub acquires: u64,
    /// Total number of acquire timeouts
    pub timeouts: u64,
}
167
/// A pooled connection together with the timestamps the pool tracks for it.
#[derive(Debug)]
struct ConnectionMeta<C> {
    /// The wrapped connection
    conn: C,
    /// Instant at which this wrapper was constructed
    created_at: Instant,
    /// Instant of the most recent `touch` (initially equal to `created_at`)
    last_used: Instant,
}

impl<C> ConnectionMeta<C> {
    /// Wrap `conn`, stamping creation and last-use with the same instant.
    fn new(conn: C) -> Self {
        let created_at = Instant::now();
        Self {
            conn,
            created_at,
            last_used: created_at,
        }
    }

    /// Record that the connection is being used right now.
    fn touch(&mut self) {
        self.last_used = Instant::now();
    }

    /// Time elapsed since the connection was wrapped.
    fn age(&self) -> Duration {
        Instant::now().duration_since(self.created_at)
    }

    /// Time elapsed since the last `touch`.
    fn idle_time(&self) -> Duration {
        Instant::now().duration_since(self.last_used)
    }
}
201
202/// Internal pool state shared between pool and connections.
203struct PoolInner<C> {
204    /// Pool configuration
205    config: PoolConfig,
206    /// Idle connections available for use
207    idle: VecDeque<ConnectionMeta<C>>,
208    /// Number of connections currently checked out
209    active_count: usize,
210    /// Total number of connections (idle + active)
211    total_count: usize,
212    /// Number of waiters in the queue
213    waiter_count: usize,
214    /// Whether the pool has been closed
215    closed: bool,
216}
217
218impl<C> PoolInner<C> {
219    fn new(config: PoolConfig) -> Self {
220        Self {
221            config,
222            idle: VecDeque::new(),
223            active_count: 0,
224            total_count: 0,
225            waiter_count: 0,
226            closed: false,
227        }
228    }
229
230    fn can_create_new(&self) -> bool {
231        !self.closed && self.total_count < self.config.max_connections
232    }
233
234    fn stats(&self) -> PoolStats {
235        PoolStats {
236            total_connections: self.total_count,
237            idle_connections: self.idle.len(),
238            active_connections: self.active_count,
239            pending_requests: self.waiter_count,
240            ..Default::default()
241        }
242    }
243}
244
245/// Shared state wrapper with condition variable for notification.
246struct PoolShared<C> {
247    /// Protected pool state
248    inner: Mutex<PoolInner<C>>,
249    /// Notifies waiters when connections become available
250    conn_available: Condvar,
251    /// Statistics counters (atomic for lock-free reads)
252    connections_created: AtomicU64,
253    connections_closed: AtomicU64,
254    acquires: AtomicU64,
255    timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259    fn new(config: PoolConfig) -> Self {
260        Self {
261            inner: Mutex::new(PoolInner::new(config)),
262            conn_available: Condvar::new(),
263            connections_created: AtomicU64::new(0),
264            connections_closed: AtomicU64::new(0),
265            acquires: AtomicU64::new(0),
266            timeouts: AtomicU64::new(0),
267        }
268    }
269
270    /// Lock the inner mutex, recovering from poisoning for read-only access.
271    ///
272    /// A poisoned mutex occurs when a thread panicked while holding the lock.
273    /// The data inside is still valid for reading, so we recover by logging
274    /// and using `into_inner()` to get the guard.
275    ///
276    /// This should only be used for read-only operations where the data is
277    /// always valid regardless of whether a previous operation completed.
278    fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279        self.inner.lock().unwrap_or_else(|poisoned| {
280            tracing::error!(
281                "Pool mutex poisoned; recovering for read-only access. \
282                 A thread panicked while holding the lock."
283            );
284            poisoned.into_inner()
285        })
286    }
287
288    /// Lock the inner mutex, returning an error if poisoned.
289    ///
290    /// Use this for mutation operations where the pool state may be inconsistent
291    /// after a panic. Unlike `lock_or_recover()`, this propagates the error
292    /// to the caller.
293    #[allow(clippy::result_large_err)] // Error type is large by design for rich diagnostics
294    fn lock_or_error(
295        &self,
296        operation: &'static str,
297    ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298        self.inner
299            .lock()
300            .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301    }
302}
303
/// A connection pool for database connections.
///
/// The pool manages a collection of connections, reusing them across
/// requests to avoid the overhead of establishing new connections.
///
/// # Type Parameters
///
/// - `C`: The connection type, must implement `Connection`
///
/// # Cancellation
///
/// Pool operations respect cancellation via the `Cx` context:
/// - `acquire` will return early if cancellation is requested
/// - Connections are properly cleaned up on cancellation
pub struct Pool<C: Connection> {
    /// State shared with every `PooledConnection`, which holds only a `Weak`
    /// reference back to it.
    shared: Arc<PoolShared<C>>,
}
321
322impl<C: Connection> Pool<C> {
323    /// Create a new connection pool with the given configuration.
324    #[must_use]
325    pub fn new(config: PoolConfig) -> Self {
326        Self {
327            shared: Arc::new(PoolShared::new(config)),
328        }
329    }
330
331    /// Get the pool configuration.
332    #[must_use]
333    pub fn config(&self) -> PoolConfig {
334        let inner = self.shared.lock_or_recover();
335        inner.config.clone()
336    }
337
338    /// Get the current pool statistics.
339    #[must_use]
340    pub fn stats(&self) -> PoolStats {
341        let inner = self.shared.lock_or_recover();
342        let mut stats = inner.stats();
343        stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344        stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345        stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346        stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347        stats
348    }
349
350    /// Check if the pool is at capacity.
351    #[must_use]
352    pub fn at_capacity(&self) -> bool {
353        let inner = self.shared.lock_or_recover();
354        inner.total_count >= inner.config.max_connections
355    }
356
357    /// Check if the pool has been closed.
358    #[must_use]
359    pub fn is_closed(&self) -> bool {
360        let inner = self.shared.lock_or_recover();
361        inner.closed
362    }
363
364    /// Acquire a connection from the pool.
365    ///
366    /// This method will:
367    /// 1. Return an idle connection if one is available
368    /// 2. Create a new connection if below capacity
369    /// 3. Wait for a connection to become available (up to timeout)
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if:
374    /// - The pool is closed
375    /// - The acquire timeout is exceeded
376    /// - Cancellation is requested via the `Cx` context
377    /// - Connection validation fails (if `test_on_checkout` is enabled)
378    pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379    where
380        F: Fn() -> Fut,
381        Fut: Future<Output = Outcome<C, Error>>,
382    {
383        let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384        let test_on_checkout = self.config().test_on_checkout;
385        let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386        let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388        loop {
389            // Check cancellation
390            if cx.is_cancel_requested() {
391                return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392            }
393
394            // Check timeout
395            if Instant::now() >= deadline {
396                self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397                return Outcome::Err(Error::Pool(PoolError {
398                    kind: PoolErrorKind::Timeout,
399                    message: "acquire timeout: no connections available".to_string(),
400                    source: None,
401                }));
402            }
403
404            // Try to get an idle connection or determine if we can create new
405            let action = {
406                let mut inner = match self.shared.lock_or_error("acquire") {
407                    Ok(guard) => guard,
408                    Err(e) => return Outcome::Err(e),
409                };
410
411                if inner.closed {
412                    AcquireAction::PoolClosed
413                } else {
414                    // Try to get an idle connection
415                    let mut found_conn = None;
416                    while let Some(mut meta) = inner.idle.pop_front() {
417                        // Check if connection is too old
418                        if meta.age() > max_lifetime {
419                            inner.total_count -= 1;
420                            self.shared
421                                .connections_closed
422                                .fetch_add(1, Ordering::Relaxed);
423                            continue;
424                        }
425
426                        // Check if connection has been idle too long
427                        if meta.idle_time() > idle_timeout {
428                            inner.total_count -= 1;
429                            self.shared
430                                .connections_closed
431                                .fetch_add(1, Ordering::Relaxed);
432                            continue;
433                        }
434
435                        // Found a valid connection
436                        meta.touch();
437                        inner.active_count += 1;
438                        found_conn = Some(meta);
439                        break;
440                    }
441
442                    if let Some(meta) = found_conn {
443                        AcquireAction::ValidateExisting(meta)
444                    } else if inner.can_create_new() {
445                        // No idle connections, can we create new?
446                        inner.total_count += 1;
447                        inner.active_count += 1;
448                        AcquireAction::CreateNew
449                    } else {
450                        // Must wait
451                        inner.waiter_count += 1;
452                        AcquireAction::Wait
453                    }
454                }
455            };
456
457            match action {
458                AcquireAction::PoolClosed => {
459                    return Outcome::Err(Error::Pool(PoolError {
460                        kind: PoolErrorKind::Closed,
461                        message: "pool has been closed".to_string(),
462                        source: None,
463                    }));
464                }
465                AcquireAction::ValidateExisting(meta) => {
466                    // Validate and wrap the connection (lock is released)
467                    return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468                }
469                AcquireAction::CreateNew => {
470                    // Create new connection outside of lock
471                    match factory().await {
472                        Outcome::Ok(conn) => {
473                            self.shared
474                                .connections_created
475                                .fetch_add(1, Ordering::Relaxed);
476                            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477                            let meta = ConnectionMeta::new(conn);
478                            return Outcome::Ok(PooledConnection::new(
479                                meta,
480                                Arc::downgrade(&self.shared),
481                            ));
482                        }
483                        Outcome::Err(e) => {
484                            // Failed to create, decrement counts
485                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486                                inner.total_count -= 1;
487                                inner.active_count -= 1;
488                            }
489                            // Even if we can't decrement counts, still return the original error
490                            return Outcome::Err(e);
491                        }
492                        Outcome::Cancelled(reason) => {
493                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494                                inner.total_count -= 1;
495                                inner.active_count -= 1;
496                            }
497                            return Outcome::Cancelled(reason);
498                        }
499                        Outcome::Panicked(info) => {
500                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501                                inner.total_count -= 1;
502                                inner.active_count -= 1;
503                            }
504                            return Outcome::Panicked(info);
505                        }
506                    }
507                }
508                AcquireAction::Wait => {
509                    // Wait for a connection to become available
510                    let remaining = deadline.saturating_duration_since(Instant::now());
511                    if remaining.is_zero() {
512                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513                            inner.waiter_count -= 1;
514                        }
515                        self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516                        return Outcome::Err(Error::Pool(PoolError {
517                            kind: PoolErrorKind::Timeout,
518                            message: "acquire timeout: no connections available".to_string(),
519                            source: None,
520                        }));
521                    }
522
523                    // Wait with timeout (use shorter interval for cancellation checks)
524                    let wait_time = remaining.min(Duration::from_millis(100));
525                    {
526                        let inner = match self.shared.lock_or_error("acquire_wait") {
527                            Ok(guard) => guard,
528                            Err(e) => return Outcome::Err(e),
529                        };
530                        // wait_timeout can also return a poisoned error, handle it
531                        let _ = self
532                            .shared
533                            .conn_available
534                            .wait_timeout(inner, wait_time)
535                            .map_err(|_| {
536                                tracing::error!("Pool mutex poisoned during wait_timeout");
537                            });
538                    }
539
540                    // Decrement waiter count after waking
541                    {
542                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543                            inner.waiter_count = inner.waiter_count.saturating_sub(1);
544                        }
545                    }
546
547                    // Loop back to try again
548                }
549            }
550        }
551    }
552
553    /// Validate a connection and wrap it in a PooledConnection.
554    async fn validate_and_wrap(
555        &self,
556        cx: &Cx,
557        meta: ConnectionMeta<C>,
558        test_on_checkout: bool,
559    ) -> Outcome<PooledConnection<C>, Error> {
560        if test_on_checkout {
561            // Validate the connection
562            match meta.conn.ping(cx).await {
563                Outcome::Ok(()) => {
564                    self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565                    Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566                }
567                Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568                    // Connection is invalid, decrement counts and try again
569                    {
570                        if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571                            inner.total_count -= 1;
572                            inner.active_count -= 1;
573                        }
574                    }
575                    self.shared
576                        .connections_closed
577                        .fetch_add(1, Ordering::Relaxed);
578                    // Return error - caller should retry
579                    Outcome::Err(Error::Connection(ConnectionError {
580                        kind: ConnectionErrorKind::Disconnected,
581                        message: "connection validation failed".to_string(),
582                        source: None,
583                    }))
584                }
585            }
586        } else {
587            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588            Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589        }
590    }
591
592    /// Close the pool, preventing new connections and closing all idle connections.
593    ///
594    /// If the pool mutex is poisoned, this logs an error but still wakes waiters.
595    pub fn clear_idle(&self) {
596        if let Ok(mut inner) = self.shared.inner.lock() {
597            let idle_count = inner.idle.len();
598            inner.idle.clear();
599            inner.total_count -= idle_count;
600            self.shared.connections_closed.fetch_add(idle_count as u64, Ordering::Relaxed);
601        }
602    }
603
604    pub fn close(&self) {
605        match self.shared.inner.lock() {
606            Ok(mut inner) => {
607                inner.closed = true;
608
609                // Close all idle connections
610                let idle_count = inner.idle.len();
611                inner.idle.clear();
612                inner.total_count -= idle_count;
613                self.shared
614                    .connections_closed
615                    .fetch_add(idle_count as u64, Ordering::Relaxed);
616                drop(inner);
617            }
618            Err(poisoned) => {
619                // Recover from poisoning - we still want to mark the pool as closed
620                // and wake waiters even if counts may be inconsistent.
621                tracing::error!(
622                    "Pool mutex poisoned during close; attempting recovery. \
623                     Pool state may be inconsistent."
624                );
625                let mut inner = poisoned.into_inner();
626                inner.closed = true;
627                let idle_count = inner.idle.len();
628                inner.idle.clear();
629                inner.total_count -= idle_count;
630                self.shared
631                    .connections_closed
632                    .fetch_add(idle_count as u64, Ordering::Relaxed);
633            }
634        }
635
636        // Wake all waiters so they see the pool is closed
637        self.shared.conn_available.notify_all();
638    }
639
640    /// Get the number of idle connections.
641    #[must_use]
642    pub fn idle_count(&self) -> usize {
643        let inner = self.shared.lock_or_recover();
644        inner.idle.len()
645    }
646
647    /// Get the number of active connections.
648    #[must_use]
649    pub fn active_count(&self) -> usize {
650        let inner = self.shared.lock_or_recover();
651        inner.active_count
652    }
653
654    /// Get the total number of connections.
655    #[must_use]
656    pub fn total_count(&self) -> usize {
657        let inner = self.shared.lock_or_recover();
658        inner.total_count
659    }
660}
661
/// Action to take when acquiring a connection.
///
/// `Pool::acquire` picks a variant while holding the pool lock and carries it
/// out after the lock has been released.
enum AcquireAction<C> {
    /// Pool is closed
    PoolClosed,
    /// Found an existing connection to validate
    ValidateExisting(ConnectionMeta<C>),
    /// Create a new connection
    CreateNew,
    /// Wait for a connection to become available
    Wait,
}
673
/// A connection borrowed from the pool.
///
/// When dropped, the connection is automatically returned to the pool.
/// The connection can be used via `Deref` and `DerefMut`.
pub struct PooledConnection<C: Connection> {
    /// The connection metadata (Some while held, None after return)
    meta: Option<ConnectionMeta<C>>,
    /// Weak reference to pool for returning
    // Being weak, this handle does not keep the pool's shared state alive.
    pool: Weak<PoolShared<C>>,
}
684
685impl<C: Connection> PooledConnection<C> {
686    fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
687        Self {
688            meta: Some(meta),
689            pool,
690        }
691    }
692
693    /// Detach this connection from the pool.
694    ///
695    /// The connection will not be returned to the pool when dropped.
696    /// This is useful when you need to close a connection explicitly.
697    pub fn detach(mut self) -> C {
698        if let Some(pool) = self.pool.upgrade() {
699            // Try to update pool counters, but don't panic if mutex is poisoned.
700            // The connection is being detached anyway, so counts being off is acceptable.
701            match pool.inner.lock() {
702                Ok(mut inner) => {
703                    inner.total_count -= 1;
704                    inner.active_count -= 1;
705                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
706                }
707                Err(_poisoned) => {
708                    tracing::error!(
709                        "Pool mutex poisoned during detach; pool counters will be inconsistent"
710                    );
711                    // Still increment the atomic counter for tracking
712                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
713                }
714            }
715        }
716        self.meta.take().expect("connection already detached").conn
717    }
718
719    /// Get the age of this connection (time since creation).
720    #[must_use]
721    pub fn age(&self) -> Duration {
722        self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
723    }
724
725    /// Get the idle time of this connection (time since last use).
726    #[must_use]
727    pub fn idle_time(&self) -> Duration {
728        self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
729    }
730}
731
732impl<C: Connection> std::ops::Deref for PooledConnection<C> {
733    type Target = C;
734
735    fn deref(&self) -> &Self::Target {
736        &self
737            .meta
738            .as_ref()
739            .expect("connection already returned to pool")
740            .conn
741    }
742}
743
744impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
745    fn deref_mut(&mut self) -> &mut Self::Target {
746        &mut self
747            .meta
748            .as_mut()
749            .expect("connection already returned to pool")
750            .conn
751    }
752}
753
754impl<C: Connection> Drop for PooledConnection<C> {
755    fn drop(&mut self) {
756        if let Some(mut meta) = self.meta.take() {
757            meta.touch(); // Update last used time
758            if let Some(pool) = self.pool.upgrade() {
759                // Return to pool - but if mutex is poisoned, we must not panic in Drop.
760                // Instead, log the error and leak the connection.
761                let mut inner = match pool.inner.lock() {
762                    Ok(guard) => guard,
763                    Err(_poisoned) => {
764                        tracing::error!(
765                            "Pool mutex poisoned during connection return; \
766                             connection will be leaked. A thread panicked while holding the lock."
767                        );
768                        // Connection is leaked - we can't safely return it or update counts.
769                        // The pool is likely in a bad state anyway.
770                        return;
771                    }
772                };
773
774                if inner.closed {
775                    inner.total_count -= 1;
776                    inner.active_count -= 1;
777                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
778                    return;
779                }
780
781                // Check max lifetime
782                let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
783                if meta.age() > max_lifetime {
784                    inner.total_count -= 1;
785                    inner.active_count -= 1;
786                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
787                    return;
788                }
789
790                inner.active_count -= 1;
791                inner.idle.push_back(meta);
792
793                drop(inner);
794                pool.conn_available.notify_one();
795            }
796            // If pool is gone, connection is just dropped
797        }
798    }
799}
800
801impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
802    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
803        f.debug_struct("PooledConnection")
804            .field("conn", &self.meta.as_ref().map(|m| &m.conn))
805            .field("age", &self.age())
806            .field("idle_time", &self.idle_time())
807            .finish_non_exhaustive()
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
815    use sqlmodel_core::{Row, Value};
816    use std::sync::atomic::AtomicBool;
817
818    /// A mock connection for testing pool behavior.
819    #[derive(Debug)]
820    struct MockConnection {
821        id: u32,
822        ping_should_fail: Arc<AtomicBool>,
823    }
824
825    impl MockConnection {
826        fn new(id: u32) -> Self {
827            Self {
828                id,
829                ping_should_fail: Arc::new(AtomicBool::new(false)),
830            }
831        }
832
833        #[allow(dead_code)]
834        fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
835            Self {
836                id,
837                ping_should_fail: should_fail,
838            }
839        }
840    }
841
842    /// Mock transaction for MockConnection.
843    struct MockTx;
844
845    impl TransactionOps for MockTx {
846        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
847            Outcome::Ok(vec![])
848        }
849
850        async fn query_one(
851            &self,
852            _cx: &Cx,
853            _sql: &str,
854            _params: &[Value],
855        ) -> Outcome<Option<Row>, Error> {
856            Outcome::Ok(None)
857        }
858
859        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
860            Outcome::Ok(0)
861        }
862
863        async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
864            Outcome::Ok(())
865        }
866
867        async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
868            Outcome::Ok(())
869        }
870
871        async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
872            Outcome::Ok(())
873        }
874
875        async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
876            Outcome::Ok(())
877        }
878
879        async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
880            Outcome::Ok(())
881        }
882    }
883
884    impl Connection for MockConnection {
885        type Tx<'conn> = MockTx;
886
887        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
888            Outcome::Ok(vec![])
889        }
890
891        async fn query_one(
892            &self,
893            _cx: &Cx,
894            _sql: &str,
895            _params: &[Value],
896        ) -> Outcome<Option<Row>, Error> {
897            Outcome::Ok(None)
898        }
899
900        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
901            Outcome::Ok(0)
902        }
903
904        async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
905            Outcome::Ok(0)
906        }
907
908        async fn batch(
909            &self,
910            _cx: &Cx,
911            _statements: &[(String, Vec<Value>)],
912        ) -> Outcome<Vec<u64>, Error> {
913            Outcome::Ok(vec![])
914        }
915
916        async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
917            Outcome::Ok(MockTx)
918        }
919
920        async fn begin_with(
921            &self,
922            _cx: &Cx,
923            _isolation: IsolationLevel,
924        ) -> Outcome<Self::Tx<'_>, Error> {
925            Outcome::Ok(MockTx)
926        }
927
928        async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
929            Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
930        }
931
932        async fn query_prepared(
933            &self,
934            _cx: &Cx,
935            _stmt: &PreparedStatement,
936            _params: &[Value],
937        ) -> Outcome<Vec<Row>, Error> {
938            Outcome::Ok(vec![])
939        }
940
941        async fn execute_prepared(
942            &self,
943            _cx: &Cx,
944            _stmt: &PreparedStatement,
945            _params: &[Value],
946        ) -> Outcome<u64, Error> {
947            Outcome::Ok(0)
948        }
949
950        async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
951            if self.ping_should_fail.load(Ordering::Relaxed) {
952                Outcome::Err(Error::Connection(ConnectionError {
953                    kind: ConnectionErrorKind::Disconnected,
954                    message: "mock ping failed".to_string(),
955                    source: None,
956                }))
957            } else {
958                Outcome::Ok(())
959            }
960        }
961
962        async fn close(self, _cx: &Cx) -> Result<(), Error> {
963            Ok(())
964        }
965    }
966
967    #[test]
968    fn test_config_default() {
969        let config = PoolConfig::default();
970        assert_eq!(config.min_connections, 1);
971        assert_eq!(config.max_connections, 10);
972        assert_eq!(config.idle_timeout_ms, 600_000);
973        assert_eq!(config.acquire_timeout_ms, 30_000);
974        assert_eq!(config.max_lifetime_ms, 1_800_000);
975        assert!(config.test_on_checkout);
976        assert!(!config.test_on_return);
977    }
978
979    #[test]
980    fn test_config_builder() {
981        let config = PoolConfig::new(20)
982            .min_connections(5)
983            .idle_timeout(60_000)
984            .acquire_timeout(5_000)
985            .max_lifetime(300_000)
986            .test_on_checkout(false)
987            .test_on_return(true);
988
989        assert_eq!(config.min_connections, 5);
990        assert_eq!(config.max_connections, 20);
991        assert_eq!(config.idle_timeout_ms, 60_000);
992        assert_eq!(config.acquire_timeout_ms, 5_000);
993        assert_eq!(config.max_lifetime_ms, 300_000);
994        assert!(!config.test_on_checkout);
995        assert!(config.test_on_return);
996    }
997
998    #[test]
999    fn test_config_clone() {
1000        let config = PoolConfig::new(15).min_connections(3);
1001        let cloned = config.clone();
1002        assert_eq!(config.max_connections, cloned.max_connections);
1003        assert_eq!(config.min_connections, cloned.min_connections);
1004    }
1005
1006    #[test]
1007    fn test_stats_default() {
1008        let stats = PoolStats::default();
1009        assert_eq!(stats.total_connections, 0);
1010        assert_eq!(stats.idle_connections, 0);
1011        assert_eq!(stats.active_connections, 0);
1012        assert_eq!(stats.pending_requests, 0);
1013        assert_eq!(stats.connections_created, 0);
1014        assert_eq!(stats.connections_closed, 0);
1015        assert_eq!(stats.acquires, 0);
1016        assert_eq!(stats.timeouts, 0);
1017    }
1018
1019    #[test]
1020    fn test_stats_clone() {
1021        let stats = PoolStats {
1022            total_connections: 5,
1023            acquires: 100,
1024            ..Default::default()
1025        };
1026        let cloned = stats.clone();
1027        assert_eq!(stats.total_connections, cloned.total_connections);
1028        assert_eq!(stats.acquires, cloned.acquires);
1029    }
1030
1031    #[test]
1032    fn test_connection_meta_timing() {
1033        use std::thread;
1034
1035        // Create a dummy type for testing
1036        struct DummyConn;
1037
1038        let meta = ConnectionMeta::new(DummyConn);
1039        let initial_age = meta.age();
1040
1041        // Small sleep to ensure time passes
1042        thread::sleep(Duration::from_millis(10));
1043
1044        // Age should have increased
1045        assert!(meta.age() > initial_age);
1046        assert!(meta.idle_time() > Duration::ZERO);
1047    }
1048
1049    #[test]
1050    fn test_connection_meta_touch() {
1051        use std::thread;
1052
1053        struct DummyConn;
1054
1055        let mut meta = ConnectionMeta::new(DummyConn);
1056
1057        // Small sleep to build up some idle time
1058        thread::sleep(Duration::from_millis(10));
1059        let idle_before_touch = meta.idle_time();
1060        assert!(idle_before_touch > Duration::ZERO);
1061
1062        // Touch should reset idle time
1063        meta.touch();
1064        let idle_after_touch = meta.idle_time();
1065
1066        // After touch, idle time should be very small (less than before)
1067        assert!(idle_after_touch < idle_before_touch);
1068    }
1069
1070    #[test]
1071    fn test_pool_new() {
1072        let config = PoolConfig::new(5);
1073        let pool: Pool<MockConnection> = Pool::new(config);
1074
1075        // New pool should be empty
1076        assert_eq!(pool.idle_count(), 0);
1077        assert_eq!(pool.active_count(), 0);
1078        assert_eq!(pool.total_count(), 0);
1079        assert!(!pool.is_closed());
1080        assert!(!pool.at_capacity());
1081    }
1082
1083    #[test]
1084    fn test_pool_config() {
1085        let config = PoolConfig::new(7).min_connections(2);
1086        let pool: Pool<MockConnection> = Pool::new(config);
1087
1088        let retrieved_config = pool.config();
1089        assert_eq!(retrieved_config.max_connections, 7);
1090        assert_eq!(retrieved_config.min_connections, 2);
1091    }
1092
1093    #[test]
1094    fn test_pool_stats_initial() {
1095        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1096
1097        let stats = pool.stats();
1098        assert_eq!(stats.total_connections, 0);
1099        assert_eq!(stats.idle_connections, 0);
1100        assert_eq!(stats.active_connections, 0);
1101        assert_eq!(stats.pending_requests, 0);
1102        assert_eq!(stats.connections_created, 0);
1103        assert_eq!(stats.connections_closed, 0);
1104        assert_eq!(stats.acquires, 0);
1105        assert_eq!(stats.timeouts, 0);
1106    }
1107
1108    #[test]
1109    fn test_pool_close() {
1110        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1111
1112        assert!(!pool.is_closed());
1113        pool.close();
1114        assert!(pool.is_closed());
1115    }
1116
1117    #[test]
1118    fn test_pool_inner_can_create_new() {
1119        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1120
1121        // Initially can create new
1122        assert!(inner.can_create_new());
1123
1124        // At capacity
1125        inner.total_count = 3;
1126        assert!(!inner.can_create_new());
1127
1128        // Below capacity again
1129        inner.total_count = 2;
1130        assert!(inner.can_create_new());
1131
1132        // Closed pool
1133        inner.closed = true;
1134        assert!(!inner.can_create_new());
1135    }
1136
1137    #[test]
1138    fn test_pool_inner_stats() {
1139        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1140
1141        inner.total_count = 5;
1142        inner.active_count = 3;
1143        inner.waiter_count = 2;
1144        inner
1145            .idle
1146            .push_back(ConnectionMeta::new(MockConnection::new(1)));
1147        inner
1148            .idle
1149            .push_back(ConnectionMeta::new(MockConnection::new(2)));
1150
1151        let stats = inner.stats();
1152        assert_eq!(stats.total_connections, 5);
1153        assert_eq!(stats.idle_connections, 2);
1154        assert_eq!(stats.active_connections, 3);
1155        assert_eq!(stats.pending_requests, 2);
1156    }
1157
1158    #[test]
1159    fn test_pooled_connection_age_and_idle_time() {
1160        use std::thread;
1161
1162        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1163
1164        // Properly initialize pool state as if acquire happened
1165        {
1166            let mut inner = pool.shared.inner.lock().unwrap();
1167            inner.total_count = 1;
1168            inner.active_count = 1;
1169        }
1170
1171        let meta = ConnectionMeta::new(MockConnection::new(1));
1172        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1173
1174        // Should have some small positive age
1175        assert!(pooled.age() >= Duration::ZERO);
1176
1177        thread::sleep(Duration::from_millis(5));
1178        assert!(pooled.age() > Duration::ZERO);
1179    }
1180
1181    #[test]
1182    fn test_pooled_connection_detach() {
1183        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1184
1185        // Manually add a connection to simulate acquire
1186        {
1187            let mut inner = pool.shared.inner.lock().unwrap();
1188            inner.total_count = 1;
1189            inner.active_count = 1;
1190        }
1191
1192        let meta = ConnectionMeta::new(MockConnection::new(42));
1193        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1194
1195        // Verify counts before detach
1196        assert_eq!(pool.total_count(), 1);
1197        assert_eq!(pool.active_count(), 1);
1198
1199        // Detach returns the connection
1200        let conn = pooled.detach();
1201        assert_eq!(conn.id, 42);
1202
1203        // After detach, counts should be decremented
1204        assert_eq!(pool.total_count(), 0);
1205        assert_eq!(pool.active_count(), 0);
1206
1207        // connections_closed should be incremented
1208        let stats = pool.stats();
1209        assert_eq!(stats.connections_closed, 1);
1210    }
1211
1212    #[test]
1213    fn test_pooled_connection_drop_returns_to_pool() {
1214        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1215
1216        // Manually set up pool state as if we acquired a connection
1217        {
1218            let mut inner = pool.shared.inner.lock().unwrap();
1219            inner.total_count = 1;
1220            inner.active_count = 1;
1221        }
1222
1223        let meta = ConnectionMeta::new(MockConnection::new(1));
1224        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1225
1226        // While held, active=1, idle=0
1227        assert_eq!(pool.active_count(), 1);
1228        assert_eq!(pool.idle_count(), 0);
1229
1230        // Drop the connection
1231        drop(pooled);
1232
1233        // After drop, active=0, idle=1 (returned to pool)
1234        assert_eq!(pool.active_count(), 0);
1235        assert_eq!(pool.idle_count(), 1);
1236        assert_eq!(pool.total_count(), 1); // Total unchanged
1237    }
1238
1239    #[test]
1240    fn test_pooled_connection_drop_when_pool_closed() {
1241        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1242
1243        // Set up pool state
1244        {
1245            let mut inner = pool.shared.inner.lock().unwrap();
1246            inner.total_count = 1;
1247            inner.active_count = 1;
1248        }
1249
1250        let meta = ConnectionMeta::new(MockConnection::new(1));
1251        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1252
1253        // Close the pool while connection is out
1254        pool.close();
1255
1256        // Drop the connection
1257        drop(pooled);
1258
1259        // Connection should not be returned to idle (pool is closed)
1260        assert_eq!(pool.idle_count(), 0);
1261        assert_eq!(pool.active_count(), 0);
1262        assert_eq!(pool.total_count(), 0);
1263
1264        // Connection was closed
1265        assert_eq!(pool.stats().connections_closed, 1);
1266    }
1267
1268    #[test]
1269    fn test_pooled_connection_deref() {
1270        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1271
1272        // Properly initialize pool state as if acquire happened
1273        {
1274            let mut inner = pool.shared.inner.lock().unwrap();
1275            inner.total_count = 1;
1276            inner.active_count = 1;
1277        }
1278
1279        let meta = ConnectionMeta::new(MockConnection::new(99));
1280        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1281
1282        // Deref should give access to the connection's id
1283        assert_eq!(pooled.id, 99);
1284    }
1285
1286    #[test]
1287    fn test_pooled_connection_deref_mut() {
1288        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1289
1290        // Properly initialize pool state as if acquire happened
1291        {
1292            let mut inner = pool.shared.inner.lock().unwrap();
1293            inner.total_count = 1;
1294            inner.active_count = 1;
1295        }
1296
1297        let meta = ConnectionMeta::new(MockConnection::new(1));
1298        let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1299
1300        // DerefMut should allow mutation
1301        pooled.id = 50;
1302        assert_eq!(pooled.id, 50);
1303    }
1304
1305    #[test]
1306    fn test_pooled_connection_debug() {
1307        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1308
1309        // Properly initialize pool state as if acquire happened
1310        {
1311            let mut inner = pool.shared.inner.lock().unwrap();
1312            inner.total_count = 1;
1313            inner.active_count = 1;
1314        }
1315
1316        let meta = ConnectionMeta::new(MockConnection::new(1));
1317        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1318
1319        let debug_str = format!("{:?}", pooled);
1320        assert!(debug_str.contains("PooledConnection"));
1321        assert!(debug_str.contains("age"));
1322    }
1323
1324    #[test]
1325    fn test_pool_at_capacity() {
1326        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1327
1328        assert!(!pool.at_capacity());
1329
1330        // Simulate connections being created
1331        {
1332            let mut inner = pool.shared.inner.lock().unwrap();
1333            inner.total_count = 1;
1334        }
1335        assert!(!pool.at_capacity());
1336
1337        {
1338            let mut inner = pool.shared.inner.lock().unwrap();
1339            inner.total_count = 2;
1340        }
1341        assert!(pool.at_capacity());
1342    }
1343
1344    #[test]
1345    fn test_acquire_action_enum() {
1346        // Verify the enum variants exist and can be pattern-matched
1347        let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1348        assert!(matches!(closed, AcquireAction::PoolClosed));
1349
1350        let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1351        assert!(matches!(create, AcquireAction::CreateNew));
1352
1353        let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1354        assert!(matches!(wait, AcquireAction::Wait));
1355
1356        let meta = ConnectionMeta::new(MockConnection::new(1));
1357        let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1358        assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1359    }
1360
1361    #[test]
1362    fn test_pool_shared_atomic_counters() {
1363        let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1364
1365        // Initial values should be 0
1366        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1367        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1368        assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1369        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1370
1371        // Test incrementing
1372        shared.connections_created.fetch_add(1, Ordering::Relaxed);
1373        shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1374        shared.acquires.fetch_add(10, Ordering::Relaxed);
1375        shared.timeouts.fetch_add(3, Ordering::Relaxed);
1376
1377        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1378        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1379        assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1380        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1381    }
1382
1383    #[test]
1384    fn test_pool_close_clears_idle() {
1385        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1386
1387        // Add some idle connections
1388        {
1389            let mut inner = pool.shared.inner.lock().unwrap();
1390            inner.total_count = 3;
1391            inner
1392                .idle
1393                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1394            inner
1395                .idle
1396                .push_back(ConnectionMeta::new(MockConnection::new(2)));
1397            inner
1398                .idle
1399                .push_back(ConnectionMeta::new(MockConnection::new(3)));
1400        }
1401
1402        assert_eq!(pool.idle_count(), 3);
1403        assert_eq!(pool.total_count(), 3);
1404
1405        pool.close();
1406
1407        // After close, idle connections should be cleared
1408        assert_eq!(pool.idle_count(), 0);
1409        assert_eq!(pool.total_count(), 0);
1410        assert!(pool.is_closed());
1411
1412        // connections_closed should reflect the 3 idle connections
1413        assert_eq!(pool.stats().connections_closed, 3);
1414    }
1415
1416    // ==================== Lock Poisoning Safety Tests ====================
1417    //
1418    // These tests verify that the pool correctly handles mutex poisoning,
1419    // which occurs when a thread panics while holding the lock.
1420    //
1421    // Tier 1 (mutations): Return Error if poisoned
1422    // Tier 2 (read-only): Recover and return valid data
1423    // Tier 3 (Drop): Log error and leak connection (don't panic)
1424
1425    /// Helper to poison a pool's mutex by panicking while holding the lock.
1426    ///
1427    /// Returns the pool with a poisoned mutex.
1428    fn poison_pool_mutex() -> Pool<MockConnection> {
1429        use std::panic;
1430        use std::thread;
1431
1432        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1433
1434        // Set up some valid state before poisoning
1435        {
1436            let mut inner = pool.shared.inner.lock().unwrap();
1437            inner.total_count = 2;
1438            inner.active_count = 1;
1439            inner
1440                .idle
1441                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1442        }
1443
1444        // Spawn a thread that will panic while holding the lock
1445        let shared_clone = Arc::clone(&pool.shared);
1446        let handle = thread::spawn(move || {
1447            let _guard = shared_clone.inner.lock().unwrap();
1448            // Panic while holding the lock - this poisons the mutex
1449            panic!("intentional panic to poison mutex");
1450        });
1451
1452        // Wait for the thread to panic (ignore the panic result)
1453        let _ = handle.join();
1454
1455        // Verify the mutex is now poisoned
1456        assert!(pool.shared.inner.lock().is_err());
1457
1458        pool
1459    }
1460
1461    // -------------------- Tier 2: Read-Only Methods --------------------
1462
1463    #[test]
1464    fn test_config_after_poisoning_returns_valid_data() {
1465        let pool = poison_pool_mutex();
1466
1467        // config() should recover and return the configuration
1468        let config = pool.config();
1469        assert_eq!(config.max_connections, 5);
1470    }
1471
1472    #[test]
1473    fn test_stats_after_poisoning_returns_valid_data() {
1474        let pool = poison_pool_mutex();
1475
1476        // stats() should recover and return valid statistics
1477        let stats = pool.stats();
1478        // The state before poisoning was: total=2, active=1, idle=1
1479        assert_eq!(stats.total_connections, 2);
1480        assert_eq!(stats.active_connections, 1);
1481        assert_eq!(stats.idle_connections, 1);
1482    }
1483
1484    #[test]
1485    fn test_at_capacity_after_poisoning() {
1486        let pool = poison_pool_mutex();
1487
1488        // at_capacity() should recover and return correct value
1489        // Pool has 2 connections, max is 5, so not at capacity
1490        assert!(!pool.at_capacity());
1491    }
1492
1493    #[test]
1494    fn test_is_closed_after_poisoning() {
1495        let pool = poison_pool_mutex();
1496
1497        // is_closed() should recover and return correct value
1498        assert!(!pool.is_closed());
1499    }
1500
1501    #[test]
1502    fn test_idle_count_after_poisoning() {
1503        let pool = poison_pool_mutex();
1504
1505        // idle_count() should recover and return correct value
1506        assert_eq!(pool.idle_count(), 1);
1507    }
1508
1509    #[test]
1510    fn test_active_count_after_poisoning() {
1511        let pool = poison_pool_mutex();
1512
1513        // active_count() should recover and return correct value
1514        assert_eq!(pool.active_count(), 1);
1515    }
1516
1517    #[test]
1518    fn test_total_count_after_poisoning() {
1519        let pool = poison_pool_mutex();
1520
1521        // total_count() should recover and return correct value
1522        assert_eq!(pool.total_count(), 2);
1523    }
1524
1525    // -------------------- Tier 1: Mutation Methods --------------------
1526
1527    #[test]
1528    fn test_lock_or_error_returns_error_when_poisoned() {
1529        use std::thread;
1530
1531        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1532
1533        // Poison the mutex
1534        let shared_clone = Arc::clone(&shared);
1535        let handle = thread::spawn(move || {
1536            let _guard = shared_clone.inner.lock().unwrap();
1537            panic!("intentional panic to poison mutex");
1538        });
1539        let _ = handle.join();
1540
1541        // lock_or_error should return an error
1542        let result = shared.lock_or_error("test_operation");
1543
1544        // Verify it's a pool poisoning error
1545        match result {
1546            Err(Error::Pool(pool_err)) => {
1547                assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
1548                assert!(pool_err.message.contains("poisoned"));
1549            }
1550            Err(other) => panic!("Expected Pool error, got: {:?}", other),
1551            Ok(_) => panic!("Expected error, got Ok"),
1552        }
1553    }
1554
1555    #[test]
1556    fn test_lock_or_recover_succeeds_when_poisoned() {
1557        use std::thread;
1558
1559        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1560
1561        // Set up some state
1562        {
1563            let mut inner = shared.inner.lock().unwrap();
1564            inner.total_count = 42;
1565        }
1566
1567        // Poison the mutex
1568        let shared_clone = Arc::clone(&shared);
1569        let handle = thread::spawn(move || {
1570            let _guard = shared_clone.inner.lock().unwrap();
1571            panic!("intentional panic to poison mutex");
1572        });
1573        let _ = handle.join();
1574
1575        // Verify mutex is poisoned
1576        assert!(shared.inner.lock().is_err());
1577
1578        // lock_or_recover should still succeed and provide access to data
1579        let inner = shared.lock_or_recover();
1580        assert_eq!(inner.total_count, 42);
1581    }
1582
1583    #[test]
1584    fn test_close_after_poisoning_recovers_and_closes() {
1585        let pool = poison_pool_mutex();
1586
1587        // close() should recover from poisoning and still close the pool
1588        pool.close();
1589
1590        // After close, the pool should be marked as closed
1591        assert!(pool.is_closed());
1592
1593        // Idle connections should be cleared
1594        assert_eq!(pool.idle_count(), 0);
1595    }
1596
1597    // -------------------- Tier 3: Drop Safety --------------------
1598
1599    #[test]
1600    fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
1601        use std::panic;
1602        use std::thread;
1603
1604        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1605
1606        // Set up a connection that's "checked out"
1607        {
1608            let mut inner = pool.shared.inner.lock().unwrap();
1609            inner.total_count = 1;
1610            inner.active_count = 1;
1611        }
1612
1613        // Create a pooled connection
1614        let meta = ConnectionMeta::new(MockConnection::new(1));
1615        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1616
1617        // Poison the mutex by panicking in another thread
1618        let shared_clone = Arc::clone(&pool.shared);
1619        let handle = thread::spawn(move || {
1620            let _guard = shared_clone.inner.lock().unwrap();
1621            panic!("intentional panic to poison mutex");
1622        });
1623        let _ = handle.join();
1624
1625        // Verify mutex is poisoned
1626        assert!(pool.shared.inner.lock().is_err());
1627
1628        // Drop the pooled connection - should NOT panic
1629        // The connection will be leaked, but that's the correct behavior
1630        let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1631            drop(pooled);
1632        }));
1633
1634        // Dropping should not panic
1635        assert!(
1636            drop_result.is_ok(),
1637            "Dropping PooledConnection after mutex poisoning should not panic"
1638        );
1639    }
1640
1641    #[test]
1642    fn test_detach_after_poisoning_does_not_panic() {
1643        use std::panic;
1644        use std::thread;
1645
1646        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1647
1648        // Set up a connection that's "checked out"
1649        {
1650            let mut inner = pool.shared.inner.lock().unwrap();
1651            inner.total_count = 1;
1652            inner.active_count = 1;
1653        }
1654
1655        // Create a pooled connection
1656        let meta = ConnectionMeta::new(MockConnection::new(42));
1657        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1658
1659        // Poison the mutex
1660        let shared_clone = Arc::clone(&pool.shared);
1661        let handle = thread::spawn(move || {
1662            let _guard = shared_clone.inner.lock().unwrap();
1663            panic!("intentional panic to poison mutex");
1664        });
1665        let _ = handle.join();
1666
1667        // Verify mutex is poisoned
1668        assert!(pool.shared.inner.lock().is_err());
1669
1670        // Detach should not panic, even though it can't update counters
1671        let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
1672
1673        assert!(
1674            detach_result.is_ok(),
1675            "detach() after mutex poisoning should not panic"
1676        );
1677
1678        // Should still get the connection back
1679        let conn = detach_result.unwrap();
1680        assert_eq!(conn.id, 42);
1681    }
1682
1683    // -------------------- Integration: Pool Survives Thread Panic --------------------
1684
1685    #[test]
1686    fn test_pool_survives_thread_panic_during_acquire() {
1687        use std::thread;
1688
1689        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1690        let pool_arc = Arc::new(pool);
1691
1692        // Simulate a thread that acquires, does work, then panics
1693        // The connection should be leaked but pool should remain usable for reads
1694        let pool_clone = Arc::clone(&pool_arc);
1695        let handle = thread::spawn(move || {
1696            // Manually simulate having acquired a connection
1697            {
1698                let mut inner = pool_clone.shared.inner.lock().unwrap();
1699                inner.total_count = 1;
1700                inner.active_count = 1;
1701            }
1702
1703            // Panic while holding the pool's internal mutex to simulate a poisoned lock.
1704            // This models an internal panic in pool bookkeeping, not user code.
1705            let _guard = pool_clone.shared.inner.lock().unwrap();
1706            panic!("simulated panic during database operation");
1707        });
1708
1709        // Wait for thread to panic
1710        let _ = handle.join();
1711
1712        // Pool's mutex is now poisoned, but read-only methods should still work
1713        assert_eq!(pool_arc.total_count(), 1);
1714        assert_eq!(pool_arc.config().max_connections, 5);
1715
1716        // Stats should be recoverable
1717        let stats = pool_arc.stats();
1718        assert_eq!(stats.total_connections, 1);
1719    }
1720
/// `close()` must still work after the pool's internal mutex was poisoned.
///
/// The pool is seeded with two idle connections, a helper thread panics
/// while holding the lock, and `close()` is then expected to mark the pool
/// closed and leave no idle connections behind.
#[test]
fn test_pool_close_after_thread_panic() {
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Seed two idle connections. The guard is dropped at the end of the
    // scope so the helper thread below can take the lock.
    {
        let mut inner = pool.shared.inner.lock().unwrap();
        inner.total_count = 2;
        inner
            .idle
            .push_back(ConnectionMeta::new(MockConnection::new(1)));
        inner
            .idle
            .push_back(ConnectionMeta::new(MockConnection::new(2)));
    }

    // Poison the mutex by panicking while the guard is alive.
    let shared_clone = Arc::clone(&pool.shared);
    let handle = thread::spawn(move || {
        let _guard = shared_clone.inner.lock().unwrap();
        panic!("intentional panic");
    });

    // `join` returns `Err` only if the helper panicked. Without this check
    // the test could pass without ever exercising poison recovery.
    assert!(
        handle.join().is_err(),
        "helper thread must panic so the pool mutex is poisoned"
    );

    // close() should recover from the poisoned lock and still work.
    pool.close();

    // Pool should be closed and idle connections cleared.
    assert!(pool.is_closed());
    assert_eq!(pool.idle_count(), 0);
}
1754
/// Calls every read-only accessor repeatedly on the pool returned by
/// `poison_pool_mutex()` and then checks the total connection count.
///
/// NOTE(review): `poison_pool_mutex()` is defined elsewhere in this module;
/// judging by its name it returns a pool whose internal mutex is already
/// poisoned. Confirm against the helper.
#[test]
fn test_multiple_reads_after_poisoning() {
    let pool = poison_pool_mutex();

    // Ten rounds over all read accessors. Return values are discarded on
    // purpose: the point is that none of the calls panics.
    (0..10).for_each(|_| {
        let _ = pool.config();
        let _ = pool.stats();
        let _ = pool.at_capacity();
        let _ = pool.is_closed();
        let _ = pool.idle_count();
        let _ = pool.active_count();
        let _ = pool.total_count();
    });

    // After all those reads the pool must still report two connections.
    assert_eq!(pool.total_count(), 2);
}
1773
/// `stats()` must report the waiter count recorded before the pool's
/// internal mutex was poisoned.
///
/// The test sets `waiter_count` to 3, poisons the lock from a helper thread,
/// and expects `stats().pending_requests` to equal 3.
#[test]
fn test_waiters_count_after_poisoning() {
    use std::thread;

    let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));

    // Record three waiters. The guard is dropped at the end of the scope so
    // the helper thread below can take the lock.
    {
        let mut inner = pool.shared.inner.lock().unwrap();
        inner.waiter_count = 3;
    }

    // Poison the mutex by panicking while the guard is alive.
    let shared_clone = Arc::clone(&pool.shared);
    let handle = thread::spawn(move || {
        let _guard = shared_clone.inner.lock().unwrap();
        panic!("intentional panic");
    });

    // `join` returns `Err` only if the helper panicked. Asserting it ensures
    // the mutex is poisoned before `stats()` is exercised.
    assert!(
        handle.join().is_err(),
        "helper thread must panic so the pool mutex is poisoned"
    );

    // stats() should recover and show the correct waiter count.
    let stats = pool.stats();
    assert_eq!(stats.pending_requests, 3);
}
1798}