// fast_pool 1.0.4
//
// The Fast Pool based on channel
// Documentation
use crate::duration::AtomicDuration;
use crate::guard::ConnectionGuard;
use crate::state::State;
use crate::Manager;
use crossfire::compat::mpmc::unbounded_async;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Connection pool with configurable manager and lifecycle management.
///
/// Every field is wrapped in an `Arc`, and `Clone` clones each `Arc`, so all
/// clones of a pool share the same manager, idle channel, limits and counters.
pub struct Pool<M: Manager> {
    /// Manager used to open (`connect`) and validate (`check`) connections.
    pub manager: Arc<M>,
    /// Sending half of the unbounded channel that holds idle connections.
    pub idle_send: Arc<crossfire::compat::MTx<M::Connection>>,
    /// Receiving half of the idle channel; `get_timeout` takes connections from it.
    pub idle_recv: Arc<crossfire::compat::MAsyncRx<M::Connection>>,
    /// Maximum open connections (default: 32)
    pub max_open: Arc<AtomicU64>,
    /// Maximum idle connections (default: same as max_open)
    pub max_idle: Arc<AtomicU64>,
    /// Number of connections currently handed out; decremented by `recycle`.
    /// NOTE(review): the matching increment is not in this file — presumably
    /// done by `ConnectionGuard`; confirm.
    pub(crate) in_use: Arc<AtomicU64>,
    /// Number of `get_timeout` calls currently in progress.
    pub(crate) waits: Arc<AtomicU64>,
    /// Number of connections currently being opened by `manager.connect()`.
    pub(crate) connecting: Arc<AtomicU64>,
    /// Number of connections currently being validated by `manager.check()`.
    pub(crate) checking: Arc<AtomicU64>,
    /// Number of established connections counted against `max_open`.
    pub(crate) connections: Arc<AtomicU64>,
    /// Connection check timeout (default: 10s)
    pub timeout_check: Arc<AtomicDuration>,
    /// Timeout for waiting on a connection in `get`/`get_timeout` when the
    /// caller passes no explicit duration (default: 30s)
    pub timeout_wait: Arc<AtomicDuration>,
}

impl<M: Manager> Debug for Pool<M> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let state = self.state();
        Debug::fmt(&state, f)
    }
}

impl<M: Manager> Clone for Pool<M> {
    fn clone(&self) -> Self {
        Self {
            manager: self.manager.clone(),
            idle_send: self.idle_send.clone(),
            idle_recv: self.idle_recv.clone(),
            max_open: self.max_open.clone(),
            max_idle: self.max_idle.clone(),
            in_use: self.in_use.clone(),
            waits: self.waits.clone(),
            connecting: self.connecting.clone(),
            checking: self.checking.clone(),
            connections: self.connections.clone(),
            timeout_check: self.timeout_check.clone(),
            timeout_wait: self.timeout_wait.clone(),
        }
    }
}

impl<M: Manager> Pool<M> {
    /// Build a pool around manager `m` using the default settings.
    ///
    /// Defaults: `max_open` = `max_idle` = 32, all counters start at 0,
    /// `timeout_check` = 10s, `timeout_wait` = 30s. The idle queue is an
    /// unbounded channel.
    pub fn new(m: M) -> Self
    where
        M::Connection: Unpin + Send,
    {
        const DEFAULT_MAX_OPEN: u64 = 32;
        let counter = |initial: u64| Arc::new(AtomicU64::new(initial));
        let timeout_secs =
            |secs: u64| Arc::new(AtomicDuration::new(Some(Duration::from_secs(secs))));

        let (idle_tx, idle_rx) = unbounded_async();
        Self {
            manager: Arc::new(m),
            idle_send: Arc::new(idle_tx),
            idle_recv: Arc::new(idle_rx),
            max_open: counter(DEFAULT_MAX_OPEN),
            max_idle: counter(DEFAULT_MAX_OPEN),
            in_use: counter(0),
            waits: counter(0),
            connecting: counter(0),
            checking: counter(0),
            connections: counter(0),
            timeout_check: timeout_secs(10),
            timeout_wait: timeout_secs(30),
        }
    }

    /// Get a connection from the pool.
    ///
    /// Delegates to `get_timeout(None)`, so the wait is bounded by the pool's
    /// `timeout_wait` setting (default: 30s) rather than being unbounded.
    pub async fn get(&self) -> Result<ConnectionGuard<M>, M::Error> {
        self.get_timeout(None).await
    }

    /// Get a connection, waiting at most `d` for one to become usable.
    ///
    /// When `d` is `None` the pool's `timeout_wait` value is used; if that is
    /// also `None`, `unwrap_or_default()` yields a zero `Duration`.
    ///
    /// Each loop iteration opens a new connection when
    /// `connections + connecting < max_open`, then takes one connection from
    /// the idle channel and runs `manager.check` on it under `timeout_check`.
    /// A connection whose check returns `Err` is dropped and the loop retries.
    ///
    /// # Errors
    /// - the error returned by `manager.connect()`, propagated unchanged;
    /// - an error built from the channel's send/recv error text;
    /// - an error built from `"check_timeout=…"` when the check times out;
    /// - an error built from `"get_timeout"` when the overall wait elapses.
    pub async fn get_timeout(&self, d: Option<Duration>) -> Result<ConnectionGuard<M>, M::Error> {
        // Count this caller as waiting. `defer!` is defined outside this file —
        // presumably it runs the closure when the enclosing scope exits.
        self.waits.fetch_add(1, Ordering::SeqCst);
        defer!(|| {
            self.waits.fetch_sub(1, Ordering::SeqCst);
        });
        let f = async {
            let v: Result<ConnectionGuard<M>, M::Error> = loop {
                // Established connections plus those currently being opened.
                let connections = self.connections.load(Ordering::SeqCst)
                    + self.connecting.load(Ordering::SeqCst);
                if connections < self.max_open.load(Ordering::SeqCst) {
                    // Reserve a slot in `connecting` while the new connection is opened.
                    // NOTE(review): the capacity check above and this increment are
                    // separate atomic operations, so concurrent callers may both pass
                    // the check — confirm whether briefly exceeding max_open is acceptable.
                    self.connecting.fetch_add(1, Ordering::SeqCst);
                    defer!(|| {
                        self.connecting.fetch_sub(1, Ordering::SeqCst);
                    });
                    // Open the connection and park it in the idle channel; the recv
                    // below (or another waiter) picks it up.
                    // Original note: this step could also enforce max idle; currently
                    // max idle = max_open.
                    let conn = self.manager.connect().await?;
                    self.idle_send
                        .send(conn)
                        .map_err(|e| M::Error::from(&e.to_string()))?;
                    self.connections.fetch_add(1, Ordering::SeqCst);
                }
                // Wait for an idle connection (possibly the one just created).
                let conn = self
                    .idle_recv
                    .recv()
                    .await
                    .map_err(|e| M::Error::from(&e.to_string()))?;

                // Wrap it in a guard tied to a clone of this pool; it stays marked
                // unchecked until the check below succeeds.
                let mut guard = ConnectionGuard::new(conn, self.clone());
                guard.set_checked(false);
                // Validate the connection, tracking it in `checking` meanwhile.
                self.checking.fetch_add(1, Ordering::SeqCst);
                defer!(|| {
                    self.checking.fetch_sub(1, Ordering::SeqCst);
                });
                // A `None` timeout_check becomes a zero duration here.
                // NOTE(review): confirm `None` is not meant to disable the timeout.
                let check_result = tokio::time::timeout(
                    self.timeout_check.get().unwrap_or_default(),
                    self.manager.check(&mut guard),
                )
                .await
                .map_err(|e| M::Error::from(&format!("check_timeout={}", e)))?;
                match check_result {
                    Ok(_) => {
                        guard.set_checked(true);
                        break Ok(guard);
                    }
                    Err(_e) => {
                        // The check error is discarded: drop the bad connection,
                        // yield to the scheduler, then retry.
                        drop(guard);
                        tokio::task::yield_now().await;
                        continue;
                    }
                }
            };
            v
        };
        // Bound the whole acquisition; `??` unwraps the timeout result first and
        // the acquisition result second.
        let conn = {
           tokio::time::timeout(d.unwrap_or_else(|| self.timeout_wait.get().unwrap_or_default()), f)
                    .await
                    .map_err(|_e| M::Error::from("get_timeout"))??
        };
        Ok(conn)
    }

    /// Take a snapshot of the pool's limits and counters.
    ///
    /// The values are read one after another rather than atomically as a
    /// group, so under concurrent use the fields may not be mutually consistent.
    pub fn state(&self) -> State {
        let max_open = self.max_open.load(Ordering::Relaxed);
        let connections = self.connections.load(Ordering::Relaxed);
        let in_use = self.in_use.load(Ordering::SeqCst);
        // Idle connections are whatever is currently queued in the channel.
        let idle = self.idle_send.len() as u64;
        let waits = self.waits.load(Ordering::SeqCst);
        let connecting = self.connecting.load(Ordering::SeqCst);
        let checking = self.checking.load(Ordering::SeqCst);
        State {
            max_open,
            connections,
            in_use,
            idle,
            waits,
            connecting,
            checking,
        }
    }

    /// Set the maximum number of open connections.
    ///
    /// A value of `0` is ignored and leaves the pool unchanged. `max_idle` is
    /// lowered to `n` if it is currently larger, and idle connections beyond
    /// `n` are removed from the idle queue and dropped.
    pub fn set_max_open(&self, n: u64) {
        if n == 0 {
            return;
        }
        self.max_open.store(n, Ordering::SeqCst);
        // Ensure max_idle does not exceed max_open (single atomic clamp).
        self.max_idle.fetch_min(n, Ordering::SeqCst);
        // Trim excess idle connections.
        while self.idle_send.len() > n as usize {
            if self.idle_recv.try_recv().is_err() {
                // Another task emptied the queue first: nothing was removed
                // here, so `connections` must not be touched.
                break;
            }
            // One idle connection was removed and dropped: release its slot.
            // `checked_sub` keeps the counter from wrapping below zero.
            let _ = self
                .connections
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |open| {
                    open.checked_sub(1)
                });
        }
    }

    /// Get the maximum number of open connections.
    pub fn get_max_open(&self) -> u64 {
        self.max_open.load(Ordering::SeqCst)
    }

    /// Set the maximum number of idle connections.
    ///
    /// Idle connections beyond `n` are removed from the idle queue and
    /// dropped immediately; `n == 0` therefore empties the idle queue.
    pub fn set_max_idle_conns(&self, n: u64) {
        self.max_idle.store(n, Ordering::SeqCst);
        // Clean up excess idle connections
        while self.idle_send.len() > n as usize {
            if self.idle_recv.try_recv().is_err() {
                // Another task emptied the queue first: nothing was removed
                // here, so `connections` must not be touched.
                break;
            }
            // One idle connection was removed and dropped: release its slot.
            // `checked_sub` keeps the counter from wrapping below zero.
            let _ = self
                .connections
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |open| {
                    open.checked_sub(1)
                });
        }
    }

    /// Get the maximum number of idle connections.
    pub fn get_max_idle_conns(&self) -> u64 {
        self.max_idle.load(Ordering::SeqCst)
    }

    /// Return a connection to the pool after use.
    ///
    /// Decrements `in_use`, then puts `arg` back on the idle queue if the
    /// queue holds fewer than `max_idle` connections. Otherwise, or if the
    /// queue rejects it, the connection is dropped and its slot in
    /// `connections` is released.
    pub fn recycle(&self, arg: M::Connection) {
        // Saturating decrement: this method is public, so an unmatched call
        // must not wrap the counter around to `u64::MAX`.
        let _ = self
            .in_use
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1));

        let has_idle_room =
            self.idle_send.len() < self.max_idle.load(Ordering::SeqCst) as usize;
        if has_idle_room && self.idle_send.send(arg).is_ok() {
            return;
        }

        // The connection was not kept (idle queue full or send failed), so it
        // no longer counts as open. `checked_sub` prevents wrapping below zero.
        let _ = self
            .connections
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |open| {
                open.checked_sub(1)
            });
    }

    /// Set the timeout for checking connections in the pool.
    ///
    /// `get_timeout` reads this value with `unwrap_or_default()`, so storing
    /// `None` results in a zero-length check timeout there.
    pub fn set_timeout_check(&self, duration: Option<Duration>) {
        self.timeout_check.store(duration);
    }

    /// Get the timeout for checking connections in the pool.
    pub fn get_timeout_check(&self) -> Option<Duration> {
        self.timeout_check.get()
    }

    /// Set the timeout for waiting on a connection when `get_timeout` is
    /// called without an explicit duration.
    ///
    /// `get_timeout` reads this value with `unwrap_or_default()`, so storing
    /// `None` results in a zero-length wait timeout there.
    pub fn set_timeout_wait(&self, duration: Option<Duration>) {
        self.timeout_wait.store(duration);
    }

    /// Get the timeout for waiting on a connection when `get_timeout` is
    /// called without an explicit duration.
    pub fn get_timeout_wait(&self) -> Option<Duration> {
        self.timeout_wait.get()
    }

    /// Borrow the pool's manager as the concrete type `T`.
    ///
    /// Returns `Some(&T)` when the manager stored in this pool is exactly of
    /// type `T`, and `None` otherwise. Useful for reaching methods or fields
    /// that only the concrete manager type exposes.
    ///
    /// # Example
    /// ```no_run
    /// use fast_pool::{Manager, Pool};
    /// use fast_pool::plugin::{DurationManager, CheckMode};
    /// use std::time::Duration;
    ///
    /// struct MyManager;
    ///
    /// impl Manager for MyManager {
    ///     type Connection = ();
    ///     type Error = String;
    ///
    ///     async fn connect(&self) -> Result<Self::Connection, Self::Error> {
    ///         Ok(())
    ///     }
    ///
    ///     async fn check(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
    ///         Ok(())
    ///     }
    /// }
    ///
    /// let duration_manager = DurationManager::new(MyManager, CheckMode::SkipInterval(Duration::from_secs(30)));
    /// let pool = Pool::new(duration_manager);
    ///
    /// // Downcast to access DurationManager specific methods
    /// if let Some(duration_manager) = pool.downcast_manager::<DurationManager<MyManager>>() {
    ///     // Now you can access DurationManager-specific fields
    ///     let mode = duration_manager.mode.get_mode();
    /// }
    /// ```
    pub fn downcast_manager<T>(&self) -> Option<&T>
    where
        T: Manager,
    {
        // View the manager as `&dyn Any`, then let `Any` perform the type test.
        (self.manager.as_ref() as &dyn std::any::Any).downcast_ref::<T>()
    }
}