fastdfs 0.1.1

Rust client for FastDFS distributed file system
Documentation
use super::pool::PoolInner;
use super::{KeyedObjectPool, KeyedPoolFactory};
use crate::pool::pooled_object::PooledObjectState;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::MissedTickBehavior;

fn zero2max(dur: Duration) -> Duration {
    if dur.is_zero() {
        return Duration::MAX;
    }
    dur
}

impl<K, T, F> KeyedObjectPool<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    async fn evictor(inner: Arc<PoolInner<K, T, F>>) {
        let time_between = inner.config.time_between_eviction_runs;
        if !inner.config.test_while_idle || time_between.is_zero() {
            return;
        };
        let min_idle_duration = inner.config.min_evictable_idle_duration;
        let max_lifetime = zero2max(inner.config.max_lifetime);
        let idle_timeout = zero2max(inner.config.idle_timeout);

        let mut timer = tokio::time::interval(time_between);
        timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            if inner.closed() {
                break;
            }
            // sleep between runs of the idle object evictor thread
            timer.tick().await;
            if inner.closed() {
                break;
            }

            // 取出待检测的 key
            let keys = {
                let pools_read = inner.pools.read().await;
                pools_read.keys().cloned().collect::<Vec<_>>()
            };
            for key in keys {
                let pool = {
                    let pools_read = inner.pools.read().await;
                    pools_read.get(&key).cloned()
                };
                let Some(pool) = pool else {
                    continue;
                };
                // 取出待检测的对象 id
                let ids = {
                    let guard = pool.lock().await;
                    let iter = guard.iter().map(|pooled| pooled.id);
                    if inner.config.lifo {
                        iter.rev().collect::<Vec<_>>()
                    } else {
                        iter.collect::<Vec<_>>()
                    }
                };
                for id in ids {
                    // 通过快照的方式拿到 idle 池的对象,避免检测过程中长时间对整个池加锁。
                    // 池中原来所在位置设为空,待检测完成后还原到所在位置
                    let mut snapshot = {
                        let mut guard = pool.lock().await;
                        let Some(pooled) = guard.iter_mut().find(|p| {
                            p.id == id
                                && p.state == PooledObjectState::Idle
                                && (p.idle_duration() >= min_idle_duration
                                    || p.expired(max_lifetime, idle_timeout))
                        }) else {
                            continue;
                        };
                        pooled.state = PooledObjectState::Eviction;
                        pooled.snapshot()
                    };
                    let expired = snapshot.expired(max_lifetime, idle_timeout);

                    if expired {
                        snapshot.state = PooledObjectState::Abandoned;
                        if let Some(obj) = snapshot.take() {
                            inner.factory.destroy(&key, obj).await;
                        };
                        continue;
                    }

                    snapshot.state = PooledObjectState::Validation;
                    let validate = inner.validate_pooled(&key, &mut snapshot, None).await;
                    if !validate.unwrap_or(false) {
                        snapshot.state = PooledObjectState::Invalid;
                        if let Some(obj) = snapshot.take() {
                            inner.factory.destroy(&key, obj).await;
                        };
                        continue;
                    }

                    // 通过对象 id 还原到 idle 池中的位置,如果位置不存在(可能检测过程中借出)就推到池顶 优先下次借用
                    snapshot.state = PooledObjectState::Idle;
                    {
                        let mut guard = pool.lock().await;
                        match guard.iter_mut().find(|p| p.id == id) {
                            Some(pooled) => pooled.restore(snapshot),
                            // EvictionReturnToHead
                            None => guard.push_front(snapshot),
                        }
                    }
                }
            }
        }

        // 循环结束,说明对象池关闭,清空所有对象
        if inner.closed() {
            inner.remove_all().await;
        }
    }

    pub(super) fn start_evictor(&mut self) {
        let inner = self.inner.clone();
        let handle = tokio::spawn(async move {
            Self::evictor(inner).await;
        });

        self.evictor_handle = Arc::new(Some(handle));
    }
}