use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, Weak};
use std::thread;
use std::time::{Duration, Instant};
use crate::config::PoolConfig;
use crate::error::Error;
use crate::manager::Manager;
use crate::object::Pooled;
use crate::status::Status;
#[inline]
pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
mutex.lock().unwrap_or_else(PoisonError::into_inner)
}
pub(crate) struct Idle<R> {
pub(crate) resource: R,
pub(crate) created_at: Instant,
pub(crate) last_used: Instant,
}
struct State<R> {
idle: VecDeque<Idle<R>>,
total: usize,
waiters: usize,
closed: bool,
}
enum Action<R> {
Reuse(Idle<R>),
Create,
}
struct Shutdown {
stop: Mutex<bool>,
wake: Condvar,
}
impl Shutdown {
fn new() -> Self {
Shutdown {
stop: Mutex::new(false),
wake: Condvar::new(),
}
}
fn signal(&self) {
let mut stop = lock(&self.stop);
*stop = true;
drop(stop);
self.wake.notify_all();
}
}
pub(crate) struct PoolInner<M: Manager> {
pub(crate) manager: M,
config: PoolConfig,
state: Mutex<State<M::Resource>>,
available: Condvar,
shutdown: Arc<Shutdown>,
}
impl<M: Manager> Drop for PoolInner<M> {
fn drop(&mut self) {
self.shutdown.signal();
}
}
impl<M: Manager> PoolInner<M> {
fn acquire(
&self,
deadline: Option<Instant>,
) -> Result<(M::Resource, Instant), Error<M::Error>> {
loop {
let action = {
let mut state = lock(&self.state);
loop {
if state.closed {
return Err(Error::Closed);
}
if let Some(idle) = state.idle.pop_front() {
break Action::Reuse(idle);
}
if state.total < self.config.max_size {
state.total += 1;
break Action::Create;
}
match deadline {
None => {
state.waiters += 1;
state = self
.available
.wait(state)
.unwrap_or_else(PoisonError::into_inner);
state.waiters -= 1;
}
Some(dl) => {
let now = Instant::now();
if now >= dl {
return Err(Error::Timeout);
}
state.waiters += 1;
let (guard, _) = self
.available
.wait_timeout(state, dl - now)
.unwrap_or_else(PoisonError::into_inner);
state = guard;
state.waiters -= 1;
}
}
}
};
match action {
Action::Reuse(idle) => {
if let Some(prepared) = self.prepare(idle) {
return Ok(prepared);
}
self.release_slot();
}
Action::Create => match self.manager.create() {
Ok(resource) => return Ok((resource, Instant::now())),
Err(source) => {
self.release_slot();
return Err(Error::Backend(source));
}
},
}
}
}
fn prepare(&self, mut idle: Idle<M::Resource>) -> Option<(M::Resource, Instant)> {
if self.is_time_expired(&idle, Instant::now()) {
return None;
}
if !self.manager.validate(&mut idle.resource) {
return None;
}
Some((idle.resource, idle.created_at))
}
fn is_time_expired(&self, idle: &Idle<M::Resource>, now: Instant) -> bool {
if let Some(max_lifetime) = self.config.max_lifetime {
if now.saturating_duration_since(idle.created_at) >= max_lifetime {
return true;
}
}
if let Some(idle_timeout) = self.config.idle_timeout {
if now.saturating_duration_since(idle.last_used) >= idle_timeout {
return true;
}
}
false
}
fn reap(&self) {
if self.config.idle_timeout.is_none() && self.config.max_lifetime.is_none() {
return;
}
let now = Instant::now();
let mut expired = Vec::new();
let waiters;
{
let mut state = lock(&self.state);
if state.closed {
return;
}
let mut kept = VecDeque::with_capacity(state.idle.len());
while let Some(idle) = state.idle.pop_front() {
if self.is_time_expired(&idle, now) {
expired.push(idle.resource);
} else {
kept.push_back(idle);
}
}
state.total = state.total.saturating_sub(expired.len());
state.idle = kept;
waiters = state.waiters;
}
drop(expired); self.wake_all(waiters);
}
fn release_slot(&self) {
let mut state = lock(&self.state);
state.total = state.total.saturating_sub(1);
let waiters = state.waiters;
drop(state);
self.wake_one(waiters);
}
pub(crate) fn checkin(&self, mut resource: M::Resource, created_at: Instant) {
let recycled = self.manager.recycle(&mut resource);
let mut state = lock(&self.state);
let waiters = state.waiters;
if state.closed || recycled.is_err() {
state.total = state.total.saturating_sub(1);
drop(state);
self.wake_one(waiters);
} else {
let last_used = Instant::now();
state.idle.push_back(Idle {
resource,
created_at,
last_used,
});
drop(state);
self.wake_one(waiters);
}
}
#[inline]
fn wake_one(&self, waiters: usize) {
if waiters > 0 {
self.available.notify_one();
}
}
#[inline]
fn wake_all(&self, waiters: usize) {
if waiters > 0 {
self.available.notify_all();
}
}
}
fn reaper_loop<M: Manager>(pool: Weak<PoolInner<M>>, shutdown: Arc<Shutdown>, interval: Duration) {
loop {
{
let stop = lock(&shutdown.stop);
if *stop {
return;
}
let (stop, _timed_out) = shutdown
.wake
.wait_timeout(stop, interval)
.unwrap_or_else(PoisonError::into_inner);
if *stop {
return;
}
}
match pool.upgrade() {
Some(inner) => inner.reap(),
None => return,
}
}
}
pub struct Pool<M: Manager>(Arc<PoolInner<M>>);
impl<M: Manager> Clone for Pool<M> {
fn clone(&self) -> Self {
Pool(Arc::clone(&self.0))
}
}
impl<M: Manager> Pool<M> {
pub fn builder(manager: M) -> Builder<M> {
Builder::new(manager)
}
pub fn new(manager: M) -> Result<Self, Error<M::Error>> {
Builder::new(manager).build()
}
pub fn get(&self) -> Result<Pooled<M>, Error<M::Error>> {
let deadline = self
.0
.config
.create_timeout
.map(|timeout| Instant::now() + timeout);
self.acquire(deadline)
}
pub fn get_timeout(&self, timeout: Duration) -> Result<Pooled<M>, Error<M::Error>> {
self.acquire(Some(Instant::now() + timeout))
}
pub fn try_get(&self) -> Result<Pooled<M>, Error<M::Error>> {
self.acquire(Some(Instant::now()))
}
fn acquire(&self, deadline: Option<Instant>) -> Result<Pooled<M>, Error<M::Error>> {
let (resource, created_at) = self.0.acquire(deadline)?;
Ok(Pooled::new(Arc::clone(&self.0), resource, created_at))
}
pub fn status(&self) -> Status {
let state = lock(&self.0.state);
let idle = state.idle.len();
let size = state.total;
Status {
size,
idle,
in_use: size.saturating_sub(idle),
max_size: self.0.config.max_size,
}
}
pub fn close(&self) {
let mut state = lock(&self.0.state);
let drained = std::mem::take(&mut state.idle);
state.total = state.total.saturating_sub(drained.len());
state.closed = true;
let waiters = state.waiters;
drop(state);
self.0.wake_all(waiters); self.0.shutdown.signal(); drop(drained); }
#[must_use]
pub fn is_closed(&self) -> bool {
lock(&self.0.state).closed
}
}
#[must_use = "a Builder does nothing until `.build()` is called"]
pub struct Builder<M: Manager> {
manager: M,
config: PoolConfig,
}
impl<M: Manager> Builder<M> {
pub fn new(manager: M) -> Self {
Builder {
manager,
config: PoolConfig::default(),
}
}
pub fn max_size(mut self, max_size: usize) -> Self {
self.config.max_size = max_size;
self
}
pub fn min_idle(mut self, min_idle: usize) -> Self {
self.config.min_idle = min_idle;
self
}
pub fn create_timeout(mut self, timeout: Option<Duration>) -> Self {
self.config.create_timeout = timeout;
self
}
pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
self.config.idle_timeout = timeout;
self
}
pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
self.config.max_lifetime = lifetime;
self
}
pub fn reap_interval(mut self, interval: Option<Duration>) -> Self {
self.config.reap_interval = interval;
self
}
pub fn config(mut self, config: PoolConfig) -> Self {
self.config = config;
self
}
pub fn build(self) -> Result<Pool<M>, Error<M::Error>> {
if self.config.max_size == 0 {
return Err(Error::InvalidConfig("max_size must be at least 1"));
}
if self.config.min_idle > self.config.max_size {
return Err(Error::InvalidConfig("min_idle must not exceed max_size"));
}
let pool = Pool(Arc::new(PoolInner {
manager: self.manager,
config: self.config,
state: Mutex::new(State {
idle: VecDeque::with_capacity(self.config.max_size),
total: 0,
waiters: 0,
closed: false,
}),
available: Condvar::new(),
shutdown: Arc::new(Shutdown::new()),
}));
for _ in 0..pool.0.config.min_idle {
match pool.0.manager.create() {
Ok(resource) => {
let now = Instant::now();
let mut state = lock(&pool.0.state);
state.idle.push_back(Idle {
resource,
created_at: now,
last_used: now,
});
state.total += 1;
}
Err(source) => return Err(Error::Backend(source)),
}
}
pool.spawn_reaper();
Ok(pool)
}
}
impl<M: Manager> Pool<M> {
fn spawn_reaper(&self) {
let Some(interval) = self.0.config.reap_interval else {
return;
};
let pool = Arc::downgrade(&self.0);
let shutdown = Arc::clone(&self.0.shutdown);
match thread::Builder::new()
.name("pool-mod-reaper".to_owned())
.spawn(move || reaper_loop(pool, shutdown, interval))
{
Ok(handle) => drop(handle),
Err(_) => self.0.shutdown.signal(),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Debug, PartialEq, Eq)]
struct TestError(&'static str);
impl std::fmt::Display for TestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0)
}
}
impl std::error::Error for TestError {}
struct Steerable {
created: AtomicUsize,
recycled: AtomicUsize,
validated: AtomicUsize,
create_fails: AtomicBool,
recycle_fails: AtomicBool,
valid: AtomicBool,
}
impl Steerable {
fn new() -> Self {
Steerable {
created: AtomicUsize::new(0),
recycled: AtomicUsize::new(0),
validated: AtomicUsize::new(0),
create_fails: AtomicBool::new(false),
recycle_fails: AtomicBool::new(false),
valid: AtomicBool::new(true),
}
}
}
impl Manager for Steerable {
type Resource = usize;
type Error = TestError;
fn create(&self) -> Result<usize, TestError> {
if self.create_fails.load(Ordering::SeqCst) {
return Err(TestError("create failed"));
}
Ok(self.created.fetch_add(1, Ordering::SeqCst))
}
fn recycle(&self, _resource: &mut usize) -> Result<(), TestError> {
let _ = self.recycled.fetch_add(1, Ordering::SeqCst);
if self.recycle_fails.load(Ordering::SeqCst) {
return Err(TestError("recycle failed"));
}
Ok(())
}
fn validate(&self, _resource: &mut usize) -> bool {
let _ = self.validated.fetch_add(1, Ordering::SeqCst);
self.valid.load(Ordering::SeqCst)
}
}
fn pool(builder: impl FnOnce(Builder<Steerable>) -> Builder<Steerable>) -> Pool<Steerable> {
builder(Pool::builder(Steerable::new())).build().unwrap()
}
#[test]
fn test_build_min_idle_precreates_resources() {
let p = pool(|b| b.max_size(4).min_idle(2));
assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
let status = p.status();
assert_eq!(status.idle, 2);
assert_eq!(status.size, 2);
assert_eq!(status.in_use, 0);
}
#[test]
fn test_get_then_drop_reuses_same_resource() {
let p = pool(|b| b.max_size(4));
{
let first = p.get().unwrap();
assert_eq!(*first, 0);
}
let second = p.get().unwrap();
assert_eq!(*second, 0); assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
assert_eq!(p.0.manager.recycled.load(Ordering::SeqCst), 1);
}
#[test]
fn test_in_use_tracks_outstanding_guards() {
let p = pool(|b| b.max_size(2));
let a = p.get().unwrap();
let b = p.get().unwrap();
assert_eq!(p.status().in_use, 2);
assert_eq!(p.status().idle, 0);
drop(a);
drop(b);
assert_eq!(p.status().in_use, 0);
assert_eq!(p.status().idle, 2);
}
#[test]
fn test_saturated_pool_times_out() {
let p = pool(|b| b.max_size(1));
let _held = p.get().unwrap();
let result = p.get_timeout(Duration::ZERO);
assert!(matches!(result, Err(Error::Timeout)));
}
#[test]
fn test_invalid_resource_is_discarded_and_replaced() {
let p = pool(|b| b.max_size(4).min_idle(1));
assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
p.0.manager.valid.store(false, Ordering::SeqCst);
let resource = p.get().unwrap();
assert_eq!(*resource, 1);
assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
assert!(p.0.manager.validated.load(Ordering::SeqCst) >= 1);
}
#[test]
fn test_max_lifetime_forces_replacement() {
let p = pool(|b| b.max_size(4).min_idle(1).max_lifetime(Some(Duration::ZERO)));
let resource = p.get().unwrap();
assert_eq!(*resource, 1);
assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
}
#[test]
fn test_idle_timeout_forces_replacement() {
let p = pool(|b| b.max_size(4).min_idle(1).idle_timeout(Some(Duration::ZERO)));
let resource = p.get().unwrap();
assert_eq!(*resource, 1);
assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
}
#[test]
fn test_recycle_failure_drops_resource() {
let p = pool(|b| b.max_size(2));
p.0.manager.recycle_fails.store(true, Ordering::SeqCst);
{
let _resource = p.get().unwrap();
assert_eq!(p.status().size, 1);
}
assert_eq!(p.status().size, 0);
assert_eq!(p.status().idle, 0);
}
#[test]
fn test_create_failure_surfaces_and_frees_slot() {
let p = pool(|b| b.max_size(2));
p.0.manager.create_fails.store(true, Ordering::SeqCst);
let result = p.get();
assert!(matches!(
result,
Err(Error::Backend(TestError("create failed")))
));
assert_eq!(p.status().size, 0);
}
#[test]
fn test_closed_pool_rejects_checkout() {
let p = pool(|b| b.max_size(2).min_idle(1));
p.close();
assert!(p.is_closed());
assert!(matches!(p.get(), Err(Error::Closed)));
assert_eq!(p.status().idle, 0); }
#[test]
fn test_close_is_idempotent() {
let p = pool(|b| b.max_size(2).min_idle(2));
p.close();
p.close();
assert!(p.is_closed());
}
#[test]
fn test_build_rejects_zero_max_size() {
let result = Pool::builder(Steerable::new()).max_size(0).build();
assert!(matches!(result, Err(Error::InvalidConfig(_))));
}
#[test]
fn test_build_rejects_min_idle_above_max_size() {
let result = Pool::builder(Steerable::new())
.max_size(2)
.min_idle(3)
.build();
assert!(matches!(result, Err(Error::InvalidConfig(_))));
}
#[test]
fn test_try_get_does_not_block_when_saturated() {
let p = pool(|b| b.max_size(1));
let _held = p.try_get().unwrap();
assert!(matches!(p.try_get(), Err(Error::Timeout)));
}
#[test]
fn test_reap_prunes_time_expired_idle() {
let p = pool(|b| b.max_size(4).min_idle(2).idle_timeout(Some(Duration::ZERO)));
assert_eq!(p.status().idle, 2);
p.0.reap();
assert_eq!(p.status().idle, 0);
assert_eq!(p.status().size, 0);
}
#[test]
fn test_reap_is_noop_without_expiry_policy() {
let p = pool(|b| b.max_size(4).min_idle(2));
p.0.reap();
assert_eq!(p.status().idle, 2); }
#[test]
fn test_clone_shares_one_pool() {
let p = pool(|b| b.max_size(1));
let clone = p.clone();
let _held = p.get().unwrap();
assert!(matches!(
clone.get_timeout(Duration::ZERO),
Err(Error::Timeout)
));
}
}