fastdfs 0.1.1

Rust client for FastDFS distributed file system
Documentation
#![allow(dead_code)]

use crate::pool::pool::PoolInner;
use crate::pool::KeyedPoolFactory;
use std::sync::Weak;
use std::time::{Duration, Instant};
use tokio::sync::OwnedSemaphorePermit;

/// Provides all possible states of a [`PooledObject`].
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum PooledObjectState {
    /**
     * In the queue, not in use.
     */
    Idle,

    /**
     * In use.
     */
    Allocated,

    /**
     * In the queue, currently being tested for possible eviction.
     */
    Eviction,

    /**
     * Not in the queue, currently being tested for possible eviction. An attempt to borrow the object was made while
     * being tested which removed it from the queue. It should be returned to the head of the queue once eviction
     * testing completes.
     * <p>
     * TODO: Consider allocating object and ignoring the result of the eviction test.
     * </p>
     */
    EvictionReturnToHead,

    /**
     * In the queue, currently being validated.
     */
    Validation,

    /**
     * Not in queue, currently being validated. The object was borrowed while being validated and since testOnBorrow was
     * configured, it was removed from the queue and pre-allocated. It should be allocated once validation completes.
     */
    ValidationPreallocated,

    /**
     * Not in queue, currently being validated. An attempt to borrow the object was made while previously being tested
     * for eviction which removed it from the queue. It should be returned to the head of the queue once validation
     * completes.
     */
    ValidationReturnToHead,

    /**
     * Failed maintenance (e.g. eviction test or validation) and will be / has been destroyed
     */
    Invalid,

    /**
     * Deemed abandoned, to be invalidated.
     */
    Abandoned,

    /**
     * Returning to the pool.
     */
    Returning,
}

pub struct PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    pub(super) id: u32,
    pub(super) obj: Option<T>, // 使用 Option 以便取出所有权
    pub(super) key: Option<K>,
    pub(crate) state: PooledObjectState,
    pub(super) create: Instant,
    pub(super) last_borrow: Instant,
    pub(super) last_use: Instant,
    pub(super) last_return: Instant,
    pub(super) borrowed_count: u64,
    // 持有 Pool 的引用,以便 Drop 时能找到归还的地方
    pub(super) pool: Option<Weak<PoolInner<K, T, F>>>,
    // 使用 OwnedSemaphorePermit 可以安全地在结构体中持有并在跨线程移动
    pub(super) _permit: Option<OwnedSemaphorePermit>,
}

impl<K, T, F> PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    /// setup last_use instant
    pub fn in_use(&mut self) {
        self.last_use = Instant::now();
    }

    pub(super) fn drop_return_new(&self) -> Self {
        PooledObject {
            id: self.id,
            state: self.state,
            create: self.create,
            last_borrow: self.last_borrow,
            last_use: self.last_use,
            last_return: self.last_return,
            borrowed_count: self.borrowed_count,
            pool: None,
            _permit: None,
            obj: None,
            key: None,
        }
    }

    pub fn idle_duration(&self) -> Duration {
        self.last_return.elapsed()
    }

    pub fn create_duration(&self) -> Duration {
        self.create.elapsed()
    }

    pub fn expired(&self, max_lifetime: Duration, idle_timeout: Duration) -> bool {
        self.create.elapsed() >= max_lifetime || self.last_return.elapsed() >= idle_timeout
    }

    pub(super) fn take(&mut self) -> Option<T> {
        self.obj.take()
    }

    pub(super) fn snapshot(&mut self) -> Self {
        Self {
            id: self.id,
            state: self.state,
            create: self.create,
            last_borrow: self.last_borrow,
            last_use: self.last_use,
            last_return: self.last_return,
            borrowed_count: self.borrowed_count,
            pool: self.pool.take(),
            _permit: self._permit.take(),
            obj: self.obj.take(),
            key: self.key.take(),
        }
    }

    pub(super) fn restore(&mut self, mut that: Self) {
        self.state = that.state;
        self.pool = that.pool.take();
        self._permit = that._permit.take();
        self.obj = that.obj.take();
        self.key = that.key.take();
    }
}

impl<K, T, F> Default for PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    fn default() -> Self {
        Self {
            id: 0,
            obj: None,
            key: None,
            state: PooledObjectState::Idle,
            create: Instant::now(),
            last_borrow: Instant::now(),
            last_use: Instant::now(),
            last_return: Instant::now(),
            borrowed_count: 0,
            pool: None,
            _permit: None,
        }
    }
}

// 实现 Deref 让使用者像使用 T 一样使用 PooledObject
impl<K, T, F> std::ops::Deref for PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.obj.as_ref().unwrap()
    }
}

impl<K, T, F> std::ops::DerefMut for PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.obj.as_mut().unwrap()
    }
}

// -------------------------------------------------------------------
// 实现 Drop 自动归还
// -------------------------------------------------------------------
impl<K, T, F> Drop for PooledObject<K, T, F>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    T: Send + Sync + 'static,
    F: KeyedPoolFactory<K, T>,
{
    fn drop(&mut self) {
        // 无效的、废弃的不回收
        match self.state {
            PooledObjectState::Eviction
            | PooledObjectState::EvictionReturnToHead
            | PooledObjectState::Invalid
            | PooledObjectState::Abandoned
            | PooledObjectState::Returning => return,
            _ => {}
        };

        // 1. 取出对象 (如果已经被手动 return 过,这里就是 None)
        let Some(obj) = self.obj.take() else {
            return;
        };
        let Some(key) = self.key.take() else {
            return;
        };
        let Some(pool) = self.pool.take() else {
            return;
        };

        self._permit = None;
        self.state = PooledObjectState::Returning;
        let new_pooled = self.drop_return_new();

        // 2. 启动一个后台任务来处理归还逻辑
        // 因为 Drop 是同步的,不能 await,所以必须 spawn
        tokio::spawn(async move {
            let Some(pool) = pool.upgrade() else {
                return;
            };
            // 这里的逻辑与之前的 return_object 一致
            pool.return_to_pool(new_pooled, key, obj).await;
        });
    }
}