use crate::error::{FusekiError, FusekiResult};
use serde::Serialize;
use std::sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Tuning parameters for [`AdaptiveConnectionPool`].
#[derive(Debug, Clone)]
pub struct AdaptiveConnectionPoolConfig {
    /// Lower bound the pool re-warms toward during health checks.
    pub min_connections: usize,
    /// Hard cap on total connections (active + idle).
    pub max_connections: usize,
    /// Connections created eagerly at construction; clamped into
    /// `min_connections..=max_connections` by the constructor.
    pub warmup_connections: usize,
    /// Idle connections unused for at least this long are evicted.
    pub idle_timeout: Duration,
    /// Connections older than this are evicted regardless of use.
    pub max_lifetime: Duration,
    /// How long `acquire` waits before returning a timeout error.
    pub acquire_timeout: Duration,
    /// Minimum interval between `maybe_resize` adjustments (throttle).
    pub health_check_interval: Duration,
    /// Utilization (active / total) above which the pool grows by one;
    /// below half of this value it shrinks by one.
    pub target_utilization: f64,
}
impl Default for AdaptiveConnectionPoolConfig {
fn default() -> Self {
AdaptiveConnectionPoolConfig {
min_connections: 2,
max_connections: 50,
warmup_connections: 2,
idle_timeout: Duration::from_secs(300),
max_lifetime: Duration::from_secs(3600),
acquire_timeout: Duration::from_secs(30),
health_check_interval: Duration::from_secs(60),
target_utilization: 0.70,
}
}
}
/// Point-in-time snapshot of pool counters, serializable for monitoring.
#[derive(Debug, Clone, Serialize)]
pub struct AdaptivePoolMetrics {
    /// Total connections owned by the pool (active + idle).
    pub total_connections: usize,
    /// Connections currently held by guards.
    pub active_connections: usize,
    /// Connections parked in the idle list.
    pub idle_connections: usize,
    /// Callers currently blocked inside `acquire`.
    pub waiting_connections: usize,
    /// Lifetime count of failed health probes.
    pub health_check_failures: u64,
    /// Lifetime count of connections discarded (stale, unhealthy, or shrunk).
    pub total_recycled: u64,
    /// Lifetime count of connections ever created.
    pub total_created: u64,
    /// active / total, or 0.0 when the pool is empty.
    pub utilization: f64,
    /// Number of grow/shrink operations performed by `maybe_resize`.
    pub resize_count: u64,
    /// Accumulated milliseconds callers spent waiting in `acquire`.
    pub total_wait_ms: u64,
    /// Number of acquires that had to wait before succeeding.
    pub total_waits: u64,
}
/// A pooled connection plus the bookkeeping needed for staleness decisions.
struct ConnectionEntry<C> {
    connection: C,
    // Creation time; drives `max_lifetime` eviction.
    created_at: Instant,
    // Last hand-out time; drives `idle_timeout` eviction.
    last_used: Instant,
    // Times this connection has been acquired.
    use_count: u64,
    // Health flag consulted by `is_stale`; set true at creation.
    // NOTE(review): nothing in this file ever clears it — unhealthy
    // connections are discarded outright rather than flagged.
    is_healthy: bool,
}
impl<C> ConnectionEntry<C> {
fn new(connection: C) -> Self {
let now = Instant::now();
ConnectionEntry {
connection,
created_at: now,
last_used: now,
use_count: 0,
is_healthy: true,
}
}
fn is_stale(&self, cfg: &AdaptiveConnectionPoolConfig) -> bool {
let now = Instant::now();
!self.is_healthy
|| now.duration_since(self.created_at) >= cfg.max_lifetime
|| now.duration_since(self.last_used) >= cfg.idle_timeout
}
}
/// Mutex-protected mutable pool state: the list of idle connections.
struct PoolInner<C> {
    // Idle entries; popped from the back on acquire, pushed on release.
    idle: Vec<ConnectionEntry<C>>,
}
/// A thread-safe connection pool that warms up eagerly, grows on demand up
/// to `max_connections`, evicts stale or unhealthy connections, and can
/// resize itself based on observed utilization.
pub struct AdaptiveConnectionPool<C: Send + 'static> {
    config: AdaptiveConnectionPoolConfig,
    // Idle connections, behind a mutex shared with the guards.
    inner: Arc<Mutex<PoolInner<C>>>,
    // Connections currently held by guards.
    active_count: Arc<AtomicUsize>,
    // Callers currently blocked inside `acquire`.
    waiting_count: Arc<AtomicUsize>,
    // Total connections owned by the pool (active + idle).
    total_count: Arc<AtomicUsize>,
    // Lifetime counter of connections ever created.
    total_created: Arc<AtomicU64>,
    // Lifetime counter of connections discarded.
    total_recycled: Arc<AtomicU64>,
    // Lifetime counter of failed health probes.
    health_check_failures: Arc<AtomicU64>,
    // Grow/shrink operations performed by `maybe_resize`.
    resize_count: Arc<AtomicU64>,
    // Accumulated milliseconds spent waiting in `acquire`.
    total_wait_ms: Arc<AtomicU64>,
    // Number of acquires that had to wait.
    total_waits: Arc<AtomicU64>,
    // Produces new connections; failures propagate to callers.
    factory: Arc<dyn Fn() -> FusekiResult<C> + Send + Sync>,
    // Returns false when a connection should be discarded.
    health_check: Arc<dyn Fn(&C) -> bool + Send + Sync>,
    // Timestamp of the last resize, used to throttle `maybe_resize`.
    last_resize: Arc<Mutex<Instant>>,
}
impl<C: Send + 'static> AdaptiveConnectionPool<C> {
pub fn new_with_health_check(
config: AdaptiveConnectionPoolConfig,
factory: impl Fn() -> FusekiResult<C> + Send + Sync + 'static,
health_check: impl Fn(&C) -> bool + Send + Sync + 'static,
) -> FusekiResult<Self> {
if config.min_connections > config.max_connections {
return Err(FusekiError::Configuration {
message: format!(
"min_connections ({}) must be ≤ max_connections ({})",
config.min_connections, config.max_connections
),
});
}
let warmup = config
.warmup_connections
.min(config.max_connections)
.max(config.min_connections);
let factory: Arc<dyn Fn() -> FusekiResult<C> + Send + Sync> = Arc::new(factory);
let health_check: Arc<dyn Fn(&C) -> bool + Send + Sync> = Arc::new(health_check);
let mut idle_vec = Vec::with_capacity(warmup);
for _ in 0..warmup {
let conn = factory()?;
idle_vec.push(ConnectionEntry::new(conn));
}
let pre_warmed = idle_vec.len();
let total_count = Arc::new(AtomicUsize::new(pre_warmed));
let total_created = Arc::new(AtomicU64::new(pre_warmed as u64));
info!(
min = config.min_connections,
max = config.max_connections,
warmed = pre_warmed,
"AdaptiveConnectionPool created"
);
Ok(AdaptiveConnectionPool {
config,
inner: Arc::new(Mutex::new(PoolInner { idle: idle_vec })),
active_count: Arc::new(AtomicUsize::new(0)),
waiting_count: Arc::new(AtomicUsize::new(0)),
total_count,
total_created,
total_recycled: Arc::new(AtomicU64::new(0)),
health_check_failures: Arc::new(AtomicU64::new(0)),
resize_count: Arc::new(AtomicU64::new(0)),
total_wait_ms: Arc::new(AtomicU64::new(0)),
total_waits: Arc::new(AtomicU64::new(0)),
factory,
health_check,
last_resize: Arc::new(Mutex::new(Instant::now())),
})
}
pub fn new(
config: AdaptiveConnectionPoolConfig,
factory: impl Fn() -> FusekiResult<C> + Send + Sync + 'static,
) -> FusekiResult<Self> {
Self::new_with_health_check(config, factory, |_| true)
}
pub fn acquire(&self) -> FusekiResult<AdaptivePoolGuard<C>> {
let deadline = Instant::now() + self.config.acquire_timeout;
let mut is_waiting = false;
let wait_start = Instant::now();
loop {
let maybe_entry = {
let mut guard = self.inner.lock().map_err(|e| FusekiError::Internal {
message: format!("pool lock poisoned: {e}"),
})?;
let recycled = Self::evict_stale(&mut guard.idle, &self.config);
if recycled > 0 {
self.total_recycled
.fetch_add(recycled as u64, Ordering::Relaxed);
self.total_count.fetch_sub(recycled, Ordering::SeqCst);
debug!(recycled, "Evicted stale connections");
}
guard.idle.pop()
};
if let Some(mut entry) = maybe_entry {
let healthy = (self.health_check)(&entry.connection);
if !healthy {
self.health_check_failures.fetch_add(1, Ordering::Relaxed);
self.total_recycled.fetch_add(1, Ordering::Relaxed);
self.total_count.fetch_sub(1, Ordering::SeqCst);
debug!("Health check failed; discarding connection");
if Instant::now() >= deadline {
return Err(FusekiError::TimeoutWithMessage(format!(
"AdaptiveConnectionPool: acquire timed out after {:?} (all connections unhealthy)",
self.config.acquire_timeout
)));
}
continue; }
entry.last_used = Instant::now();
entry.use_count += 1;
self.active_count.fetch_add(1, Ordering::Relaxed);
if is_waiting {
let elapsed_ms = wait_start.elapsed().as_millis() as u64;
self.total_wait_ms.fetch_add(elapsed_ms, Ordering::Relaxed);
self.total_waits.fetch_add(1, Ordering::Relaxed);
self.waiting_count.fetch_sub(1, Ordering::Relaxed);
}
debug!(
active = self.active_count.load(Ordering::Relaxed),
"Connection acquired from idle pool"
);
return Ok(AdaptivePoolGuard {
inner: Arc::clone(&self.inner),
entry: Some(entry),
active_count: Arc::clone(&self.active_count),
total_count: Arc::clone(&self.total_count),
total_recycled: Arc::clone(&self.total_recycled),
config: self.config.clone(),
});
}
let current_total = self.total_count.load(Ordering::Relaxed);
if current_total < self.config.max_connections {
let prev = self.total_count.compare_exchange(
current_total,
current_total + 1,
Ordering::SeqCst,
Ordering::Relaxed,
);
if prev.is_ok() {
match (self.factory)() {
Ok(conn) => {
let healthy = (self.health_check)(&conn);
if !healthy {
self.health_check_failures.fetch_add(1, Ordering::Relaxed);
self.total_recycled.fetch_add(1, Ordering::Relaxed);
self.total_count.fetch_sub(1, Ordering::SeqCst);
debug!("Newly created connection failed health check; discarding");
if Instant::now() >= deadline {
return Err(FusekiError::TimeoutWithMessage(format!(
"AdaptiveConnectionPool: acquire timed out after {:?} (all connections unhealthy)",
self.config.acquire_timeout
)));
}
continue;
}
self.total_created.fetch_add(1, Ordering::Relaxed);
let mut entry = ConnectionEntry::new(conn);
entry.use_count = 1;
entry.last_used = Instant::now();
self.active_count.fetch_add(1, Ordering::Relaxed);
if is_waiting {
let elapsed_ms = wait_start.elapsed().as_millis() as u64;
self.total_wait_ms.fetch_add(elapsed_ms, Ordering::Relaxed);
self.total_waits.fetch_add(1, Ordering::Relaxed);
self.waiting_count.fetch_sub(1, Ordering::Relaxed);
}
debug!(
total = self.total_count.load(Ordering::Relaxed),
"New connection created"
);
return Ok(AdaptivePoolGuard {
inner: Arc::clone(&self.inner),
entry: Some(entry),
active_count: Arc::clone(&self.active_count),
total_count: Arc::clone(&self.total_count),
total_recycled: Arc::clone(&self.total_recycled),
config: self.config.clone(),
});
}
Err(e) => {
self.total_count.fetch_sub(1, Ordering::SeqCst);
if is_waiting {
self.waiting_count.fetch_sub(1, Ordering::Relaxed);
}
return Err(e);
}
}
}
continue;
}
if !is_waiting {
is_waiting = true;
self.waiting_count.fetch_add(1, Ordering::Relaxed);
}
if Instant::now() >= deadline {
self.waiting_count.fetch_sub(1, Ordering::Relaxed);
return Err(FusekiError::TimeoutWithMessage(format!(
"AdaptiveConnectionPool: acquire timed out after {:?}",
self.config.acquire_timeout
)));
}
std::thread::sleep(Duration::from_millis(5));
}
}
pub fn run_health_checks(&self) -> FusekiResult<usize> {
let mut guard = self.inner.lock().map_err(|e| FusekiError::Internal {
message: format!("pool lock poisoned during health check: {e}"),
})?;
let before = guard.idle.len();
let health_check = &*self.health_check;
guard.idle.retain(|entry| {
let ok = health_check(&entry.connection);
if !ok {
self.health_check_failures.fetch_add(1, Ordering::Relaxed);
}
ok
});
let removed = before - guard.idle.len();
if removed > 0 {
self.total_recycled
.fetch_add(removed as u64, Ordering::Relaxed);
self.total_count.fetch_sub(removed, Ordering::SeqCst);
debug!(removed, "Health-check removed unhealthy connections");
}
let current_total = self.total_count.load(Ordering::Relaxed);
let min = self.config.min_connections;
let active = self.active_count.load(Ordering::Relaxed);
let desired_idle = min.saturating_sub(active);
let current_idle = guard.idle.len();
if current_idle < desired_idle && current_total < self.config.max_connections {
let to_create = desired_idle - current_idle;
for _ in 0..to_create {
match (self.factory)() {
Ok(conn) => {
self.total_created.fetch_add(1, Ordering::Relaxed);
self.total_count.fetch_add(1, Ordering::SeqCst);
guard.idle.push(ConnectionEntry::new(conn));
}
Err(e) => {
warn!("Failed to create connection during re-warm: {}", e);
break;
}
}
}
}
Ok(removed)
}
pub fn maybe_resize(&self) -> FusekiResult<()> {
{
let mut last = self.last_resize.lock().map_err(|e| FusekiError::Internal {
message: format!("resize lock poisoned: {e}"),
})?;
if last.elapsed() < self.config.health_check_interval {
return Ok(());
}
*last = Instant::now();
}
let active = self.active_count.load(Ordering::Relaxed);
let total = self.total_count.load(Ordering::Relaxed);
let utilization = if total == 0 {
0.0
} else {
active as f64 / total as f64
};
if utilization > self.config.target_utilization && total < self.config.max_connections {
match (self.factory)() {
Ok(conn) => {
self.total_created.fetch_add(1, Ordering::Relaxed);
let entry = ConnectionEntry::new(conn);
let mut guard = self.inner.lock().map_err(|e| FusekiError::Internal {
message: format!("pool lock poisoned on grow: {e}"),
})?;
guard.idle.push(entry);
self.total_count.fetch_add(1, Ordering::SeqCst);
self.resize_count.fetch_add(1, Ordering::Relaxed);
info!(total = total + 1, utilization, "Pool grown");
}
Err(e) => {
warn!("Could not grow pool: {}", e);
}
}
} else if utilization < self.config.target_utilization / 2.0
&& total > self.config.min_connections
{
let removed = {
let mut guard = self.inner.lock().map_err(|e| FusekiError::Internal {
message: format!("pool lock poisoned on shrink: {e}"),
})?;
guard.idle.pop().is_some()
};
if removed {
self.total_count.fetch_sub(1, Ordering::SeqCst);
self.total_recycled.fetch_add(1, Ordering::Relaxed);
self.resize_count.fetch_add(1, Ordering::Relaxed);
info!(total = total - 1, utilization, "Pool shrunk");
}
}
Ok(())
}
pub fn metrics(&self) -> AdaptivePoolMetrics {
let idle = self.inner.lock().map(|g| g.idle.len()).unwrap_or(0);
let active = self.active_count.load(Ordering::Relaxed);
let total = self.total_count.load(Ordering::Relaxed);
AdaptivePoolMetrics {
total_connections: total,
active_connections: active,
idle_connections: idle,
waiting_connections: self.waiting_count.load(Ordering::Relaxed),
health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
total_recycled: self.total_recycled.load(Ordering::Relaxed),
total_created: self.total_created.load(Ordering::Relaxed),
utilization: if total == 0 {
0.0
} else {
active as f64 / total as f64
},
resize_count: self.resize_count.load(Ordering::Relaxed),
total_wait_ms: self.total_wait_ms.load(Ordering::Relaxed),
total_waits: self.total_waits.load(Ordering::Relaxed),
}
}
fn evict_stale(
idle: &mut Vec<ConnectionEntry<C>>,
cfg: &AdaptiveConnectionPoolConfig,
) -> usize {
let before = idle.len();
idle.retain(|e| !e.is_stale(cfg));
before - idle.len()
}
}
/// RAII handle to an acquired connection; `Deref`s to the connection and
/// returns it to the pool (or discards it if stale) when dropped.
pub struct AdaptivePoolGuard<C: Send + 'static> {
    // Shared idle list, so Drop can return the connection.
    inner: Arc<Mutex<PoolInner<C>>>,
    // `Some` while the guard owns a connection; taken in Drop.
    entry: Option<ConnectionEntry<C>>,
    // Shared counters, decremented/updated in Drop.
    active_count: Arc<AtomicUsize>,
    total_count: Arc<AtomicUsize>,
    total_recycled: Arc<AtomicU64>,
    // Cloned config; Drop uses it for the staleness check.
    config: AdaptiveConnectionPoolConfig,
}
impl<C: Send + 'static> std::ops::Deref for AdaptivePoolGuard<C> {
    type Target = C;

    /// Borrows the underlying connection.
    ///
    /// # Panics
    /// Panics if the guard's entry was already taken — `entry` is only
    /// `None` inside/after `drop`, so this cannot fire in safe use.
    fn deref(&self) -> &C {
        let entry = self
            .entry
            .as_ref()
            .expect("AdaptivePoolGuard: entry is None");
        &entry.connection
    }
}
impl<C: Send + 'static> std::ops::DerefMut for AdaptivePoolGuard<C> {
    /// Mutably borrows the underlying connection.
    ///
    /// # Panics
    /// Panics if the guard's entry was already taken — `entry` is only
    /// `None` inside/after `drop`, so this cannot fire in safe use.
    fn deref_mut(&mut self) -> &mut C {
        let entry = self
            .entry
            .as_mut()
            .expect("AdaptivePoolGuard: entry is None");
        &mut entry.connection
    }
}
impl<C: Send + 'static> Drop for AdaptivePoolGuard<C> {
    /// Returns the connection to the idle pool, or discards it (updating the
    /// pool's counters) when it has gone stale or the pool lock is poisoned.
    fn drop(&mut self) {
        let entry = match self.entry.take() {
            Some(e) => e,
            None => return,
        };
        self.active_count.fetch_sub(1, Ordering::Relaxed);
        // Stale on return: drop it instead of re-pooling it.
        if entry.is_stale(&self.config) {
            self.total_count.fetch_sub(1, Ordering::SeqCst);
            self.total_recycled.fetch_add(1, Ordering::Relaxed);
            debug!("Expired connection discarded on return");
            return;
        }
        match self.inner.lock() {
            Ok(mut guard) => {
                guard.idle.push(entry);
                debug!("Connection returned to idle pool");
            }
            Err(e) => {
                warn!("Pool lock poisoned on return: {}", e);
                self.total_count.fetch_sub(1, Ordering::SeqCst);
                // BUGFIX: the connection is dropped on this path, so count it
                // as recycled — keeps metrics consistent with every other
                // discard path (stale return, eviction, shrink).
                self.total_recycled.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the adaptive connection pool, using a minimal fake
    //! connection type so no real I/O is involved.
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    // Fake connection: sequential id plus a health flag for probe tests.
    struct Conn {
        id: usize,
        healthy: bool,
    }

    // Factory producing sequentially numbered, healthy connections.
    fn make_factory(counter: Arc<AtomicUsize>) -> impl Fn() -> FusekiResult<Conn> + Send + Sync {
        move || {
            let id = counter.fetch_add(1, Ordering::Relaxed);
            Ok(Conn { id, healthy: true })
        }
    }

    // Config helper: warmup == min, short acquire timeout for fast tests.
    fn default_cfg(min: usize, max: usize) -> AdaptiveConnectionPoolConfig {
        AdaptiveConnectionPoolConfig {
            min_connections: min,
            max_connections: max,
            warmup_connections: min,
            idle_timeout: Duration::from_secs(300),
            max_lifetime: Duration::from_secs(3600),
            acquire_timeout: Duration::from_millis(500),
            health_check_interval: Duration::from_secs(60),
            target_utilization: 0.7,
        }
    }

    // Construction pre-warms exactly `warmup_connections` idle entries.
    #[test]
    fn test_warmup_connections() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(3, 10), make_factory(counter)).unwrap();
        let m = pool.metrics();
        assert_eq!(m.total_connections, 3, "Expected 3 warmed connections");
        assert_eq!(m.idle_connections, 3);
        assert_eq!(m.active_connections, 0);
    }

    // Acquiring a connection bumps the active counter.
    #[test]
    fn test_acquire_increases_active() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(2, 10), make_factory(counter)).unwrap();
        let _guard = pool.acquire().unwrap();
        assert_eq!(pool.metrics().active_connections, 1);
    }

    // Dropping the guard releases the connection back to the pool.
    #[test]
    fn test_release_decreases_active() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(2, 10), make_factory(counter)).unwrap();
        {
            let _guard = pool.acquire().unwrap();
            assert_eq!(pool.metrics().active_connections, 1);
        }
        assert_eq!(pool.metrics().active_connections, 0);
    }

    // A returned connection goes back onto the idle list.
    #[test]
    fn test_connection_returned_to_idle() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(2, 10), make_factory(counter)).unwrap();
        let id = {
            let guard = pool.acquire().unwrap();
            guard.id
        };
        let m = pool.metrics();
        assert_eq!(
            m.idle_connections, 2,
            "Connection returned; idle should be 2"
        );
        let _ = id;
    }

    // With only one warmed connection, a second acquire forces growth.
    #[test]
    fn test_pool_grows_on_demand() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(1, 10), make_factory(counter)).unwrap();
        let _g1 = pool.acquire().unwrap();
        let _g2 = pool.acquire().unwrap();
        assert!(pool.metrics().total_connections >= 2);
    }

    // A third acquire on a max=2 pool must time out.
    #[test]
    fn test_max_connections_enforced() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(0, 2), make_factory(counter)).unwrap();
        let _g1 = pool.acquire().unwrap();
        let _g2 = pool.acquire().unwrap();
        let result = pool.acquire();
        assert!(result.is_err(), "Should time out when pool is at max");
    }

    // min > max is rejected at construction.
    #[test]
    fn test_invalid_config_rejected() {
        let counter = Arc::new(AtomicUsize::new(0));
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 10,
            max_connections: 5,
            ..Default::default()
        };
        let result = AdaptiveConnectionPool::new(cfg, make_factory(counter));
        assert!(result.is_err());
    }

    // All-unhealthy connections: acquire fails and failures are counted.
    #[test]
    fn test_health_check_failures_counted() {
        let counter = Arc::new(AtomicUsize::new(0));
        let factory = {
            let c = counter.clone();
            move || {
                let id = c.fetch_add(1, Ordering::Relaxed);
                Ok(Conn { id, healthy: false })
            }
        };
        let pool = AdaptiveConnectionPool::new_with_health_check(
            default_cfg(2, 10),
            factory,
            |conn: &Conn| conn.healthy,
        )
        .unwrap();
        let result = pool.acquire();
        assert!(result.is_err());
        assert!(pool.metrics().health_check_failures > 0);
    }

    // Flipping the shared flag makes run_health_checks drop all idle entries.
    #[test]
    fn test_run_health_checks_removes_unhealthy() {
        let counter = Arc::new(AtomicUsize::new(0));
        let healthy_flag = Arc::new(std::sync::atomic::AtomicBool::new(true));
        let flag = healthy_flag.clone();
        let factory = {
            let c = counter.clone();
            move || {
                let id = c.fetch_add(1, Ordering::Relaxed);
                Ok(Conn { id, healthy: true })
            }
        };
        let flag2 = flag.clone();
        let pool = AdaptiveConnectionPool::new_with_health_check(
            default_cfg(2, 10),
            factory,
            move |_conn: &Conn| flag2.load(Ordering::Relaxed),
        )
        .unwrap();
        healthy_flag.store(false, Ordering::Relaxed);
        let removed = pool.run_health_checks().unwrap();
        assert_eq!(removed, 2, "Both idle connections should be removed");
    }

    // After removal, the health-check pass re-warms toward min_connections.
    #[test]
    fn test_health_checks_rewarm() {
        let counter = Arc::new(AtomicUsize::new(0));
        let healthy_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag = healthy_flag.clone();
        let factory_counter = counter.clone();
        let factory = move || {
            let id = factory_counter.fetch_add(1, Ordering::Relaxed);
            Ok(Conn { id, healthy: true })
        };
        let flag2 = flag.clone();
        let pool = AdaptiveConnectionPool::new_with_health_check(
            default_cfg(2, 10),
            factory,
            move |_: &Conn| flag2.load(Ordering::Relaxed),
        )
        .unwrap();
        pool.run_health_checks().unwrap();
        let m = pool.metrics();
        assert!(m.total_created >= 2);
    }

    // Utilization stays within (0, 1] with some connections checked out.
    #[test]
    fn test_metrics_utilization() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(4, 10), make_factory(counter)).unwrap();
        let _g1 = pool.acquire().unwrap();
        let _g2 = pool.acquire().unwrap();
        let m = pool.metrics();
        assert!(m.utilization > 0.0 && m.utilization <= 1.0);
    }

    // Guard derefs to the underlying connection (read access).
    #[test]
    fn test_deref_access() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool =
            AdaptiveConnectionPool::new(default_cfg(1, 5), make_factory(counter.clone())).unwrap();
        let guard = pool.acquire().unwrap();
        let _id: usize = guard.id;
    }

    // Guard derefs mutably (write access) — here on a Vec connection.
    #[test]
    fn test_deref_mut_access() {
        let pool: AdaptiveConnectionPool<Vec<u8>> =
            AdaptiveConnectionPool::new(default_cfg(1, 5), || Ok(Vec::new())).unwrap();
        let mut guard = pool.acquire().unwrap();
        guard.push(42u8);
        assert_eq!(guard.len(), 1);
    }

    // Back-to-back acquires reuse the pre-warmed connections (ids 0 and 1).
    #[test]
    fn test_sequential_reuse() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(2, 10), make_factory(counter)).unwrap();
        let id1 = { pool.acquire().unwrap().id };
        let id2 = { pool.acquire().unwrap().id };
        assert!(id1 < 2 && id2 < 2, "Should reuse pre-warmed connections");
    }

    // A timed-out acquire must not leave waiting_connections inflated.
    #[test]
    fn test_waiting_count_resets() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(0, 0), make_factory(counter)).unwrap();
        let _ = pool.acquire();
        assert_eq!(pool.metrics().waiting_connections, 0);
    }

    // total_created grows as the pool creates connections beyond warmup.
    #[test]
    fn test_total_created_tracks_growth() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(1, 5), make_factory(counter)).unwrap();
        let initial = pool.metrics().total_created;
        let _g1 = pool.acquire().unwrap();
        let _g2 = pool.acquire().unwrap();
        let after = pool.metrics().total_created;
        assert!(after > initial, "total_created should increase");
    }

    // warmup_connections larger than max is clamped down to max.
    #[test]
    fn test_warmup_clamped_to_max() {
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 1,
            max_connections: 3,
            warmup_connections: 10,
            ..Default::default()
        };
        let pool = AdaptiveConnectionPool::new(cfg, || Ok(0usize)).unwrap();
        let m = pool.metrics();
        assert!(m.total_connections <= 3, "warmup should be clamped to max");
    }

    // With a 1 ms idle timeout, warmed connections are stale by acquire time.
    #[test]
    fn test_idle_timeout_eviction() {
        let counter = Arc::new(AtomicUsize::new(0));
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 2,
            max_connections: 10,
            warmup_connections: 2,
            idle_timeout: Duration::from_millis(1),
            max_lifetime: Duration::from_secs(3600),
            acquire_timeout: Duration::from_secs(5),
            health_check_interval: Duration::from_secs(60),
            target_utilization: 0.7,
        };
        let pool = AdaptiveConnectionPool::new(cfg, make_factory(counter)).unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let guard = pool.acquire().unwrap();
        assert!(pool.metrics().total_recycled > 0 || guard.id >= 2);
    }

    // Health check on an empty pool removes nothing and does not error.
    #[test]
    fn test_health_check_empty_pool() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(0, 5), make_factory(counter)).unwrap();
        let removed = pool.run_health_checks().unwrap();
        assert_eq!(removed, 0);
    }

    // Second maybe_resize inside the throttle window must be a no-op.
    #[test]
    fn test_resize_throttle() {
        let counter = Arc::new(AtomicUsize::new(0));
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 1,
            max_connections: 10,
            warmup_connections: 1,
            health_check_interval: Duration::from_secs(3600),
            target_utilization: 0.0,
            ..Default::default()
        };
        let pool = AdaptiveConnectionPool::new(cfg, make_factory(counter)).unwrap();
        pool.maybe_resize().unwrap();
        let r1 = pool.metrics().resize_count;
        pool.maybe_resize().unwrap();
        let r2 = pool.metrics().resize_count;
        assert_eq!(r1, r2, "Second resize should be throttled");
    }

    // Zero utilization with a high target lets the pool shrink by one.
    #[test]
    fn test_pool_shrinks_on_low_utilization() {
        let counter = Arc::new(AtomicUsize::new(0));
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 1,
            max_connections: 10,
            warmup_connections: 5,
            health_check_interval: Duration::from_millis(1),
            target_utilization: 0.9,
            idle_timeout: Duration::from_secs(3600),
            max_lifetime: Duration::from_secs(3600),
            acquire_timeout: Duration::from_secs(5),
        };
        let pool = AdaptiveConnectionPool::new(cfg, make_factory(counter)).unwrap();
        let before = pool.metrics().total_connections;
        std::thread::sleep(Duration::from_millis(5));
        pool.maybe_resize().unwrap();
        let after = pool.metrics().total_connections;
        assert!(after <= before);
    }

    // Eight threads acquire/release concurrently; active returns to zero.
    #[test]
    fn test_concurrent_acquires() {
        use std::sync::Arc;
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = Arc::new(
            AdaptiveConnectionPool::new(default_cfg(2, 20), make_factory(counter)).unwrap(),
        );
        let mut handles = Vec::new();
        for _ in 0..8 {
            let p = Arc::clone(&pool);
            handles.push(std::thread::spawn(move || {
                let guard = p.acquire()?;
                std::thread::sleep(Duration::from_millis(5));
                drop(guard);
                FusekiResult::Ok(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        assert_eq!(pool.metrics().active_connections, 0);
    }

    // Guard field access via Deref yields a plausible id.
    #[test]
    fn test_guard_id_accessible() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = AdaptiveConnectionPool::new(default_cfg(1, 5), make_factory(counter)).unwrap();
        let guard = pool.acquire().unwrap();
        assert!(guard.id < 100);
    }

    // A blocked acquirer on a saturated pool registers as waiting.
    #[test]
    fn test_waiting_during_saturation() {
        use std::sync::Arc;
        let counter = Arc::new(AtomicUsize::new(0));
        let pool = Arc::new(
            AdaptiveConnectionPool::new(default_cfg(2, 2), make_factory(counter)).unwrap(),
        );
        let _g1 = pool.acquire().unwrap();
        let _g2 = pool.acquire().unwrap();
        let pool2 = Arc::clone(&pool);
        let handle = std::thread::spawn(move || {
            pool2.acquire()
        });
        std::thread::sleep(Duration::from_millis(20));
        let m = pool.metrics();
        assert!(m.waiting_connections > 0 || m.active_connections == 2);
        let _ = handle.join();
    }

    // Stale-entry eviction on acquire bumps total_recycled.
    #[test]
    fn test_total_recycled_increases_on_eviction() {
        let counter = Arc::new(AtomicUsize::new(0));
        let cfg = AdaptiveConnectionPoolConfig {
            min_connections: 2,
            max_connections: 10,
            warmup_connections: 2,
            idle_timeout: Duration::from_millis(1),
            max_lifetime: Duration::from_secs(3600),
            acquire_timeout: Duration::from_secs(5),
            health_check_interval: Duration::from_secs(60),
            target_utilization: 0.7,
        };
        let pool = AdaptiveConnectionPool::new(cfg, make_factory(counter)).unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let _g = pool.acquire().unwrap();
        assert!(pool.metrics().total_recycled > 0);
    }
}