reool 0.13.0

An asynchronous connection pool for Redis based on tokio and redis-rs
Documentation
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use futures::future;
use log::{debug, warn};

use crate::connection_factory::ConnectionFactory;
use crate::error::{CheckoutError, CheckoutErrorKind};
use crate::error::{InitializationError, InitializationResult};
use crate::executor_flavour::ExecutorFlavour;
use crate::instrumentation::Instrumentation;
use crate::multi_node_pool::instrumentation::*;
use crate::pool_internal::{CheckoutManaged, Config as PoolConfig, PoolInternal};
use crate::stats::PoolStats;
use crate::{Checkout, Poolable};

use super::Config;

/// Internal state of a pool that spans multiple nodes.
///
/// Holds one `PoolInternal` per node and hands out checkouts by cycling
/// through them.
pub struct Inner<T: Poolable> {
    /// Number of checkouts requested so far; taken modulo the number of
    /// pools to select the pool that serves the next checkout.
    count: AtomicUsize,
    /// The per-node pools, one for each connection factory passed to `new`.
    pools: Vec<PoolInternal<T>>,
    /// Timeout used by `check_out` when the caller does not supply one.
    checkout_timeout: Option<Duration>,
}

impl<T: Poolable + redis::r#async::ConnectionLike> Inner<T> {
    pub(crate) fn new<I, F>(
        config: Config,
        connection_factories: Vec<F>,
        executor_flavour: ExecutorFlavour,
        instrumentation: Option<I>,
    ) -> InitializationResult<Self>
    where
        I: Instrumentation + Send + Sync + 'static,
        F: ConnectionFactory<Connection = T> + Send + Sync + 'static,
    {
        let mut pools = Vec::new();

        let instrumentation_aggregator = instrumentation
            .map(InstrumentationAggregator::new)
            .map(Arc::new);

        let mut pool_idx = 0;
        for connection_factory in connection_factories {
            let pool_conf = PoolConfig {
                desired_pool_size: config.desired_pool_size,
                backoff_strategy: config.backoff_strategy,
                reservation_limit: config.reservation_limit,
                stats_interval: config.stats_interval,
                activation_order: config.activation_order,
            };

            let indexed_instrumentation = instrumentation_aggregator.as_ref().map(|agg| {
                let instr = IndexedInstrumentation::new(agg.clone(), pool_idx);
                agg.add_new_pool();
                instr
            });

            let pool = PoolInternal::new(
                pool_conf,
                connection_factory,
                executor_flavour.clone(),
                indexed_instrumentation,
            );

            pools.push(pool);

            pool_idx += 1;
        }

        if pools.len() < config.min_required_nodes {
            return Err(InitializationError::message_only(format!(
                "the minimum required nodes is {} but there are only {}",
                config.min_required_nodes,
                pools.len()
            )));
        } else if pools.is_empty() {
            warn!("no nodes is allowed and there are no nodes.");
        }

        debug!("replica set has {} nodes", pools.len());

        Ok(Self {
            pools,
            count: AtomicUsize::new(0),
            checkout_timeout: config.checkout_timeout,
        })
    }

    pub fn check_out(&self) -> Checkout<T> {
        self.check_out_explicit_timeout(self.checkout_timeout)
    }

    pub fn check_out_explicit_timeout(&self, timeout: Option<Duration>) -> Checkout<T> {
        if self.pools.is_empty() {
            Checkout(CheckoutManaged::new(future::err(CheckoutError::new(
                CheckoutErrorKind::NoPool,
            ))))
        } else {
            let count = self.count.fetch_add(1, Ordering::SeqCst);

            let idx = count % self.pools.len();

            Checkout(self.pools[idx].check_out(timeout))
        }
    }

    pub fn stats(&self) -> Vec<PoolStats> {
        self.pools.iter().map(PoolInternal::stats).collect()
    }

    pub fn trigger_stats(&self) {
        self.pools.iter().for_each(PoolInternal::trigger_stats)
    }
}