Skip to main content

sqlmodel_pool/
lib.rs

1//! Connection pooling for SQLModel Rust using asupersync.
2//!
3//! `sqlmodel-pool` is the **connection lifecycle layer**. It provides a generic,
4//! budget-aware pool that integrates with structured concurrency and can wrap any
5//! `Connection` implementation.
6//!
7//! # Role In The Architecture
8//!
9//! - **Shared connection management**: reuse connections across tasks safely.
10//! - **Budget-aware acquisition**: respects `Cx` timeouts and cancellation.
11//! - **Health checks**: validates connections before handing them out.
12//! - **Metrics**: exposes stats for pool sizing and tuning.
13//!
14//! # Features
15//!
16//! - Generic over any `Connection` type
17//! - RAII-based connection return (connections returned on drop)
18//! - Timeout support via `Cx` context
19//! - Connection health validation
20//! - Idle and max lifetime tracking
21//! - Pool statistics
22//!
23//! # Example
24//!
25//! ```rust,ignore
26//! use sqlmodel_pool::{Pool, PoolConfig};
27//!
28//! // Create a pool
29//! let config = PoolConfig::new(10)
30//!     .min_connections(2)
31//!     .acquire_timeout(5000);
32//!
33//! let pool = Pool::new(config, || async {
34//!     // Factory function to create new connections
35//!     PgConnection::connect(&cx, &pg_config).await
36//! });
37//!
38//! // Acquire a connection
39//! let conn = pool.acquire(&cx).await?;
40//!
41//! // Use the connection (automatically returned to pool on drop)
42//! conn.query(&cx, "SELECT 1", &[]).await?;
43//! ```
44
45pub mod replica;
46pub use replica::{ReplicaPool, ReplicaStrategy};
47
48pub mod sharding;
49pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
50
51use std::collections::VecDeque;
52use std::future::Future;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Condvar, Mutex, Weak};
55use std::time::{Duration, Instant};
56
57use asupersync::{CancelReason, Cx, Outcome};
58use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
59use sqlmodel_core::{Connection, Error};
60
61/// Connection pool configuration.
62#[derive(Debug, Clone)]
63pub struct PoolConfig {
64    /// Minimum number of connections to maintain
65    pub min_connections: usize,
66    /// Maximum number of connections allowed
67    pub max_connections: usize,
68    /// Connection idle timeout in milliseconds
69    pub idle_timeout_ms: u64,
70    /// Maximum time to wait for a connection in milliseconds
71    pub acquire_timeout_ms: u64,
72    /// Maximum lifetime of a connection in milliseconds
73    pub max_lifetime_ms: u64,
74    /// Test connections before giving them out
75    pub test_on_checkout: bool,
76    /// Test connections when returning them to the pool
77    pub test_on_return: bool,
78}
79
80impl Default for PoolConfig {
81    fn default() -> Self {
82        Self {
83            min_connections: 1,
84            max_connections: 10,
85            idle_timeout_ms: 600_000,   // 10 minutes
86            acquire_timeout_ms: 30_000, // 30 seconds
87            max_lifetime_ms: 1_800_000, // 30 minutes
88            test_on_checkout: true,
89            test_on_return: false,
90        }
91    }
92}
93
94impl PoolConfig {
95    /// Create a new pool configuration with the given max connections.
96    #[must_use]
97    pub fn new(max_connections: usize) -> Self {
98        Self {
99            max_connections,
100            ..Default::default()
101        }
102    }
103
104    /// Set minimum connections.
105    #[must_use]
106    pub fn min_connections(mut self, n: usize) -> Self {
107        self.min_connections = n;
108        self
109    }
110
111    /// Set idle timeout in milliseconds.
112    #[must_use]
113    pub fn idle_timeout(mut self, ms: u64) -> Self {
114        self.idle_timeout_ms = ms;
115        self
116    }
117
118    /// Set acquire timeout in milliseconds.
119    #[must_use]
120    pub fn acquire_timeout(mut self, ms: u64) -> Self {
121        self.acquire_timeout_ms = ms;
122        self
123    }
124
125    /// Set max lifetime in milliseconds.
126    #[must_use]
127    pub fn max_lifetime(mut self, ms: u64) -> Self {
128        self.max_lifetime_ms = ms;
129        self
130    }
131
132    /// Enable/disable test on checkout.
133    #[must_use]
134    pub fn test_on_checkout(mut self, enabled: bool) -> Self {
135        self.test_on_checkout = enabled;
136        self
137    }
138
139    /// Enable/disable test on return.
140    #[must_use]
141    pub fn test_on_return(mut self, enabled: bool) -> Self {
142        self.test_on_return = enabled;
143        self
144    }
145}
146
147/// Pool statistics.
148#[derive(Debug, Clone, Default)]
149pub struct PoolStats {
150    /// Total number of connections (active + idle)
151    pub total_connections: usize,
152    /// Number of idle connections
153    pub idle_connections: usize,
154    /// Number of active connections (currently in use)
155    pub active_connections: usize,
156    /// Number of pending acquire requests
157    pub pending_requests: usize,
158    /// Total number of connections created
159    pub connections_created: u64,
160    /// Total number of connections closed
161    pub connections_closed: u64,
162    /// Total number of successful acquires
163    pub acquires: u64,
164    /// Total number of acquire timeouts
165    pub timeouts: u64,
166}
167
168/// Metadata about a pooled connection.
169#[derive(Debug)]
170struct ConnectionMeta<C> {
171    /// The actual connection
172    conn: C,
173    /// When this connection was created
174    created_at: Instant,
175    /// When this connection was last used
176    last_used: Instant,
177}
178
179impl<C> ConnectionMeta<C> {
180    fn new(conn: C) -> Self {
181        let now = Instant::now();
182        Self {
183            conn,
184            created_at: now,
185            last_used: now,
186        }
187    }
188
189    fn touch(&mut self) {
190        self.last_used = Instant::now();
191    }
192
193    fn age(&self) -> Duration {
194        self.created_at.elapsed()
195    }
196
197    fn idle_time(&self) -> Duration {
198        self.last_used.elapsed()
199    }
200}
201
202/// Internal pool state shared between pool and connections.
203struct PoolInner<C> {
204    /// Pool configuration
205    config: PoolConfig,
206    /// Idle connections available for use
207    idle: VecDeque<ConnectionMeta<C>>,
208    /// Number of connections currently checked out
209    active_count: usize,
210    /// Total number of connections (idle + active)
211    total_count: usize,
212    /// Number of waiters in the queue
213    waiter_count: usize,
214    /// Whether the pool has been closed
215    closed: bool,
216}
217
218impl<C> PoolInner<C> {
219    fn new(config: PoolConfig) -> Self {
220        Self {
221            config,
222            idle: VecDeque::new(),
223            active_count: 0,
224            total_count: 0,
225            waiter_count: 0,
226            closed: false,
227        }
228    }
229
230    fn can_create_new(&self) -> bool {
231        !self.closed && self.total_count < self.config.max_connections
232    }
233
234    fn stats(&self) -> PoolStats {
235        PoolStats {
236            total_connections: self.total_count,
237            idle_connections: self.idle.len(),
238            active_connections: self.active_count,
239            pending_requests: self.waiter_count,
240            ..Default::default()
241        }
242    }
243}
244
245/// Shared state wrapper with condition variable for notification.
246struct PoolShared<C> {
247    /// Protected pool state
248    inner: Mutex<PoolInner<C>>,
249    /// Notifies waiters when connections become available
250    conn_available: Condvar,
251    /// Statistics counters (atomic for lock-free reads)
252    connections_created: AtomicU64,
253    connections_closed: AtomicU64,
254    acquires: AtomicU64,
255    timeouts: AtomicU64,
256}
257
258impl<C> PoolShared<C> {
259    fn new(config: PoolConfig) -> Self {
260        Self {
261            inner: Mutex::new(PoolInner::new(config)),
262            conn_available: Condvar::new(),
263            connections_created: AtomicU64::new(0),
264            connections_closed: AtomicU64::new(0),
265            acquires: AtomicU64::new(0),
266            timeouts: AtomicU64::new(0),
267        }
268    }
269
270    /// Lock the inner mutex, recovering from poisoning for read-only access.
271    ///
272    /// A poisoned mutex occurs when a thread panicked while holding the lock.
273    /// The data inside is still valid for reading, so we recover by logging
274    /// and using `into_inner()` to get the guard.
275    ///
276    /// This should only be used for read-only operations where the data is
277    /// always valid regardless of whether a previous operation completed.
278    fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
279        self.inner.lock().unwrap_or_else(|poisoned| {
280            tracing::error!(
281                "Pool mutex poisoned; recovering for read-only access. \
282                 A thread panicked while holding the lock."
283            );
284            poisoned.into_inner()
285        })
286    }
287
288    /// Lock the inner mutex, returning an error if poisoned.
289    ///
290    /// Use this for mutation operations where the pool state may be inconsistent
291    /// after a panic. Unlike `lock_or_recover()`, this propagates the error
292    /// to the caller.
293    #[allow(clippy::result_large_err)] // Error type is large by design for rich diagnostics
294    fn lock_or_error(
295        &self,
296        operation: &'static str,
297    ) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
298        self.inner
299            .lock()
300            .map_err(|_| Error::Pool(PoolError::poisoned(operation)))
301    }
302}
303
304/// A connection pool for database connections.
305///
306/// The pool manages a collection of connections, reusing them across
307/// requests to avoid the overhead of establishing new connections.
308///
309/// # Type Parameters
310///
311/// - `C`: The connection type, must implement `Connection`
312///
313/// # Cancellation
314///
315/// Pool operations respect cancellation via the `Cx` context:
316/// - `acquire` will return early if cancellation is requested
317/// - Connections are properly cleaned up on cancellation
318pub struct Pool<C: Connection> {
319    shared: Arc<PoolShared<C>>,
320}
321
322impl<C: Connection> Pool<C> {
323    /// Create a new connection pool with the given configuration.
324    #[must_use]
325    pub fn new(config: PoolConfig) -> Self {
326        Self {
327            shared: Arc::new(PoolShared::new(config)),
328        }
329    }
330
331    /// Get the pool configuration.
332    #[must_use]
333    pub fn config(&self) -> PoolConfig {
334        let inner = self.shared.lock_or_recover();
335        inner.config.clone()
336    }
337
338    /// Get the current pool statistics.
339    #[must_use]
340    pub fn stats(&self) -> PoolStats {
341        let inner = self.shared.lock_or_recover();
342        let mut stats = inner.stats();
343        stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
344        stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
345        stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
346        stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
347        stats
348    }
349
350    /// Check if the pool is at capacity.
351    #[must_use]
352    pub fn at_capacity(&self) -> bool {
353        let inner = self.shared.lock_or_recover();
354        inner.total_count >= inner.config.max_connections
355    }
356
357    /// Check if the pool has been closed.
358    #[must_use]
359    pub fn is_closed(&self) -> bool {
360        let inner = self.shared.lock_or_recover();
361        inner.closed
362    }
363
364    /// Acquire a connection from the pool.
365    ///
366    /// This method will:
367    /// 1. Return an idle connection if one is available
368    /// 2. Create a new connection if below capacity
369    /// 3. Wait for a connection to become available (up to timeout)
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if:
374    /// - The pool is closed
375    /// - The acquire timeout is exceeded
376    /// - Cancellation is requested via the `Cx` context
377    /// - Connection validation fails (if `test_on_checkout` is enabled)
378    pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
379    where
380        F: Fn() -> Fut,
381        Fut: Future<Output = Outcome<C, Error>>,
382    {
383        let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
384        let test_on_checkout = self.config().test_on_checkout;
385        let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
386        let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
387
388        loop {
389            // Check cancellation
390            if cx.is_cancel_requested() {
391                return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
392            }
393
394            // Check timeout
395            if Instant::now() >= deadline {
396                self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
397                return Outcome::Err(Error::Pool(PoolError {
398                    kind: PoolErrorKind::Timeout,
399                    message: "acquire timeout: no connections available".to_string(),
400                    source: None,
401                }));
402            }
403
404            // Try to get an idle connection or determine if we can create new
405            let action = {
406                let mut inner = match self.shared.lock_or_error("acquire") {
407                    Ok(guard) => guard,
408                    Err(e) => return Outcome::Err(e),
409                };
410
411                if inner.closed {
412                    AcquireAction::PoolClosed
413                } else {
414                    // Try to get an idle connection
415                    let mut found_conn = None;
416                    while let Some(mut meta) = inner.idle.pop_front() {
417                        // Check if connection is too old
418                        if meta.age() > max_lifetime {
419                            inner.total_count -= 1;
420                            self.shared
421                                .connections_closed
422                                .fetch_add(1, Ordering::Relaxed);
423                            continue;
424                        }
425
426                        // Check if connection has been idle too long
427                        if meta.idle_time() > idle_timeout {
428                            inner.total_count -= 1;
429                            self.shared
430                                .connections_closed
431                                .fetch_add(1, Ordering::Relaxed);
432                            continue;
433                        }
434
435                        // Found a valid connection
436                        meta.touch();
437                        inner.active_count += 1;
438                        found_conn = Some(meta);
439                        break;
440                    }
441
442                    if let Some(meta) = found_conn {
443                        AcquireAction::ValidateExisting(meta)
444                    } else if inner.can_create_new() {
445                        // No idle connections, can we create new?
446                        inner.total_count += 1;
447                        inner.active_count += 1;
448                        AcquireAction::CreateNew
449                    } else {
450                        // Must wait
451                        inner.waiter_count += 1;
452                        AcquireAction::Wait
453                    }
454                }
455            };
456
457            match action {
458                AcquireAction::PoolClosed => {
459                    return Outcome::Err(Error::Pool(PoolError {
460                        kind: PoolErrorKind::Closed,
461                        message: "pool has been closed".to_string(),
462                        source: None,
463                    }));
464                }
465                AcquireAction::ValidateExisting(meta) => {
466                    // Validate and wrap the connection (lock is released)
467                    return self.validate_and_wrap(cx, meta, test_on_checkout).await;
468                }
469                AcquireAction::CreateNew => {
470                    // Create new connection outside of lock
471                    match factory().await {
472                        Outcome::Ok(conn) => {
473                            self.shared
474                                .connections_created
475                                .fetch_add(1, Ordering::Relaxed);
476                            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
477                            let meta = ConnectionMeta::new(conn);
478                            return Outcome::Ok(PooledConnection::new(
479                                meta,
480                                Arc::downgrade(&self.shared),
481                            ));
482                        }
483                        Outcome::Err(e) => {
484                            // Failed to create, decrement counts
485                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
486                                inner.total_count -= 1;
487                                inner.active_count -= 1;
488                            }
489                            // Even if we can't decrement counts, still return the original error
490                            return Outcome::Err(e);
491                        }
492                        Outcome::Cancelled(reason) => {
493                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
494                                inner.total_count -= 1;
495                                inner.active_count -= 1;
496                            }
497                            return Outcome::Cancelled(reason);
498                        }
499                        Outcome::Panicked(info) => {
500                            if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
501                                inner.total_count -= 1;
502                                inner.active_count -= 1;
503                            }
504                            return Outcome::Panicked(info);
505                        }
506                    }
507                }
508                AcquireAction::Wait => {
509                    // Wait for a connection to become available
510                    let remaining = deadline.saturating_duration_since(Instant::now());
511                    if remaining.is_zero() {
512                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
513                            inner.waiter_count -= 1;
514                        }
515                        self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
516                        return Outcome::Err(Error::Pool(PoolError {
517                            kind: PoolErrorKind::Timeout,
518                            message: "acquire timeout: no connections available".to_string(),
519                            source: None,
520                        }));
521                    }
522
523                    // Wait with timeout (use shorter interval for cancellation checks)
524                    let wait_time = remaining.min(Duration::from_millis(100));
525                    {
526                        let inner = match self.shared.lock_or_error("acquire_wait") {
527                            Ok(guard) => guard,
528                            Err(e) => return Outcome::Err(e),
529                        };
530                        // wait_timeout can also return a poisoned error, handle it
531                        let _ = self
532                            .shared
533                            .conn_available
534                            .wait_timeout(inner, wait_time)
535                            .map_err(|_| {
536                                tracing::error!("Pool mutex poisoned during wait_timeout");
537                            });
538                    }
539
540                    // Decrement waiter count after waking
541                    {
542                        if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
543                            inner.waiter_count = inner.waiter_count.saturating_sub(1);
544                        }
545                    }
546
547                    // Loop back to try again
548                }
549            }
550        }
551    }
552
553    /// Validate a connection and wrap it in a PooledConnection.
554    async fn validate_and_wrap(
555        &self,
556        cx: &Cx,
557        meta: ConnectionMeta<C>,
558        test_on_checkout: bool,
559    ) -> Outcome<PooledConnection<C>, Error> {
560        if test_on_checkout {
561            // Validate the connection
562            match meta.conn.ping(cx).await {
563                Outcome::Ok(()) => {
564                    self.shared.acquires.fetch_add(1, Ordering::Relaxed);
565                    Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
566                }
567                Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
568                    // Connection is invalid, decrement counts and try again
569                    {
570                        if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
571                            inner.total_count -= 1;
572                            inner.active_count -= 1;
573                        }
574                    }
575                    self.shared
576                        .connections_closed
577                        .fetch_add(1, Ordering::Relaxed);
578                    // Return error - caller should retry
579                    Outcome::Err(Error::Connection(ConnectionError {
580                        kind: ConnectionErrorKind::Disconnected,
581                        message: "connection validation failed".to_string(),
582                        source: None,
583                    }))
584                }
585            }
586        } else {
587            self.shared.acquires.fetch_add(1, Ordering::Relaxed);
588            Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
589        }
590    }
591
592    /// Close the pool, preventing new connections and closing all idle connections.
593    ///
594    /// If the pool mutex is poisoned, this logs an error but still wakes waiters.
595    pub fn clear_idle(&self) {
596        if let Ok(mut inner) = self.shared.inner.lock() {
597            let idle_count = inner.idle.len();
598            inner.idle.clear();
599            inner.total_count -= idle_count;
600            self.shared
601                .connections_closed
602                .fetch_add(idle_count as u64, Ordering::Relaxed);
603        }
604    }
605
606    pub fn close(&self) {
607        match self.shared.inner.lock() {
608            Ok(mut inner) => {
609                inner.closed = true;
610
611                // Close all idle connections
612                let idle_count = inner.idle.len();
613                inner.idle.clear();
614                inner.total_count -= idle_count;
615                self.shared
616                    .connections_closed
617                    .fetch_add(idle_count as u64, Ordering::Relaxed);
618                drop(inner);
619            }
620            Err(poisoned) => {
621                // Recover from poisoning - we still want to mark the pool as closed
622                // and wake waiters even if counts may be inconsistent.
623                tracing::error!(
624                    "Pool mutex poisoned during close; attempting recovery. \
625                     Pool state may be inconsistent."
626                );
627                let mut inner = poisoned.into_inner();
628                inner.closed = true;
629                let idle_count = inner.idle.len();
630                inner.idle.clear();
631                inner.total_count -= idle_count;
632                self.shared
633                    .connections_closed
634                    .fetch_add(idle_count as u64, Ordering::Relaxed);
635            }
636        }
637
638        // Wake all waiters so they see the pool is closed
639        self.shared.conn_available.notify_all();
640    }
641
642    /// Get the number of idle connections.
643    #[must_use]
644    pub fn idle_count(&self) -> usize {
645        let inner = self.shared.lock_or_recover();
646        inner.idle.len()
647    }
648
649    /// Get the number of active connections.
650    #[must_use]
651    pub fn active_count(&self) -> usize {
652        let inner = self.shared.lock_or_recover();
653        inner.active_count
654    }
655
656    /// Get the total number of connections.
657    #[must_use]
658    pub fn total_count(&self) -> usize {
659        let inner = self.shared.lock_or_recover();
660        inner.total_count
661    }
662}
663
664/// Action to take when acquiring a connection.
665enum AcquireAction<C> {
666    /// Pool is closed
667    PoolClosed,
668    /// Found an existing connection to validate
669    ValidateExisting(ConnectionMeta<C>),
670    /// Create a new connection
671    CreateNew,
672    /// Wait for a connection to become available
673    Wait,
674}
675
676/// A connection borrowed from the pool.
677///
678/// When dropped, the connection is automatically returned to the pool.
679/// The connection can be used via `Deref` and `DerefMut`.
680pub struct PooledConnection<C: Connection> {
681    /// The connection metadata (Some while held, None after return)
682    meta: Option<ConnectionMeta<C>>,
683    /// Weak reference to pool for returning
684    pool: Weak<PoolShared<C>>,
685}
686
687impl<C: Connection> PooledConnection<C> {
688    fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
689        Self {
690            meta: Some(meta),
691            pool,
692        }
693    }
694
695    /// Detach this connection from the pool.
696    ///
697    /// The connection will not be returned to the pool when dropped.
698    /// This is useful when you need to close a connection explicitly.
699    pub fn detach(mut self) -> C {
700        if let Some(pool) = self.pool.upgrade() {
701            // Try to update pool counters, but don't panic if mutex is poisoned.
702            // The connection is being detached anyway, so counts being off is acceptable.
703            match pool.inner.lock() {
704                Ok(mut inner) => {
705                    inner.total_count -= 1;
706                    inner.active_count -= 1;
707                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
708                }
709                Err(_poisoned) => {
710                    tracing::error!(
711                        "Pool mutex poisoned during detach; pool counters will be inconsistent"
712                    );
713                    // Still increment the atomic counter for tracking
714                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
715                }
716            }
717        }
718        self.meta.take().expect("connection already detached").conn
719    }
720
721    /// Get the age of this connection (time since creation).
722    #[must_use]
723    pub fn age(&self) -> Duration {
724        self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
725    }
726
727    /// Get the idle time of this connection (time since last use).
728    #[must_use]
729    pub fn idle_time(&self) -> Duration {
730        self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
731    }
732}
733
734impl<C: Connection> std::ops::Deref for PooledConnection<C> {
735    type Target = C;
736
737    fn deref(&self) -> &Self::Target {
738        &self
739            .meta
740            .as_ref()
741            .expect("connection already returned to pool")
742            .conn
743    }
744}
745
746impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
747    fn deref_mut(&mut self) -> &mut Self::Target {
748        &mut self
749            .meta
750            .as_mut()
751            .expect("connection already returned to pool")
752            .conn
753    }
754}
755
756impl<C: Connection> Drop for PooledConnection<C> {
757    fn drop(&mut self) {
758        if let Some(mut meta) = self.meta.take() {
759            meta.touch(); // Update last used time
760            if let Some(pool) = self.pool.upgrade() {
761                // Return to pool - but if mutex is poisoned, we must not panic in Drop.
762                // Instead, log the error and leak the connection.
763                let mut inner = match pool.inner.lock() {
764                    Ok(guard) => guard,
765                    Err(_poisoned) => {
766                        tracing::error!(
767                            "Pool mutex poisoned during connection return; \
768                             connection will be leaked. A thread panicked while holding the lock."
769                        );
770                        // Connection is leaked - we can't safely return it or update counts.
771                        // The pool is likely in a bad state anyway.
772                        return;
773                    }
774                };
775
776                if inner.closed {
777                    inner.total_count -= 1;
778                    inner.active_count -= 1;
779                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
780                    return;
781                }
782
783                // Check max lifetime
784                let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
785                if meta.age() > max_lifetime {
786                    inner.total_count -= 1;
787                    inner.active_count -= 1;
788                    pool.connections_closed.fetch_add(1, Ordering::Relaxed);
789                    return;
790                }
791
792                inner.active_count -= 1;
793                inner.idle.push_back(meta);
794
795                drop(inner);
796                pool.conn_available.notify_one();
797            }
798            // If pool is gone, connection is just dropped
799        }
800    }
801}
802
803impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
804    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
805        f.debug_struct("PooledConnection")
806            .field("conn", &self.meta.as_ref().map(|m| &m.conn))
807            .field("age", &self.age())
808            .field("idle_time", &self.idle_time())
809            .finish_non_exhaustive()
810    }
811}
812
813#[cfg(test)]
814mod tests {
815    use super::*;
816    use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
817    use sqlmodel_core::{Row, Value};
818    use std::sync::atomic::AtomicBool;
819
820    /// A mock connection for testing pool behavior.
821    #[derive(Debug)]
822    struct MockConnection {
823        id: u32,
824        ping_should_fail: Arc<AtomicBool>,
825    }
826
827    impl MockConnection {
828        fn new(id: u32) -> Self {
829            Self {
830                id,
831                ping_should_fail: Arc::new(AtomicBool::new(false)),
832            }
833        }
834
835        #[allow(dead_code)]
836        fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
837            Self {
838                id,
839                ping_should_fail: should_fail,
840            }
841        }
842    }
843
844    /// Mock transaction for MockConnection.
845    struct MockTx;
846
847    impl TransactionOps for MockTx {
848        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
849            Outcome::Ok(vec![])
850        }
851
852        async fn query_one(
853            &self,
854            _cx: &Cx,
855            _sql: &str,
856            _params: &[Value],
857        ) -> Outcome<Option<Row>, Error> {
858            Outcome::Ok(None)
859        }
860
861        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
862            Outcome::Ok(0)
863        }
864
865        async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
866            Outcome::Ok(())
867        }
868
869        async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
870            Outcome::Ok(())
871        }
872
873        async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
874            Outcome::Ok(())
875        }
876
877        async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
878            Outcome::Ok(())
879        }
880
881        async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
882            Outcome::Ok(())
883        }
884    }
885
886    impl Connection for MockConnection {
887        type Tx<'conn> = MockTx;
888
889        async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
890            Outcome::Ok(vec![])
891        }
892
893        async fn query_one(
894            &self,
895            _cx: &Cx,
896            _sql: &str,
897            _params: &[Value],
898        ) -> Outcome<Option<Row>, Error> {
899            Outcome::Ok(None)
900        }
901
902        async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
903            Outcome::Ok(0)
904        }
905
906        async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
907            Outcome::Ok(0)
908        }
909
910        async fn batch(
911            &self,
912            _cx: &Cx,
913            _statements: &[(String, Vec<Value>)],
914        ) -> Outcome<Vec<u64>, Error> {
915            Outcome::Ok(vec![])
916        }
917
918        async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
919            Outcome::Ok(MockTx)
920        }
921
922        async fn begin_with(
923            &self,
924            _cx: &Cx,
925            _isolation: IsolationLevel,
926        ) -> Outcome<Self::Tx<'_>, Error> {
927            Outcome::Ok(MockTx)
928        }
929
930        async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
931            Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
932        }
933
934        async fn query_prepared(
935            &self,
936            _cx: &Cx,
937            _stmt: &PreparedStatement,
938            _params: &[Value],
939        ) -> Outcome<Vec<Row>, Error> {
940            Outcome::Ok(vec![])
941        }
942
943        async fn execute_prepared(
944            &self,
945            _cx: &Cx,
946            _stmt: &PreparedStatement,
947            _params: &[Value],
948        ) -> Outcome<u64, Error> {
949            Outcome::Ok(0)
950        }
951
952        async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
953            if self.ping_should_fail.load(Ordering::Relaxed) {
954                Outcome::Err(Error::Connection(ConnectionError {
955                    kind: ConnectionErrorKind::Disconnected,
956                    message: "mock ping failed".to_string(),
957                    source: None,
958                }))
959            } else {
960                Outcome::Ok(())
961            }
962        }
963
964        async fn close(self, _cx: &Cx) -> Result<(), Error> {
965            Ok(())
966        }
967    }
968
969    #[test]
970    fn test_config_default() {
971        let config = PoolConfig::default();
972        assert_eq!(config.min_connections, 1);
973        assert_eq!(config.max_connections, 10);
974        assert_eq!(config.idle_timeout_ms, 600_000);
975        assert_eq!(config.acquire_timeout_ms, 30_000);
976        assert_eq!(config.max_lifetime_ms, 1_800_000);
977        assert!(config.test_on_checkout);
978        assert!(!config.test_on_return);
979    }
980
981    #[test]
982    fn test_config_builder() {
983        let config = PoolConfig::new(20)
984            .min_connections(5)
985            .idle_timeout(60_000)
986            .acquire_timeout(5_000)
987            .max_lifetime(300_000)
988            .test_on_checkout(false)
989            .test_on_return(true);
990
991        assert_eq!(config.min_connections, 5);
992        assert_eq!(config.max_connections, 20);
993        assert_eq!(config.idle_timeout_ms, 60_000);
994        assert_eq!(config.acquire_timeout_ms, 5_000);
995        assert_eq!(config.max_lifetime_ms, 300_000);
996        assert!(!config.test_on_checkout);
997        assert!(config.test_on_return);
998    }
999
1000    #[test]
1001    fn test_config_clone() {
1002        let config = PoolConfig::new(15).min_connections(3);
1003        let cloned = config.clone();
1004        assert_eq!(config.max_connections, cloned.max_connections);
1005        assert_eq!(config.min_connections, cloned.min_connections);
1006    }
1007
1008    #[test]
1009    fn test_stats_default() {
1010        let stats = PoolStats::default();
1011        assert_eq!(stats.total_connections, 0);
1012        assert_eq!(stats.idle_connections, 0);
1013        assert_eq!(stats.active_connections, 0);
1014        assert_eq!(stats.pending_requests, 0);
1015        assert_eq!(stats.connections_created, 0);
1016        assert_eq!(stats.connections_closed, 0);
1017        assert_eq!(stats.acquires, 0);
1018        assert_eq!(stats.timeouts, 0);
1019    }
1020
1021    #[test]
1022    fn test_stats_clone() {
1023        let stats = PoolStats {
1024            total_connections: 5,
1025            acquires: 100,
1026            ..Default::default()
1027        };
1028        let cloned = stats.clone();
1029        assert_eq!(stats.total_connections, cloned.total_connections);
1030        assert_eq!(stats.acquires, cloned.acquires);
1031    }
1032
1033    #[test]
1034    fn test_connection_meta_timing() {
1035        use std::thread;
1036
1037        // Create a dummy type for testing
1038        struct DummyConn;
1039
1040        let meta = ConnectionMeta::new(DummyConn);
1041        let initial_age = meta.age();
1042
1043        // Small sleep to ensure time passes
1044        thread::sleep(Duration::from_millis(10));
1045
1046        // Age should have increased
1047        assert!(meta.age() > initial_age);
1048        assert!(meta.idle_time() > Duration::ZERO);
1049    }
1050
1051    #[test]
1052    fn test_connection_meta_touch() {
1053        use std::thread;
1054
1055        struct DummyConn;
1056
1057        let mut meta = ConnectionMeta::new(DummyConn);
1058
1059        // Small sleep to build up some idle time
1060        thread::sleep(Duration::from_millis(10));
1061        let idle_before_touch = meta.idle_time();
1062        assert!(idle_before_touch > Duration::ZERO);
1063
1064        // Touch should reset idle time
1065        meta.touch();
1066        let idle_after_touch = meta.idle_time();
1067
1068        // After touch, idle time should be very small (less than before)
1069        assert!(idle_after_touch < idle_before_touch);
1070    }
1071
1072    #[test]
1073    fn test_pool_new() {
1074        let config = PoolConfig::new(5);
1075        let pool: Pool<MockConnection> = Pool::new(config);
1076
1077        // New pool should be empty
1078        assert_eq!(pool.idle_count(), 0);
1079        assert_eq!(pool.active_count(), 0);
1080        assert_eq!(pool.total_count(), 0);
1081        assert!(!pool.is_closed());
1082        assert!(!pool.at_capacity());
1083    }
1084
1085    #[test]
1086    fn test_pool_config() {
1087        let config = PoolConfig::new(7).min_connections(2);
1088        let pool: Pool<MockConnection> = Pool::new(config);
1089
1090        let retrieved_config = pool.config();
1091        assert_eq!(retrieved_config.max_connections, 7);
1092        assert_eq!(retrieved_config.min_connections, 2);
1093    }
1094
1095    #[test]
1096    fn test_pool_stats_initial() {
1097        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1098
1099        let stats = pool.stats();
1100        assert_eq!(stats.total_connections, 0);
1101        assert_eq!(stats.idle_connections, 0);
1102        assert_eq!(stats.active_connections, 0);
1103        assert_eq!(stats.pending_requests, 0);
1104        assert_eq!(stats.connections_created, 0);
1105        assert_eq!(stats.connections_closed, 0);
1106        assert_eq!(stats.acquires, 0);
1107        assert_eq!(stats.timeouts, 0);
1108    }
1109
1110    #[test]
1111    fn test_pool_close() {
1112        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1113
1114        assert!(!pool.is_closed());
1115        pool.close();
1116        assert!(pool.is_closed());
1117    }
1118
1119    #[test]
1120    fn test_pool_inner_can_create_new() {
1121        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
1122
1123        // Initially can create new
1124        assert!(inner.can_create_new());
1125
1126        // At capacity
1127        inner.total_count = 3;
1128        assert!(!inner.can_create_new());
1129
1130        // Below capacity again
1131        inner.total_count = 2;
1132        assert!(inner.can_create_new());
1133
1134        // Closed pool
1135        inner.closed = true;
1136        assert!(!inner.can_create_new());
1137    }
1138
1139    #[test]
1140    fn test_pool_inner_stats() {
1141        let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
1142
1143        inner.total_count = 5;
1144        inner.active_count = 3;
1145        inner.waiter_count = 2;
1146        inner
1147            .idle
1148            .push_back(ConnectionMeta::new(MockConnection::new(1)));
1149        inner
1150            .idle
1151            .push_back(ConnectionMeta::new(MockConnection::new(2)));
1152
1153        let stats = inner.stats();
1154        assert_eq!(stats.total_connections, 5);
1155        assert_eq!(stats.idle_connections, 2);
1156        assert_eq!(stats.active_connections, 3);
1157        assert_eq!(stats.pending_requests, 2);
1158    }
1159
1160    #[test]
1161    fn test_pooled_connection_age_and_idle_time() {
1162        use std::thread;
1163
1164        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1165
1166        // Properly initialize pool state as if acquire happened
1167        {
1168            let mut inner = pool.shared.inner.lock().unwrap();
1169            inner.total_count = 1;
1170            inner.active_count = 1;
1171        }
1172
1173        let meta = ConnectionMeta::new(MockConnection::new(1));
1174        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1175
1176        // Should have some small positive age
1177        assert!(pooled.age() >= Duration::ZERO);
1178
1179        thread::sleep(Duration::from_millis(5));
1180        assert!(pooled.age() > Duration::ZERO);
1181    }
1182
1183    #[test]
1184    fn test_pooled_connection_detach() {
1185        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1186
1187        // Manually add a connection to simulate acquire
1188        {
1189            let mut inner = pool.shared.inner.lock().unwrap();
1190            inner.total_count = 1;
1191            inner.active_count = 1;
1192        }
1193
1194        let meta = ConnectionMeta::new(MockConnection::new(42));
1195        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1196
1197        // Verify counts before detach
1198        assert_eq!(pool.total_count(), 1);
1199        assert_eq!(pool.active_count(), 1);
1200
1201        // Detach returns the connection
1202        let conn = pooled.detach();
1203        assert_eq!(conn.id, 42);
1204
1205        // After detach, counts should be decremented
1206        assert_eq!(pool.total_count(), 0);
1207        assert_eq!(pool.active_count(), 0);
1208
1209        // connections_closed should be incremented
1210        let stats = pool.stats();
1211        assert_eq!(stats.connections_closed, 1);
1212    }
1213
1214    #[test]
1215    fn test_pooled_connection_drop_returns_to_pool() {
1216        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1217
1218        // Manually set up pool state as if we acquired a connection
1219        {
1220            let mut inner = pool.shared.inner.lock().unwrap();
1221            inner.total_count = 1;
1222            inner.active_count = 1;
1223        }
1224
1225        let meta = ConnectionMeta::new(MockConnection::new(1));
1226        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1227
1228        // While held, active=1, idle=0
1229        assert_eq!(pool.active_count(), 1);
1230        assert_eq!(pool.idle_count(), 0);
1231
1232        // Drop the connection
1233        drop(pooled);
1234
1235        // After drop, active=0, idle=1 (returned to pool)
1236        assert_eq!(pool.active_count(), 0);
1237        assert_eq!(pool.idle_count(), 1);
1238        assert_eq!(pool.total_count(), 1); // Total unchanged
1239    }
1240
1241    #[test]
1242    fn test_pooled_connection_drop_when_pool_closed() {
1243        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1244
1245        // Set up pool state
1246        {
1247            let mut inner = pool.shared.inner.lock().unwrap();
1248            inner.total_count = 1;
1249            inner.active_count = 1;
1250        }
1251
1252        let meta = ConnectionMeta::new(MockConnection::new(1));
1253        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1254
1255        // Close the pool while connection is out
1256        pool.close();
1257
1258        // Drop the connection
1259        drop(pooled);
1260
1261        // Connection should not be returned to idle (pool is closed)
1262        assert_eq!(pool.idle_count(), 0);
1263        assert_eq!(pool.active_count(), 0);
1264        assert_eq!(pool.total_count(), 0);
1265
1266        // Connection was closed
1267        assert_eq!(pool.stats().connections_closed, 1);
1268    }
1269
1270    #[test]
1271    fn test_pooled_connection_deref() {
1272        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1273
1274        // Properly initialize pool state as if acquire happened
1275        {
1276            let mut inner = pool.shared.inner.lock().unwrap();
1277            inner.total_count = 1;
1278            inner.active_count = 1;
1279        }
1280
1281        let meta = ConnectionMeta::new(MockConnection::new(99));
1282        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1283
1284        // Deref should give access to the connection's id
1285        assert_eq!(pooled.id, 99);
1286    }
1287
1288    #[test]
1289    fn test_pooled_connection_deref_mut() {
1290        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1291
1292        // Properly initialize pool state as if acquire happened
1293        {
1294            let mut inner = pool.shared.inner.lock().unwrap();
1295            inner.total_count = 1;
1296            inner.active_count = 1;
1297        }
1298
1299        let meta = ConnectionMeta::new(MockConnection::new(1));
1300        let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1301
1302        // DerefMut should allow mutation
1303        pooled.id = 50;
1304        assert_eq!(pooled.id, 50);
1305    }
1306
1307    #[test]
1308    fn test_pooled_connection_debug() {
1309        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1310
1311        // Properly initialize pool state as if acquire happened
1312        {
1313            let mut inner = pool.shared.inner.lock().unwrap();
1314            inner.total_count = 1;
1315            inner.active_count = 1;
1316        }
1317
1318        let meta = ConnectionMeta::new(MockConnection::new(1));
1319        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1320
1321        let debug_str = format!("{:?}", pooled);
1322        assert!(debug_str.contains("PooledConnection"));
1323        assert!(debug_str.contains("age"));
1324    }
1325
1326    #[test]
1327    fn test_pool_at_capacity() {
1328        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
1329
1330        assert!(!pool.at_capacity());
1331
1332        // Simulate connections being created
1333        {
1334            let mut inner = pool.shared.inner.lock().unwrap();
1335            inner.total_count = 1;
1336        }
1337        assert!(!pool.at_capacity());
1338
1339        {
1340            let mut inner = pool.shared.inner.lock().unwrap();
1341            inner.total_count = 2;
1342        }
1343        assert!(pool.at_capacity());
1344    }
1345
1346    #[test]
1347    fn test_acquire_action_enum() {
1348        // Verify the enum variants exist and can be pattern-matched
1349        let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
1350        assert!(matches!(closed, AcquireAction::PoolClosed));
1351
1352        let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
1353        assert!(matches!(create, AcquireAction::CreateNew));
1354
1355        let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
1356        assert!(matches!(wait, AcquireAction::Wait));
1357
1358        let meta = ConnectionMeta::new(MockConnection::new(1));
1359        let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
1360        assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
1361    }
1362
1363    #[test]
1364    fn test_pool_shared_atomic_counters() {
1365        let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
1366
1367        // Initial values should be 0
1368        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
1369        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
1370        assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
1371        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
1372
1373        // Test incrementing
1374        shared.connections_created.fetch_add(1, Ordering::Relaxed);
1375        shared.connections_closed.fetch_add(2, Ordering::Relaxed);
1376        shared.acquires.fetch_add(10, Ordering::Relaxed);
1377        shared.timeouts.fetch_add(3, Ordering::Relaxed);
1378
1379        assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
1380        assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
1381        assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
1382        assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
1383    }
1384
1385    #[test]
1386    fn test_pool_close_clears_idle() {
1387        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1388
1389        // Add some idle connections
1390        {
1391            let mut inner = pool.shared.inner.lock().unwrap();
1392            inner.total_count = 3;
1393            inner
1394                .idle
1395                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1396            inner
1397                .idle
1398                .push_back(ConnectionMeta::new(MockConnection::new(2)));
1399            inner
1400                .idle
1401                .push_back(ConnectionMeta::new(MockConnection::new(3)));
1402        }
1403
1404        assert_eq!(pool.idle_count(), 3);
1405        assert_eq!(pool.total_count(), 3);
1406
1407        pool.close();
1408
1409        // After close, idle connections should be cleared
1410        assert_eq!(pool.idle_count(), 0);
1411        assert_eq!(pool.total_count(), 0);
1412        assert!(pool.is_closed());
1413
1414        // connections_closed should reflect the 3 idle connections
1415        assert_eq!(pool.stats().connections_closed, 3);
1416    }
1417
1418    // ==================== Lock Poisoning Safety Tests ====================
1419    //
1420    // These tests verify that the pool correctly handles mutex poisoning,
1421    // which occurs when a thread panics while holding the lock.
1422    //
1423    // Tier 1 (mutations): Return Error if poisoned
1424    // Tier 2 (read-only): Recover and return valid data
1425    // Tier 3 (Drop): Log error and leak connection (don't panic)
1426
1427    /// Helper to poison a pool's mutex by panicking while holding the lock.
1428    ///
1429    /// Returns the pool with a poisoned mutex.
1430    fn poison_pool_mutex() -> Pool<MockConnection> {
1431        use std::panic;
1432        use std::thread;
1433
1434        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1435
1436        // Set up some valid state before poisoning
1437        {
1438            let mut inner = pool.shared.inner.lock().unwrap();
1439            inner.total_count = 2;
1440            inner.active_count = 1;
1441            inner
1442                .idle
1443                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1444        }
1445
1446        // Spawn a thread that will panic while holding the lock
1447        let shared_clone = Arc::clone(&pool.shared);
1448        let handle = thread::spawn(move || {
1449            let _guard = shared_clone.inner.lock().unwrap();
1450            // Panic while holding the lock - this poisons the mutex
1451            panic!("intentional panic to poison mutex");
1452        });
1453
1454        // Wait for the thread to panic (ignore the panic result)
1455        let _ = handle.join();
1456
1457        // Verify the mutex is now poisoned
1458        assert!(pool.shared.inner.lock().is_err());
1459
1460        pool
1461    }
1462
1463    // -------------------- Tier 2: Read-Only Methods --------------------
1464
1465    #[test]
1466    fn test_config_after_poisoning_returns_valid_data() {
1467        let pool = poison_pool_mutex();
1468
1469        // config() should recover and return the configuration
1470        let config = pool.config();
1471        assert_eq!(config.max_connections, 5);
1472    }
1473
1474    #[test]
1475    fn test_stats_after_poisoning_returns_valid_data() {
1476        let pool = poison_pool_mutex();
1477
1478        // stats() should recover and return valid statistics
1479        let stats = pool.stats();
1480        // The state before poisoning was: total=2, active=1, idle=1
1481        assert_eq!(stats.total_connections, 2);
1482        assert_eq!(stats.active_connections, 1);
1483        assert_eq!(stats.idle_connections, 1);
1484    }
1485
1486    #[test]
1487    fn test_at_capacity_after_poisoning() {
1488        let pool = poison_pool_mutex();
1489
1490        // at_capacity() should recover and return correct value
1491        // Pool has 2 connections, max is 5, so not at capacity
1492        assert!(!pool.at_capacity());
1493    }
1494
1495    #[test]
1496    fn test_is_closed_after_poisoning() {
1497        let pool = poison_pool_mutex();
1498
1499        // is_closed() should recover and return correct value
1500        assert!(!pool.is_closed());
1501    }
1502
1503    #[test]
1504    fn test_idle_count_after_poisoning() {
1505        let pool = poison_pool_mutex();
1506
1507        // idle_count() should recover and return correct value
1508        assert_eq!(pool.idle_count(), 1);
1509    }
1510
1511    #[test]
1512    fn test_active_count_after_poisoning() {
1513        let pool = poison_pool_mutex();
1514
1515        // active_count() should recover and return correct value
1516        assert_eq!(pool.active_count(), 1);
1517    }
1518
1519    #[test]
1520    fn test_total_count_after_poisoning() {
1521        let pool = poison_pool_mutex();
1522
1523        // total_count() should recover and return correct value
1524        assert_eq!(pool.total_count(), 2);
1525    }
1526
1527    // -------------------- Tier 1: Mutation Methods --------------------
1528
1529    #[test]
1530    fn test_lock_or_error_returns_error_when_poisoned() {
1531        use std::thread;
1532
1533        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1534
1535        // Poison the mutex
1536        let shared_clone = Arc::clone(&shared);
1537        let handle = thread::spawn(move || {
1538            let _guard = shared_clone.inner.lock().unwrap();
1539            panic!("intentional panic to poison mutex");
1540        });
1541        let _ = handle.join();
1542
1543        // lock_or_error should return an error
1544        let result = shared.lock_or_error("test_operation");
1545
1546        // Verify it's a pool poisoning error
1547        match result {
1548            Err(Error::Pool(pool_err)) => {
1549                assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
1550                assert!(pool_err.message.contains("poisoned"));
1551            }
1552            Err(other) => panic!("Expected Pool error, got: {:?}", other),
1553            Ok(_) => panic!("Expected error, got Ok"),
1554        }
1555    }
1556
1557    #[test]
1558    fn test_lock_or_recover_succeeds_when_poisoned() {
1559        use std::thread;
1560
1561        let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
1562
1563        // Set up some state
1564        {
1565            let mut inner = shared.inner.lock().unwrap();
1566            inner.total_count = 42;
1567        }
1568
1569        // Poison the mutex
1570        let shared_clone = Arc::clone(&shared);
1571        let handle = thread::spawn(move || {
1572            let _guard = shared_clone.inner.lock().unwrap();
1573            panic!("intentional panic to poison mutex");
1574        });
1575        let _ = handle.join();
1576
1577        // Verify mutex is poisoned
1578        assert!(shared.inner.lock().is_err());
1579
1580        // lock_or_recover should still succeed and provide access to data
1581        let inner = shared.lock_or_recover();
1582        assert_eq!(inner.total_count, 42);
1583    }
1584
1585    #[test]
1586    fn test_close_after_poisoning_recovers_and_closes() {
1587        let pool = poison_pool_mutex();
1588
1589        // close() should recover from poisoning and still close the pool
1590        pool.close();
1591
1592        // After close, the pool should be marked as closed
1593        assert!(pool.is_closed());
1594
1595        // Idle connections should be cleared
1596        assert_eq!(pool.idle_count(), 0);
1597    }
1598
1599    // -------------------- Tier 3: Drop Safety --------------------
1600
1601    #[test]
1602    fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
1603        use std::panic;
1604        use std::thread;
1605
1606        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1607
1608        // Set up a connection that's "checked out"
1609        {
1610            let mut inner = pool.shared.inner.lock().unwrap();
1611            inner.total_count = 1;
1612            inner.active_count = 1;
1613        }
1614
1615        // Create a pooled connection
1616        let meta = ConnectionMeta::new(MockConnection::new(1));
1617        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1618
1619        // Poison the mutex by panicking in another thread
1620        let shared_clone = Arc::clone(&pool.shared);
1621        let handle = thread::spawn(move || {
1622            let _guard = shared_clone.inner.lock().unwrap();
1623            panic!("intentional panic to poison mutex");
1624        });
1625        let _ = handle.join();
1626
1627        // Verify mutex is poisoned
1628        assert!(pool.shared.inner.lock().is_err());
1629
1630        // Drop the pooled connection - should NOT panic
1631        // The connection will be leaked, but that's the correct behavior
1632        let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1633            drop(pooled);
1634        }));
1635
1636        // Dropping should not panic
1637        assert!(
1638            drop_result.is_ok(),
1639            "Dropping PooledConnection after mutex poisoning should not panic"
1640        );
1641    }
1642
1643    #[test]
1644    fn test_detach_after_poisoning_does_not_panic() {
1645        use std::panic;
1646        use std::thread;
1647
1648        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1649
1650        // Set up a connection that's "checked out"
1651        {
1652            let mut inner = pool.shared.inner.lock().unwrap();
1653            inner.total_count = 1;
1654            inner.active_count = 1;
1655        }
1656
1657        // Create a pooled connection
1658        let meta = ConnectionMeta::new(MockConnection::new(42));
1659        let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
1660
1661        // Poison the mutex
1662        let shared_clone = Arc::clone(&pool.shared);
1663        let handle = thread::spawn(move || {
1664            let _guard = shared_clone.inner.lock().unwrap();
1665            panic!("intentional panic to poison mutex");
1666        });
1667        let _ = handle.join();
1668
1669        // Verify mutex is poisoned
1670        assert!(pool.shared.inner.lock().is_err());
1671
1672        // Detach should not panic, even though it can't update counters
1673        let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
1674
1675        assert!(
1676            detach_result.is_ok(),
1677            "detach() after mutex poisoning should not panic"
1678        );
1679
1680        // Should still get the connection back
1681        let conn = detach_result.unwrap();
1682        assert_eq!(conn.id, 42);
1683    }
1684
1685    // -------------------- Integration: Pool Survives Thread Panic --------------------
1686
1687    #[test]
1688    fn test_pool_survives_thread_panic_during_acquire() {
1689        use std::thread;
1690
1691        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1692        let pool_arc = Arc::new(pool);
1693
1694        // Simulate a thread that acquires, does work, then panics
1695        // The connection should be leaked but pool should remain usable for reads
1696        let pool_clone = Arc::clone(&pool_arc);
1697        let handle = thread::spawn(move || {
1698            // Manually simulate having acquired a connection
1699            {
1700                let mut inner = pool_clone.shared.inner.lock().unwrap();
1701                inner.total_count = 1;
1702                inner.active_count = 1;
1703            }
1704
1705            // Panic while holding the pool's internal mutex to simulate a poisoned lock.
1706            // This models an internal panic in pool bookkeeping, not user code.
1707            let _guard = pool_clone.shared.inner.lock().unwrap();
1708            panic!("simulated panic during database operation");
1709        });
1710
1711        // Wait for thread to panic
1712        let _ = handle.join();
1713
1714        // Pool's mutex is now poisoned, but read-only methods should still work
1715        assert_eq!(pool_arc.total_count(), 1);
1716        assert_eq!(pool_arc.config().max_connections, 5);
1717
1718        // Stats should be recoverable
1719        let stats = pool_arc.stats();
1720        assert_eq!(stats.total_connections, 1);
1721    }
1722
1723    #[test]
1724    fn test_pool_close_after_thread_panic() {
1725        use std::thread;
1726
1727        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1728
1729        // Add some idle connections
1730        {
1731            let mut inner = pool.shared.inner.lock().unwrap();
1732            inner.total_count = 2;
1733            inner
1734                .idle
1735                .push_back(ConnectionMeta::new(MockConnection::new(1)));
1736            inner
1737                .idle
1738                .push_back(ConnectionMeta::new(MockConnection::new(2)));
1739        }
1740
1741        // Poison the mutex
1742        let shared_clone = Arc::clone(&pool.shared);
1743        let handle = thread::spawn(move || {
1744            let _guard = shared_clone.inner.lock().unwrap();
1745            panic!("intentional panic");
1746        });
1747        let _ = handle.join();
1748
1749        // close() should recover and still work
1750        pool.close();
1751
1752        // Pool should be closed and idle connections cleared
1753        assert!(pool.is_closed());
1754        assert_eq!(pool.idle_count(), 0);
1755    }
1756
1757    #[test]
1758    fn test_multiple_reads_after_poisoning() {
1759        let pool = poison_pool_mutex();
1760
1761        // Multiple read operations should all succeed
1762        for _ in 0..10 {
1763            let _ = pool.config();
1764            let _ = pool.stats();
1765            let _ = pool.at_capacity();
1766            let _ = pool.is_closed();
1767            let _ = pool.idle_count();
1768            let _ = pool.active_count();
1769            let _ = pool.total_count();
1770        }
1771
1772        // All reads should have recovered successfully
1773        assert_eq!(pool.total_count(), 2);
1774    }
1775
1776    #[test]
1777    fn test_waiters_count_after_poisoning() {
1778        use std::thread;
1779
1780        let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
1781
1782        // Set up waiter count
1783        {
1784            let mut inner = pool.shared.inner.lock().unwrap();
1785            inner.waiter_count = 3;
1786        }
1787
1788        // Poison the mutex
1789        let shared_clone = Arc::clone(&pool.shared);
1790        let handle = thread::spawn(move || {
1791            let _guard = shared_clone.inner.lock().unwrap();
1792            panic!("intentional panic");
1793        });
1794        let _ = handle.join();
1795
1796        // stats() should recover and show correct waiter count
1797        let stats = pool.stats();
1798        assert_eq!(stats.pending_requests, 3);
1799    }
1800}