fastdfs 0.1.1

Rust client for FastDFS distributed file system
Documentation
use crate::pool::config::PoolConfig;
use crate::pool::pooled_object::{PooledObject, PooledObjectState};
use crate::pool::KeyedPoolFactory;
use crate::{FastDFSError, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{Mutex, RwLock, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;

pub(super) struct PoolInner<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    pub pools: RwLock<HashMap<K, Arc<Mutex<VecDeque<PooledObject<K, T, F>>>>>>,
    pub semaphores: RwLock<HashMap<K, Arc<Semaphore>>>,
    pub factory: Arc<F>,
    pub config: PoolConfig,
    pub(super) closed: AtomicBool,
    next_id: AtomicU32,
}

impl<K, T, F> PoolInner<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    async fn get_semaphore(&self, key: &K) -> Arc<Semaphore> {
        // 1. 先尝试读锁获取,避免不必要的写锁阻塞
        {
            let read_guard = self.semaphores.read().await;
            if let Some(sem) = read_guard.get(key) {
                return sem.clone();
            }
        }

        // 2. 如果没找到,获取写锁进行插入
        // 注意:这里存在 TOCTOU (Time Of Check To Time Of Use) 竞态条件
        // 即在释放读锁和获取写锁之间,其他线程可能已经插入了。
        // 所以获取写锁后要再次检查。
        let mut write_guard = self.semaphores.write().await;
        write_guard
            .entry(key.clone())
            .or_insert_with(|| Arc::new(Semaphore::new(self.config.max_total_per_key)))
            .clone()
    }

    pub(super) async fn validate_pooled(
        &self,
        key: &K,
        pooled: &mut PooledObject<K, T, F>,
        obj: Option<&mut T>,
    ) -> Result<bool> {
        let obj = match (obj, pooled.obj.as_mut()) {
            (Some(obj), _) => obj,
            (None, Some(obj)) => obj,
            (_, _) => return Ok(false),
        };

        let result = timeout(self.config.test_timeout, self.factory.validate(key, obj));

        match result.await {
            Ok(Ok(r)) => Ok(r),
            // validate error
            Ok(Err(e)) => Err(e),
            // validate timeout
            Err(_) => Ok(false),
        }
    }

    /// 借出对象
    pub(super) async fn borrow(self: &Arc<Self>, key: &K) -> Result<PooledObject<K, T, F>> {
        if self.closed() {
            return Err(FastDFSError::ClientClosed);
        }
        let semaphore = self.get_semaphore(key).await;
        // 获取许可 (如果满了,这里会等待)
        let permit = semaphore.clone().acquire_owned().await.unwrap();

        // 尝试从现有池中获取
        {
            let pools_read = self.pools.read().await;
            if let Some(queue_ref) = pools_read.get(key).cloned() {
                drop(pools_read);
                loop {
                    let mut queue_lock = queue_ref.lock().await;
                    let Some(mut pooled) = queue_lock.pop_front() else {
                        break;
                    };
                    // 及时释放锁,避免借用时检测对 idle 池长时间加锁
                    drop(queue_lock);

                    if pooled.state != PooledObjectState::Idle {
                        if pooled.state == PooledObjectState::Eviction {
                            pooled.state = PooledObjectState::EvictionReturnToHead;
                        }
                        continue;
                    }
                    let Some(mut obj) = pooled.take() else {
                        continue;
                    };
                    if self.config.test_on_borrow {
                        pooled.state = PooledObjectState::ValidationPreallocated;
                        let validate = self.validate_pooled(key, &mut pooled, Some(&mut obj)).await;
                        if !validate.unwrap_or(false) {
                            pooled.state = PooledObjectState::Invalid;
                            self.factory.destroy(key, obj).await;
                            continue;
                        }
                    }
                    pooled.state = PooledObjectState::Allocated;
                    pooled.last_borrow = Instant::now();
                    pooled.last_use = Instant::now();
                    pooled.obj = Some(obj);
                    pooled._permit = Some(permit);
                    pooled.borrowed_count = pooled.borrowed_count.wrapping_add(1);
                    return Ok(pooled);
                }
            }
        }

        // 没有可用对象,创建新的
        let obj = self.factory.create(key).await?;
        {
            let mut pools_write = self.pools.write().await;
            if !pools_write.contains_key(key) {
                pools_write.insert(
                    key.clone(),
                    Arc::new(Mutex::new(VecDeque::with_capacity(
                        self.config.max_total_per_key,
                    ))),
                );
            }
        }
        Ok(PooledObject {
            id: self.next_id.fetch_add(1, Ordering::SeqCst),
            obj: Some(obj),
            key: Some(key.clone()),
            state: PooledObjectState::Allocated,
            borrowed_count: 1,
            pool: Some(Arc::downgrade(self)),
            _permit: Some(permit),
            ..PooledObject::default()
        })
    }

    /// 归还对象
    pub(super) async fn return_object(self: &Arc<Self>, mut pooled: PooledObject<K, T, F>) {
        // 无效的、废弃的不回收
        match pooled.state {
            PooledObjectState::Eviction
            | PooledObjectState::EvictionReturnToHead
            | PooledObjectState::Invalid
            | PooledObjectState::Abandoned
            | PooledObjectState::Returning => return,
            _ => {}
        };

        let Some(obj) = pooled.take() else {
            return;
        };
        let Some(key) = pooled.key.clone() else {
            return;
        };

        pooled._permit = None;
        pooled.state = PooledObjectState::Returning;
        let new_pooled = pooled.drop_return_new();

        self.return_to_pool(new_pooled, key, obj).await;
    }

    pub(super) async fn return_to_pool(
        self: &Arc<Self>,
        mut new_pooled: PooledObject<K, T, F>,
        key: K,
        mut obj: T,
    ) {
        // 如果对象池关闭,就不再放入对象池,直接销毁
        if self.closed() {
            new_pooled.state = PooledObjectState::Abandoned;
            self.factory.destroy(&key, obj).await;
            return;
        }

        let pools_read = self.pools.read().await;
        let Some(queue_arc) = pools_read.get(&key).cloned() else {
            drop(pools_read);
            // 如果对象池里没有对应的key,说明已被移除,不必归还
            new_pooled.state = PooledObjectState::Abandoned;
            self.factory.destroy(&key, obj).await;
            return;
        };
        drop(pools_read);

        if self.config.test_on_return {
            new_pooled.state = PooledObjectState::ValidationReturnToHead;
            let validate = self
                .validate_pooled(&key, &mut new_pooled, Some(&mut obj))
                .await;
            if !validate.unwrap_or(false) {
                new_pooled.state = PooledObjectState::Invalid;
                self.factory.destroy(&key, obj).await;
                return;
            }
        }

        // 如果对象池关闭,就不再放入对象池,直接销毁
        if self.closed() {
            new_pooled.state = PooledObjectState::Abandoned;
            self.factory.destroy(&key, obj).await;
            return;
        }

        new_pooled.key = Some(key);
        new_pooled.pool = Some(Arc::downgrade(self));
        new_pooled.obj = Some(obj);
        let mut queue = queue_arc.lock().await;
        new_pooled.state = PooledObjectState::Idle;
        new_pooled.last_return = Instant::now();
        if self.config.lifo {
            queue.push_front(new_pooled);
        } else {
            queue.push_back(new_pooled);
        }
    }

    pub(super) async fn clear_key(self: &Arc<Self>, key: &K) {
        {
            let mut w_lock = self.pools.write().await;
            if let Some(pools) = w_lock.get_mut(key) {
                let mut guard = pools.lock().await;
                guard.clear();
            }
        }
        {
            let mut w_lock = self.semaphores.write().await;
            if let Some(semaphores) = w_lock.get_mut(key) {
                semaphores.close();
            }
            w_lock.insert(
                key.clone(),
                Arc::new(Semaphore::new(self.config.max_total_per_key)),
            );
        }
    }

    pub(super) async fn remove_key(self: &Arc<Self>, key: &K) {
        {
            let mut w_lock = self.pools.write().await;
            w_lock.remove(key);
        }
        {
            let mut w_lock = self.semaphores.write().await;
            w_lock.remove(key);
        }
    }

    pub(super) async fn remove_all(&self) {
        self.pools.write().await.clear();
        self.semaphores.write().await.clear();
    }

    #[inline]
    pub fn closed(&self) -> bool {
        self.closed.load(Ordering::Relaxed)
    }
}

pub struct KeyedObjectPool<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    pub(super) inner: Arc<PoolInner<K, T, F>>,
    pub(super) evictor_handle: Arc<Option<JoinHandle<()>>>,
}

#[allow(dead_code)]
impl<K, T, F> KeyedObjectPool<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    pub fn new(factory: F, config: Option<PoolConfig>) -> Self {
        let inner = PoolInner {
            pools: RwLock::new(HashMap::new()),
            semaphores: RwLock::new(HashMap::new()),
            factory: Arc::new(factory),
            config: config.unwrap_or_default(),
            closed: AtomicBool::new(false),
            next_id: AtomicU32::new(0),
        };
        let mut pool = Self {
            inner: Arc::new(inner),
            evictor_handle: Arc::new(None),
        };
        pool.start_evictor();

        pool
    }

    pub async fn borrow(&self, key: &K) -> Result<PooledObject<K, T, F>> {
        self.inner.borrow(key).await
    }

    pub async fn get(&self, key: &K) -> Result<PooledObject<K, T, F>> {
        self.inner.borrow(key).await
    }

    pub async fn return_object(&self, pooled: PooledObject<K, T, F>) {
        self.inner.return_object(pooled).await
    }

    pub async fn clear_key(&self, key: &K) {
        self.inner.clear_key(key).await
    }

    pub async fn remove_key(&self, key: &K) {
        self.inner.remove_key(key).await
    }

    pub async fn remove_all(&self) {
        self.inner.remove_all().await
    }

    pub fn close(&self) {
        self.inner.closed.store(true, Ordering::Relaxed)
    }
}

impl<K, T, F> Clone for KeyedObjectPool<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            evictor_handle: self.evictor_handle.clone(),
        }
    }
}

impl<K, T, F> Drop for KeyedObjectPool<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    fn drop(&mut self) {
        let count = Arc::strong_count(&self.evictor_handle);
        if count <= 1 {
            self.close();
        }
    }
}