use crate::pool::config::PoolConfig;
use crate::pool::pooled_object::{PooledObject, PooledObjectState};
use crate::pool::KeyedPoolFactory;
use crate::{FastDFSError, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{Mutex, RwLock, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
pub(super) struct PoolInner<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
pub pools: RwLock<HashMap<K, Arc<Mutex<VecDeque<PooledObject<K, T, F>>>>>>,
pub semaphores: RwLock<HashMap<K, Arc<Semaphore>>>,
pub factory: Arc<F>,
pub config: PoolConfig,
pub(super) closed: AtomicBool,
next_id: AtomicU32,
}
impl<K, T, F> PoolInner<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
async fn get_semaphore(&self, key: &K) -> Arc<Semaphore> {
{
let read_guard = self.semaphores.read().await;
if let Some(sem) = read_guard.get(key) {
return sem.clone();
}
}
let mut write_guard = self.semaphores.write().await;
write_guard
.entry(key.clone())
.or_insert_with(|| Arc::new(Semaphore::new(self.config.max_total_per_key)))
.clone()
}
pub(super) async fn validate_pooled(
&self,
key: &K,
pooled: &mut PooledObject<K, T, F>,
obj: Option<&mut T>,
) -> Result<bool> {
let obj = match (obj, pooled.obj.as_mut()) {
(Some(obj), _) => obj,
(None, Some(obj)) => obj,
(_, _) => return Ok(false),
};
let result = timeout(self.config.test_timeout, self.factory.validate(key, obj));
match result.await {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e),
Err(_) => Ok(false),
}
}
pub(super) async fn borrow(self: &Arc<Self>, key: &K) -> Result<PooledObject<K, T, F>> {
if self.closed() {
return Err(FastDFSError::ClientClosed);
}
let semaphore = self.get_semaphore(key).await;
let permit = semaphore.clone().acquire_owned().await.unwrap();
{
let pools_read = self.pools.read().await;
if let Some(queue_ref) = pools_read.get(key).cloned() {
drop(pools_read);
loop {
let mut queue_lock = queue_ref.lock().await;
let Some(mut pooled) = queue_lock.pop_front() else {
break;
};
drop(queue_lock);
if pooled.state != PooledObjectState::Idle {
if pooled.state == PooledObjectState::Eviction {
pooled.state = PooledObjectState::EvictionReturnToHead;
}
continue;
}
let Some(mut obj) = pooled.take() else {
continue;
};
if self.config.test_on_borrow {
pooled.state = PooledObjectState::ValidationPreallocated;
let validate = self.validate_pooled(key, &mut pooled, Some(&mut obj)).await;
if !validate.unwrap_or(false) {
pooled.state = PooledObjectState::Invalid;
self.factory.destroy(key, obj).await;
continue;
}
}
pooled.state = PooledObjectState::Allocated;
pooled.last_borrow = Instant::now();
pooled.last_use = Instant::now();
pooled.obj = Some(obj);
pooled._permit = Some(permit);
pooled.borrowed_count = pooled.borrowed_count.wrapping_add(1);
return Ok(pooled);
}
}
}
let obj = self.factory.create(key).await?;
{
let mut pools_write = self.pools.write().await;
if !pools_write.contains_key(key) {
pools_write.insert(
key.clone(),
Arc::new(Mutex::new(VecDeque::with_capacity(
self.config.max_total_per_key,
))),
);
}
}
Ok(PooledObject {
id: self.next_id.fetch_add(1, Ordering::SeqCst),
obj: Some(obj),
key: Some(key.clone()),
state: PooledObjectState::Allocated,
borrowed_count: 1,
pool: Some(Arc::downgrade(self)),
_permit: Some(permit),
..PooledObject::default()
})
}
pub(super) async fn return_object(self: &Arc<Self>, mut pooled: PooledObject<K, T, F>) {
match pooled.state {
PooledObjectState::Eviction
| PooledObjectState::EvictionReturnToHead
| PooledObjectState::Invalid
| PooledObjectState::Abandoned
| PooledObjectState::Returning => return,
_ => {}
};
let Some(obj) = pooled.take() else {
return;
};
let Some(key) = pooled.key.clone() else {
return;
};
pooled._permit = None;
pooled.state = PooledObjectState::Returning;
let new_pooled = pooled.drop_return_new();
self.return_to_pool(new_pooled, key, obj).await;
}
pub(super) async fn return_to_pool(
self: &Arc<Self>,
mut new_pooled: PooledObject<K, T, F>,
key: K,
mut obj: T,
) {
if self.closed() {
new_pooled.state = PooledObjectState::Abandoned;
self.factory.destroy(&key, obj).await;
return;
}
let pools_read = self.pools.read().await;
let Some(queue_arc) = pools_read.get(&key).cloned() else {
drop(pools_read);
new_pooled.state = PooledObjectState::Abandoned;
self.factory.destroy(&key, obj).await;
return;
};
drop(pools_read);
if self.config.test_on_return {
new_pooled.state = PooledObjectState::ValidationReturnToHead;
let validate = self
.validate_pooled(&key, &mut new_pooled, Some(&mut obj))
.await;
if !validate.unwrap_or(false) {
new_pooled.state = PooledObjectState::Invalid;
self.factory.destroy(&key, obj).await;
return;
}
}
if self.closed() {
new_pooled.state = PooledObjectState::Abandoned;
self.factory.destroy(&key, obj).await;
return;
}
new_pooled.key = Some(key);
new_pooled.pool = Some(Arc::downgrade(self));
new_pooled.obj = Some(obj);
let mut queue = queue_arc.lock().await;
new_pooled.state = PooledObjectState::Idle;
new_pooled.last_return = Instant::now();
if self.config.lifo {
queue.push_front(new_pooled);
} else {
queue.push_back(new_pooled);
}
}
pub(super) async fn clear_key(self: &Arc<Self>, key: &K) {
{
let mut w_lock = self.pools.write().await;
if let Some(pools) = w_lock.get_mut(key) {
let mut guard = pools.lock().await;
guard.clear();
}
}
{
let mut w_lock = self.semaphores.write().await;
if let Some(semaphores) = w_lock.get_mut(key) {
semaphores.close();
}
w_lock.insert(
key.clone(),
Arc::new(Semaphore::new(self.config.max_total_per_key)),
);
}
}
pub(super) async fn remove_key(self: &Arc<Self>, key: &K) {
{
let mut w_lock = self.pools.write().await;
w_lock.remove(key);
}
{
let mut w_lock = self.semaphores.write().await;
w_lock.remove(key);
}
}
pub(super) async fn remove_all(&self) {
self.pools.write().await.clear();
self.semaphores.write().await.clear();
}
#[inline]
pub fn closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
}
pub struct KeyedObjectPool<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
pub(super) inner: Arc<PoolInner<K, T, F>>,
pub(super) evictor_handle: Arc<Option<JoinHandle<()>>>,
}
#[allow(dead_code)]
impl<K, T, F> KeyedObjectPool<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
pub fn new(factory: F, config: Option<PoolConfig>) -> Self {
let inner = PoolInner {
pools: RwLock::new(HashMap::new()),
semaphores: RwLock::new(HashMap::new()),
factory: Arc::new(factory),
config: config.unwrap_or_default(),
closed: AtomicBool::new(false),
next_id: AtomicU32::new(0),
};
let mut pool = Self {
inner: Arc::new(inner),
evictor_handle: Arc::new(None),
};
pool.start_evictor();
pool
}
pub async fn borrow(&self, key: &K) -> Result<PooledObject<K, T, F>> {
self.inner.borrow(key).await
}
pub async fn get(&self, key: &K) -> Result<PooledObject<K, T, F>> {
self.inner.borrow(key).await
}
pub async fn return_object(&self, pooled: PooledObject<K, T, F>) {
self.inner.return_object(pooled).await
}
pub async fn clear_key(&self, key: &K) {
self.inner.clear_key(key).await
}
pub async fn remove_key(&self, key: &K) {
self.inner.remove_key(key).await
}
pub async fn remove_all(&self) {
self.inner.remove_all().await
}
pub fn close(&self) {
self.inner.closed.store(true, Ordering::Relaxed)
}
}
impl<K, T, F> Clone for KeyedObjectPool<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
evictor_handle: self.evictor_handle.clone(),
}
}
}
impl<K, T, F> Drop for KeyedObjectPool<K, T, F>
where
K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
F: KeyedPoolFactory<K, T>,
{
fn drop(&mut self) {
let count = Arc::strong_count(&self.evictor_handle);
if count <= 1 {
self.close();
}
}
}