use super::pool::PoolInner;
use super::{KeyedObjectPool, KeyedPoolFactory};
use crate::pool::pooled_object::PooledObjectState;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::MissedTickBehavior;
fn zero2max(dur: Duration) -> Duration {
if dur.is_zero() {
return Duration::MAX;
}
dur
}
impl<K, T, F> KeyedObjectPool<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
async fn evictor(inner: Arc<PoolInner<K, T, F>>) {
let time_between = inner.config.time_between_eviction_runs;
if !inner.config.test_while_idle || time_between.is_zero() {
return;
};
let min_idle_duration = inner.config.min_evictable_idle_duration;
let max_lifetime = zero2max(inner.config.max_lifetime);
let idle_timeout = zero2max(inner.config.idle_timeout);
let mut timer = tokio::time::interval(time_between);
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if inner.closed() {
break;
}
timer.tick().await;
if inner.closed() {
break;
}
let keys = {
let pools_read = inner.pools.read().await;
pools_read.keys().cloned().collect::<Vec<_>>()
};
for key in keys {
let pool = {
let pools_read = inner.pools.read().await;
pools_read.get(&key).cloned()
};
let Some(pool) = pool else {
continue;
};
let ids = {
let guard = pool.lock().await;
let iter = guard.iter().map(|pooled| pooled.id);
if inner.config.lifo {
iter.rev().collect::<Vec<_>>()
} else {
iter.collect::<Vec<_>>()
}
};
for id in ids {
let mut snapshot = {
let mut guard = pool.lock().await;
let Some(pooled) = guard.iter_mut().find(|p| {
p.id == id
&& p.state == PooledObjectState::Idle
&& (p.idle_duration() >= min_idle_duration
|| p.expired(max_lifetime, idle_timeout))
}) else {
continue;
};
pooled.state = PooledObjectState::Eviction;
pooled.snapshot()
};
let expired = snapshot.expired(max_lifetime, idle_timeout);
if expired {
snapshot.state = PooledObjectState::Abandoned;
if let Some(obj) = snapshot.take() {
inner.factory.destroy(&key, obj).await;
};
continue;
}
snapshot.state = PooledObjectState::Validation;
let validate = inner.validate_pooled(&key, &mut snapshot, None).await;
if !validate.unwrap_or(false) {
snapshot.state = PooledObjectState::Invalid;
if let Some(obj) = snapshot.take() {
inner.factory.destroy(&key, obj).await;
};
continue;
}
snapshot.state = PooledObjectState::Idle;
{
let mut guard = pool.lock().await;
match guard.iter_mut().find(|p| p.id == id) {
Some(pooled) => pooled.restore(snapshot),
None => guard.push_front(snapshot),
}
}
}
}
}
if inner.closed() {
inner.remove_all().await;
}
}
pub(super) fn start_evictor(&mut self) {
let inner = self.inner.clone();
let handle = tokio::spawn(async move {
Self::evictor(inner).await;
});
self.evictor_handle = Arc::new(Some(handle));
}
}