pub mod replica;
pub use replica::{ReplicaPool, ReplicaStrategy};
pub mod sharding;
pub use sharding::{ModuloShardChooser, QueryHints, ShardChooser, ShardedPool, ShardedPoolStats};
use std::collections::VecDeque;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::{Duration, Instant};
use asupersync::{CancelReason, Cx, Outcome};
use sqlmodel_core::error::{ConnectionError, ConnectionErrorKind, PoolError, PoolErrorKind};
use sqlmodel_core::{Connection, Error};
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub min_connections: usize,
pub max_connections: usize,
pub idle_timeout_ms: u64,
pub acquire_timeout_ms: u64,
pub max_lifetime_ms: u64,
pub test_on_checkout: bool,
pub test_on_return: bool,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
min_connections: 1,
max_connections: 10,
idle_timeout_ms: 600_000, acquire_timeout_ms: 30_000, max_lifetime_ms: 1_800_000, test_on_checkout: true,
test_on_return: false,
}
}
}
impl PoolConfig {
#[must_use]
pub fn new(max_connections: usize) -> Self {
Self {
max_connections,
..Default::default()
}
}
#[must_use]
pub fn min_connections(mut self, n: usize) -> Self {
self.min_connections = n;
self
}
#[must_use]
pub fn idle_timeout(mut self, ms: u64) -> Self {
self.idle_timeout_ms = ms;
self
}
#[must_use]
pub fn acquire_timeout(mut self, ms: u64) -> Self {
self.acquire_timeout_ms = ms;
self
}
#[must_use]
pub fn max_lifetime(mut self, ms: u64) -> Self {
self.max_lifetime_ms = ms;
self
}
#[must_use]
pub fn test_on_checkout(mut self, enabled: bool) -> Self {
self.test_on_checkout = enabled;
self
}
#[must_use]
pub fn test_on_return(mut self, enabled: bool) -> Self {
self.test_on_return = enabled;
self
}
}
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
pub total_connections: usize,
pub idle_connections: usize,
pub active_connections: usize,
pub pending_requests: usize,
pub connections_created: u64,
pub connections_closed: u64,
pub acquires: u64,
pub timeouts: u64,
}
#[derive(Debug)]
struct ConnectionMeta<C> {
conn: C,
created_at: Instant,
last_used: Instant,
}
impl<C> ConnectionMeta<C> {
fn new(conn: C) -> Self {
let now = Instant::now();
Self {
conn,
created_at: now,
last_used: now,
}
}
fn touch(&mut self) {
self.last_used = Instant::now();
}
fn age(&self) -> Duration {
self.created_at.elapsed()
}
fn idle_time(&self) -> Duration {
self.last_used.elapsed()
}
}
struct PoolInner<C> {
config: PoolConfig,
idle: VecDeque<ConnectionMeta<C>>,
active_count: usize,
total_count: usize,
waiter_count: usize,
closed: bool,
}
impl<C> PoolInner<C> {
fn new(config: PoolConfig) -> Self {
Self {
config,
idle: VecDeque::new(),
active_count: 0,
total_count: 0,
waiter_count: 0,
closed: false,
}
}
fn can_create_new(&self) -> bool {
!self.closed && self.total_count < self.config.max_connections
}
fn stats(&self) -> PoolStats {
PoolStats {
total_connections: self.total_count,
idle_connections: self.idle.len(),
active_connections: self.active_count,
pending_requests: self.waiter_count,
..Default::default()
}
}
}
struct PoolShared<C> {
inner: Mutex<PoolInner<C>>,
conn_available: Condvar,
connections_created: AtomicU64,
connections_closed: AtomicU64,
acquires: AtomicU64,
timeouts: AtomicU64,
}
impl<C> PoolShared<C> {
fn new(config: PoolConfig) -> Self {
Self {
inner: Mutex::new(PoolInner::new(config)),
conn_available: Condvar::new(),
connections_created: AtomicU64::new(0),
connections_closed: AtomicU64::new(0),
acquires: AtomicU64::new(0),
timeouts: AtomicU64::new(0),
}
}
fn lock_or_recover(&self) -> std::sync::MutexGuard<'_, PoolInner<C>> {
self.inner.lock().unwrap_or_else(|poisoned| {
tracing::error!(
"Pool mutex poisoned; recovering for read-only access. \
A thread panicked while holding the lock."
);
poisoned.into_inner()
})
}
#[allow(clippy::result_large_err)] fn lock_or_error(
&self,
operation: &'static str,
) -> Result<std::sync::MutexGuard<'_, PoolInner<C>>, Error> {
self.inner
.lock()
.map_err(|_| Error::Pool(PoolError::poisoned(operation)))
}
}
pub struct Pool<C: Connection> {
shared: Arc<PoolShared<C>>,
}
impl<C: Connection> Pool<C> {
#[must_use]
pub fn new(config: PoolConfig) -> Self {
Self {
shared: Arc::new(PoolShared::new(config)),
}
}
#[must_use]
pub fn config(&self) -> PoolConfig {
let inner = self.shared.lock_or_recover();
inner.config.clone()
}
#[must_use]
pub fn stats(&self) -> PoolStats {
let inner = self.shared.lock_or_recover();
let mut stats = inner.stats();
stats.connections_created = self.shared.connections_created.load(Ordering::Relaxed);
stats.connections_closed = self.shared.connections_closed.load(Ordering::Relaxed);
stats.acquires = self.shared.acquires.load(Ordering::Relaxed);
stats.timeouts = self.shared.timeouts.load(Ordering::Relaxed);
stats
}
#[must_use]
pub fn at_capacity(&self) -> bool {
let inner = self.shared.lock_or_recover();
inner.total_count >= inner.config.max_connections
}
#[must_use]
pub fn is_closed(&self) -> bool {
let inner = self.shared.lock_or_recover();
inner.closed
}
pub async fn acquire<F, Fut>(&self, cx: &Cx, factory: F) -> Outcome<PooledConnection<C>, Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Outcome<C, Error>>,
{
let deadline = Instant::now() + Duration::from_millis(self.config().acquire_timeout_ms);
let test_on_checkout = self.config().test_on_checkout;
let max_lifetime = Duration::from_millis(self.config().max_lifetime_ms);
let idle_timeout = Duration::from_millis(self.config().idle_timeout_ms);
loop {
if cx.is_cancel_requested() {
return Outcome::Cancelled(CancelReason::user("pool acquire cancelled"));
}
if Instant::now() >= deadline {
self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
return Outcome::Err(Error::Pool(PoolError {
kind: PoolErrorKind::Timeout,
message: "acquire timeout: no connections available".to_string(),
source: None,
}));
}
let action = {
let mut inner = match self.shared.lock_or_error("acquire") {
Ok(guard) => guard,
Err(e) => return Outcome::Err(e),
};
if inner.closed {
AcquireAction::PoolClosed
} else {
let mut found_conn = None;
while let Some(mut meta) = inner.idle.pop_front() {
if meta.age() > max_lifetime {
inner.total_count -= 1;
self.shared
.connections_closed
.fetch_add(1, Ordering::Relaxed);
continue;
}
if meta.idle_time() > idle_timeout {
inner.total_count -= 1;
self.shared
.connections_closed
.fetch_add(1, Ordering::Relaxed);
continue;
}
meta.touch();
inner.active_count += 1;
found_conn = Some(meta);
break;
}
if let Some(meta) = found_conn {
AcquireAction::ValidateExisting(meta)
} else if inner.can_create_new() {
inner.total_count += 1;
inner.active_count += 1;
AcquireAction::CreateNew
} else {
inner.waiter_count += 1;
AcquireAction::Wait
}
}
};
match action {
AcquireAction::PoolClosed => {
return Outcome::Err(Error::Pool(PoolError {
kind: PoolErrorKind::Closed,
message: "pool has been closed".to_string(),
source: None,
}));
}
AcquireAction::ValidateExisting(meta) => {
return self.validate_and_wrap(cx, meta, test_on_checkout).await;
}
AcquireAction::CreateNew => {
match factory().await {
Outcome::Ok(conn) => {
self.shared
.connections_created
.fetch_add(1, Ordering::Relaxed);
self.shared.acquires.fetch_add(1, Ordering::Relaxed);
let meta = ConnectionMeta::new(conn);
return Outcome::Ok(PooledConnection::new(
meta,
Arc::downgrade(&self.shared),
));
}
Outcome::Err(e) => {
if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
inner.total_count -= 1;
inner.active_count -= 1;
}
return Outcome::Err(e);
}
Outcome::Cancelled(reason) => {
if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
inner.total_count -= 1;
inner.active_count -= 1;
}
return Outcome::Cancelled(reason);
}
Outcome::Panicked(info) => {
if let Ok(mut inner) = self.shared.lock_or_error("acquire_cleanup") {
inner.total_count -= 1;
inner.active_count -= 1;
}
return Outcome::Panicked(info);
}
}
}
AcquireAction::Wait => {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
if let Ok(mut inner) = self.shared.lock_or_error("acquire_timeout") {
inner.waiter_count -= 1;
}
self.shared.timeouts.fetch_add(1, Ordering::Relaxed);
return Outcome::Err(Error::Pool(PoolError {
kind: PoolErrorKind::Timeout,
message: "acquire timeout: no connections available".to_string(),
source: None,
}));
}
let wait_time = remaining.min(Duration::from_millis(100));
{
let inner = match self.shared.lock_or_error("acquire_wait") {
Ok(guard) => guard,
Err(e) => return Outcome::Err(e),
};
let _ = self
.shared
.conn_available
.wait_timeout(inner, wait_time)
.map_err(|_| {
tracing::error!("Pool mutex poisoned during wait_timeout");
});
}
{
if let Ok(mut inner) = self.shared.lock_or_error("acquire_wake") {
inner.waiter_count = inner.waiter_count.saturating_sub(1);
}
}
}
}
}
}
async fn validate_and_wrap(
&self,
cx: &Cx,
meta: ConnectionMeta<C>,
test_on_checkout: bool,
) -> Outcome<PooledConnection<C>, Error> {
if test_on_checkout {
match meta.conn.ping(cx).await {
Outcome::Ok(()) => {
self.shared.acquires.fetch_add(1, Ordering::Relaxed);
Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
}
Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => {
{
if let Ok(mut inner) = self.shared.lock_or_error("validate_cleanup") {
inner.total_count -= 1;
inner.active_count -= 1;
}
}
self.shared
.connections_closed
.fetch_add(1, Ordering::Relaxed);
Outcome::Err(Error::Connection(ConnectionError {
kind: ConnectionErrorKind::Disconnected,
message: "connection validation failed".to_string(),
source: None,
}))
}
}
} else {
self.shared.acquires.fetch_add(1, Ordering::Relaxed);
Outcome::Ok(PooledConnection::new(meta, Arc::downgrade(&self.shared)))
}
}
pub fn clear_idle(&self) {
if let Ok(mut inner) = self.shared.inner.lock() {
let idle_count = inner.idle.len();
inner.idle.clear();
inner.total_count -= idle_count;
self.shared
.connections_closed
.fetch_add(idle_count as u64, Ordering::Relaxed);
}
}
pub fn close(&self) {
match self.shared.inner.lock() {
Ok(mut inner) => {
inner.closed = true;
let idle_count = inner.idle.len();
inner.idle.clear();
inner.total_count -= idle_count;
self.shared
.connections_closed
.fetch_add(idle_count as u64, Ordering::Relaxed);
drop(inner);
}
Err(poisoned) => {
tracing::error!(
"Pool mutex poisoned during close; attempting recovery. \
Pool state may be inconsistent."
);
let mut inner = poisoned.into_inner();
inner.closed = true;
let idle_count = inner.idle.len();
inner.idle.clear();
inner.total_count -= idle_count;
self.shared
.connections_closed
.fetch_add(idle_count as u64, Ordering::Relaxed);
}
}
self.shared.conn_available.notify_all();
}
#[must_use]
pub fn idle_count(&self) -> usize {
let inner = self.shared.lock_or_recover();
inner.idle.len()
}
#[must_use]
pub fn active_count(&self) -> usize {
let inner = self.shared.lock_or_recover();
inner.active_count
}
#[must_use]
pub fn total_count(&self) -> usize {
let inner = self.shared.lock_or_recover();
inner.total_count
}
}
enum AcquireAction<C> {
PoolClosed,
ValidateExisting(ConnectionMeta<C>),
CreateNew,
Wait,
}
pub struct PooledConnection<C: Connection> {
meta: Option<ConnectionMeta<C>>,
pool: Weak<PoolShared<C>>,
}
impl<C: Connection> PooledConnection<C> {
fn new(meta: ConnectionMeta<C>, pool: Weak<PoolShared<C>>) -> Self {
Self {
meta: Some(meta),
pool,
}
}
pub fn detach(mut self) -> C {
if let Some(pool) = self.pool.upgrade() {
match pool.inner.lock() {
Ok(mut inner) => {
inner.total_count -= 1;
inner.active_count -= 1;
pool.connections_closed.fetch_add(1, Ordering::Relaxed);
}
Err(_poisoned) => {
tracing::error!(
"Pool mutex poisoned during detach; pool counters will be inconsistent"
);
pool.connections_closed.fetch_add(1, Ordering::Relaxed);
}
}
}
self.meta.take().expect("connection already detached").conn
}
#[must_use]
pub fn age(&self) -> Duration {
self.meta.as_ref().map_or(Duration::ZERO, |m| m.age())
}
#[must_use]
pub fn idle_time(&self) -> Duration {
self.meta.as_ref().map_or(Duration::ZERO, |m| m.idle_time())
}
}
impl<C: Connection> std::ops::Deref for PooledConnection<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self
.meta
.as_ref()
.expect("connection already returned to pool")
.conn
}
}
impl<C: Connection> std::ops::DerefMut for PooledConnection<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.meta
.as_mut()
.expect("connection already returned to pool")
.conn
}
}
impl<C: Connection> Drop for PooledConnection<C> {
fn drop(&mut self) {
if let Some(mut meta) = self.meta.take() {
meta.touch(); if let Some(pool) = self.pool.upgrade() {
let mut inner = match pool.inner.lock() {
Ok(guard) => guard,
Err(_poisoned) => {
tracing::error!(
"Pool mutex poisoned during connection return; \
connection will be leaked. A thread panicked while holding the lock."
);
return;
}
};
if inner.closed {
inner.total_count -= 1;
inner.active_count -= 1;
pool.connections_closed.fetch_add(1, Ordering::Relaxed);
return;
}
let max_lifetime = Duration::from_millis(inner.config.max_lifetime_ms);
if meta.age() > max_lifetime {
inner.total_count -= 1;
inner.active_count -= 1;
pool.connections_closed.fetch_add(1, Ordering::Relaxed);
return;
}
inner.active_count -= 1;
inner.idle.push_back(meta);
drop(inner);
pool.conn_available.notify_one();
}
}
}
}
impl<C: Connection + std::fmt::Debug> std::fmt::Debug for PooledConnection<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PooledConnection")
.field("conn", &self.meta.as_ref().map(|m| &m.conn))
.field("age", &self.age())
.field("idle_time", &self.idle_time())
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use sqlmodel_core::connection::{IsolationLevel, PreparedStatement, TransactionOps};
use sqlmodel_core::{Row, Value};
use std::sync::atomic::AtomicBool;
#[derive(Debug)]
struct MockConnection {
id: u32,
ping_should_fail: Arc<AtomicBool>,
}
impl MockConnection {
fn new(id: u32) -> Self {
Self {
id,
ping_should_fail: Arc::new(AtomicBool::new(false)),
}
}
#[allow(dead_code)]
fn with_ping_behavior(id: u32, should_fail: Arc<AtomicBool>) -> Self {
Self {
id,
ping_should_fail: should_fail,
}
}
}
struct MockTx;
impl TransactionOps for MockTx {
async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
Outcome::Ok(vec![])
}
async fn query_one(
&self,
_cx: &Cx,
_sql: &str,
_params: &[Value],
) -> Outcome<Option<Row>, Error> {
Outcome::Ok(None)
}
async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
Outcome::Ok(0)
}
async fn savepoint(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
Outcome::Ok(())
}
async fn rollback_to(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
Outcome::Ok(())
}
async fn release(&self, _cx: &Cx, _name: &str) -> Outcome<(), Error> {
Outcome::Ok(())
}
async fn commit(self, _cx: &Cx) -> Outcome<(), Error> {
Outcome::Ok(())
}
async fn rollback(self, _cx: &Cx) -> Outcome<(), Error> {
Outcome::Ok(())
}
}
impl Connection for MockConnection {
type Tx<'conn> = MockTx;
async fn query(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<Vec<Row>, Error> {
Outcome::Ok(vec![])
}
async fn query_one(
&self,
_cx: &Cx,
_sql: &str,
_params: &[Value],
) -> Outcome<Option<Row>, Error> {
Outcome::Ok(None)
}
async fn execute(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<u64, Error> {
Outcome::Ok(0)
}
async fn insert(&self, _cx: &Cx, _sql: &str, _params: &[Value]) -> Outcome<i64, Error> {
Outcome::Ok(0)
}
async fn batch(
&self,
_cx: &Cx,
_statements: &[(String, Vec<Value>)],
) -> Outcome<Vec<u64>, Error> {
Outcome::Ok(vec![])
}
async fn begin(&self, _cx: &Cx) -> Outcome<Self::Tx<'_>, Error> {
Outcome::Ok(MockTx)
}
async fn begin_with(
&self,
_cx: &Cx,
_isolation: IsolationLevel,
) -> Outcome<Self::Tx<'_>, Error> {
Outcome::Ok(MockTx)
}
async fn prepare(&self, _cx: &Cx, _sql: &str) -> Outcome<PreparedStatement, Error> {
Outcome::Ok(PreparedStatement::new(1, String::new(), 0))
}
async fn query_prepared(
&self,
_cx: &Cx,
_stmt: &PreparedStatement,
_params: &[Value],
) -> Outcome<Vec<Row>, Error> {
Outcome::Ok(vec![])
}
async fn execute_prepared(
&self,
_cx: &Cx,
_stmt: &PreparedStatement,
_params: &[Value],
) -> Outcome<u64, Error> {
Outcome::Ok(0)
}
async fn ping(&self, _cx: &Cx) -> Outcome<(), Error> {
if self.ping_should_fail.load(Ordering::Relaxed) {
Outcome::Err(Error::Connection(ConnectionError {
kind: ConnectionErrorKind::Disconnected,
message: "mock ping failed".to_string(),
source: None,
}))
} else {
Outcome::Ok(())
}
}
async fn close(self, _cx: &Cx) -> Result<(), Error> {
Ok(())
}
}
#[test]
fn test_config_default() {
let config = PoolConfig::default();
assert_eq!(config.min_connections, 1);
assert_eq!(config.max_connections, 10);
assert_eq!(config.idle_timeout_ms, 600_000);
assert_eq!(config.acquire_timeout_ms, 30_000);
assert_eq!(config.max_lifetime_ms, 1_800_000);
assert!(config.test_on_checkout);
assert!(!config.test_on_return);
}
#[test]
fn test_config_builder() {
let config = PoolConfig::new(20)
.min_connections(5)
.idle_timeout(60_000)
.acquire_timeout(5_000)
.max_lifetime(300_000)
.test_on_checkout(false)
.test_on_return(true);
assert_eq!(config.min_connections, 5);
assert_eq!(config.max_connections, 20);
assert_eq!(config.idle_timeout_ms, 60_000);
assert_eq!(config.acquire_timeout_ms, 5_000);
assert_eq!(config.max_lifetime_ms, 300_000);
assert!(!config.test_on_checkout);
assert!(config.test_on_return);
}
#[test]
fn test_config_clone() {
let config = PoolConfig::new(15).min_connections(3);
let cloned = config.clone();
assert_eq!(config.max_connections, cloned.max_connections);
assert_eq!(config.min_connections, cloned.min_connections);
}
#[test]
fn test_stats_default() {
let stats = PoolStats::default();
assert_eq!(stats.total_connections, 0);
assert_eq!(stats.idle_connections, 0);
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.pending_requests, 0);
assert_eq!(stats.connections_created, 0);
assert_eq!(stats.connections_closed, 0);
assert_eq!(stats.acquires, 0);
assert_eq!(stats.timeouts, 0);
}
#[test]
fn test_stats_clone() {
let stats = PoolStats {
total_connections: 5,
acquires: 100,
..Default::default()
};
let cloned = stats.clone();
assert_eq!(stats.total_connections, cloned.total_connections);
assert_eq!(stats.acquires, cloned.acquires);
}
#[test]
fn test_connection_meta_timing() {
use std::thread;
struct DummyConn;
let meta = ConnectionMeta::new(DummyConn);
let initial_age = meta.age();
thread::sleep(Duration::from_millis(10));
assert!(meta.age() > initial_age);
assert!(meta.idle_time() > Duration::ZERO);
}
#[test]
fn test_connection_meta_touch() {
use std::thread;
struct DummyConn;
let mut meta = ConnectionMeta::new(DummyConn);
thread::sleep(Duration::from_millis(10));
let idle_before_touch = meta.idle_time();
assert!(idle_before_touch > Duration::ZERO);
meta.touch();
let idle_after_touch = meta.idle_time();
assert!(idle_after_touch < idle_before_touch);
}
#[test]
fn test_pool_new() {
let config = PoolConfig::new(5);
let pool: Pool<MockConnection> = Pool::new(config);
assert_eq!(pool.idle_count(), 0);
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.total_count(), 0);
assert!(!pool.is_closed());
assert!(!pool.at_capacity());
}
#[test]
fn test_pool_config() {
let config = PoolConfig::new(7).min_connections(2);
let pool: Pool<MockConnection> = Pool::new(config);
let retrieved_config = pool.config();
assert_eq!(retrieved_config.max_connections, 7);
assert_eq!(retrieved_config.min_connections, 2);
}
#[test]
fn test_pool_stats_initial() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
let stats = pool.stats();
assert_eq!(stats.total_connections, 0);
assert_eq!(stats.idle_connections, 0);
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.pending_requests, 0);
assert_eq!(stats.connections_created, 0);
assert_eq!(stats.connections_closed, 0);
assert_eq!(stats.acquires, 0);
assert_eq!(stats.timeouts, 0);
}
#[test]
fn test_pool_close() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
assert!(!pool.is_closed());
pool.close();
assert!(pool.is_closed());
}
#[test]
fn test_pool_inner_can_create_new() {
let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(3));
assert!(inner.can_create_new());
inner.total_count = 3;
assert!(!inner.can_create_new());
inner.total_count = 2;
assert!(inner.can_create_new());
inner.closed = true;
assert!(!inner.can_create_new());
}
#[test]
fn test_pool_inner_stats() {
let mut inner = PoolInner::<MockConnection>::new(PoolConfig::new(10));
inner.total_count = 5;
inner.active_count = 3;
inner.waiter_count = 2;
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(1)));
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(2)));
let stats = inner.stats();
assert_eq!(stats.total_connections, 5);
assert_eq!(stats.idle_connections, 2);
assert_eq!(stats.active_connections, 3);
assert_eq!(stats.pending_requests, 2);
}
#[test]
fn test_pooled_connection_age_and_idle_time() {
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
assert!(pooled.age() >= Duration::ZERO);
thread::sleep(Duration::from_millis(5));
assert!(pooled.age() > Duration::ZERO);
}
#[test]
fn test_pooled_connection_detach() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(42));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
assert_eq!(pool.total_count(), 1);
assert_eq!(pool.active_count(), 1);
let conn = pooled.detach();
assert_eq!(conn.id, 42);
assert_eq!(pool.total_count(), 0);
assert_eq!(pool.active_count(), 0);
let stats = pool.stats();
assert_eq!(stats.connections_closed, 1);
}
#[test]
fn test_pooled_connection_drop_returns_to_pool() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
assert_eq!(pool.active_count(), 1);
assert_eq!(pool.idle_count(), 0);
drop(pooled);
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.idle_count(), 1);
assert_eq!(pool.total_count(), 1); }
#[test]
fn test_pooled_connection_drop_when_pool_closed() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
pool.close();
drop(pooled);
assert_eq!(pool.idle_count(), 0);
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.total_count(), 0);
assert_eq!(pool.stats().connections_closed, 1);
}
#[test]
fn test_pooled_connection_deref() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(99));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
assert_eq!(pooled.id, 99);
}
#[test]
fn test_pooled_connection_deref_mut() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let mut pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
pooled.id = 50;
assert_eq!(pooled.id, 50);
}
#[test]
fn test_pooled_connection_debug() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
let debug_str = format!("{:?}", pooled);
assert!(debug_str.contains("PooledConnection"));
assert!(debug_str.contains("age"));
}
#[test]
fn test_pool_at_capacity() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(2));
assert!(!pool.at_capacity());
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
}
assert!(!pool.at_capacity());
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 2;
}
assert!(pool.at_capacity());
}
#[test]
fn test_acquire_action_enum() {
let closed: AcquireAction<MockConnection> = AcquireAction::PoolClosed;
assert!(matches!(closed, AcquireAction::PoolClosed));
let create: AcquireAction<MockConnection> = AcquireAction::CreateNew;
assert!(matches!(create, AcquireAction::CreateNew));
let wait: AcquireAction<MockConnection> = AcquireAction::Wait;
assert!(matches!(wait, AcquireAction::Wait));
let meta = ConnectionMeta::new(MockConnection::new(1));
let validate: AcquireAction<MockConnection> = AcquireAction::ValidateExisting(meta);
assert!(matches!(validate, AcquireAction::ValidateExisting(_)));
}
#[test]
fn test_pool_shared_atomic_counters() {
let shared = PoolShared::<MockConnection>::new(PoolConfig::new(5));
assert_eq!(shared.connections_created.load(Ordering::Relaxed), 0);
assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 0);
assert_eq!(shared.acquires.load(Ordering::Relaxed), 0);
assert_eq!(shared.timeouts.load(Ordering::Relaxed), 0);
shared.connections_created.fetch_add(1, Ordering::Relaxed);
shared.connections_closed.fetch_add(2, Ordering::Relaxed);
shared.acquires.fetch_add(10, Ordering::Relaxed);
shared.timeouts.fetch_add(3, Ordering::Relaxed);
assert_eq!(shared.connections_created.load(Ordering::Relaxed), 1);
assert_eq!(shared.connections_closed.load(Ordering::Relaxed), 2);
assert_eq!(shared.acquires.load(Ordering::Relaxed), 10);
assert_eq!(shared.timeouts.load(Ordering::Relaxed), 3);
}
#[test]
fn test_pool_close_clears_idle() {
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 3;
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(1)));
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(2)));
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(3)));
}
assert_eq!(pool.idle_count(), 3);
assert_eq!(pool.total_count(), 3);
pool.close();
assert_eq!(pool.idle_count(), 0);
assert_eq!(pool.total_count(), 0);
assert!(pool.is_closed());
assert_eq!(pool.stats().connections_closed, 3);
}
fn poison_pool_mutex() -> Pool<MockConnection> {
use std::panic;
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 2;
inner.active_count = 1;
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(1)));
}
let shared_clone = Arc::clone(&pool.shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
assert!(pool.shared.inner.lock().is_err());
pool
}
#[test]
fn test_config_after_poisoning_returns_valid_data() {
let pool = poison_pool_mutex();
let config = pool.config();
assert_eq!(config.max_connections, 5);
}
#[test]
fn test_stats_after_poisoning_returns_valid_data() {
let pool = poison_pool_mutex();
let stats = pool.stats();
assert_eq!(stats.total_connections, 2);
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.idle_connections, 1);
}
#[test]
fn test_at_capacity_after_poisoning() {
let pool = poison_pool_mutex();
assert!(!pool.at_capacity());
}
#[test]
fn test_is_closed_after_poisoning() {
let pool = poison_pool_mutex();
assert!(!pool.is_closed());
}
#[test]
fn test_idle_count_after_poisoning() {
let pool = poison_pool_mutex();
assert_eq!(pool.idle_count(), 1);
}
#[test]
fn test_active_count_after_poisoning() {
let pool = poison_pool_mutex();
assert_eq!(pool.active_count(), 1);
}
#[test]
fn test_total_count_after_poisoning() {
let pool = poison_pool_mutex();
assert_eq!(pool.total_count(), 2);
}
#[test]
fn test_lock_or_error_returns_error_when_poisoned() {
use std::thread;
let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
let shared_clone = Arc::clone(&shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
let result = shared.lock_or_error("test_operation");
match result {
Err(Error::Pool(pool_err)) => {
assert!(matches!(pool_err.kind, PoolErrorKind::Poisoned));
assert!(pool_err.message.contains("poisoned"));
}
Err(other) => panic!("Expected Pool error, got: {:?}", other),
Ok(_) => panic!("Expected error, got Ok"),
}
}
#[test]
fn test_lock_or_recover_succeeds_when_poisoned() {
use std::thread;
let shared = Arc::new(PoolShared::<MockConnection>::new(PoolConfig::new(5)));
{
let mut inner = shared.inner.lock().unwrap();
inner.total_count = 42;
}
let shared_clone = Arc::clone(&shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
assert!(shared.inner.lock().is_err());
let inner = shared.lock_or_recover();
assert_eq!(inner.total_count, 42);
}
#[test]
fn test_close_after_poisoning_recovers_and_closes() {
let pool = poison_pool_mutex();
pool.close();
assert!(pool.is_closed());
assert_eq!(pool.idle_count(), 0);
}
#[test]
fn test_drop_pooled_connection_after_poisoning_does_not_panic() {
use std::panic;
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(1));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
let shared_clone = Arc::clone(&pool.shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
assert!(pool.shared.inner.lock().is_err());
let drop_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
drop(pooled);
}));
assert!(
drop_result.is_ok(),
"Dropping PooledConnection after mutex poisoning should not panic"
);
}
#[test]
fn test_detach_after_poisoning_does_not_panic() {
use std::panic;
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let meta = ConnectionMeta::new(MockConnection::new(42));
let pooled = PooledConnection::new(meta, Arc::downgrade(&pool.shared));
let shared_clone = Arc::clone(&pool.shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic to poison mutex");
});
let _ = handle.join();
assert!(pool.shared.inner.lock().is_err());
let detach_result = panic::catch_unwind(panic::AssertUnwindSafe(|| pooled.detach()));
assert!(
detach_result.is_ok(),
"detach() after mutex poisoning should not panic"
);
let conn = detach_result.unwrap();
assert_eq!(conn.id, 42);
}
#[test]
fn test_pool_survives_thread_panic_during_acquire() {
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
let pool_arc = Arc::new(pool);
let pool_clone = Arc::clone(&pool_arc);
let handle = thread::spawn(move || {
{
let mut inner = pool_clone.shared.inner.lock().unwrap();
inner.total_count = 1;
inner.active_count = 1;
}
let _guard = pool_clone.shared.inner.lock().unwrap();
panic!("simulated panic during database operation");
});
let _ = handle.join();
assert_eq!(pool_arc.total_count(), 1);
assert_eq!(pool_arc.config().max_connections, 5);
let stats = pool_arc.stats();
assert_eq!(stats.total_connections, 1);
}
#[test]
fn test_pool_close_after_thread_panic() {
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.total_count = 2;
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(1)));
inner
.idle
.push_back(ConnectionMeta::new(MockConnection::new(2)));
}
let shared_clone = Arc::clone(&pool.shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic");
});
let _ = handle.join();
pool.close();
assert!(pool.is_closed());
assert_eq!(pool.idle_count(), 0);
}
#[test]
fn test_multiple_reads_after_poisoning() {
let pool = poison_pool_mutex();
for _ in 0..10 {
let _ = pool.config();
let _ = pool.stats();
let _ = pool.at_capacity();
let _ = pool.is_closed();
let _ = pool.idle_count();
let _ = pool.active_count();
let _ = pool.total_count();
}
assert_eq!(pool.total_count(), 2);
}
#[test]
fn test_waiters_count_after_poisoning() {
use std::thread;
let pool: Pool<MockConnection> = Pool::new(PoolConfig::new(5));
{
let mut inner = pool.shared.inner.lock().unwrap();
inner.waiter_count = 3;
}
let shared_clone = Arc::clone(&pool.shared);
let handle = thread::spawn(move || {
let _guard = shared_clone.inner.lock().unwrap();
panic!("intentional panic");
});
let _ = handle.join();
let stats = pool.stats();
assert_eq!(stats.pending_requests, 3);
}
}