reool 0.13.0

An asynchrounous connection pool for Redis based on tokio and redis-rs
Documentation
use std::sync::Arc;
use std::time::Duration;

use crate::stats::{MinMax, PoolStats};
use parking_lot::Mutex;

use crate::instrumentation::Instrumentation;

pub struct InstrumentationAggregator<I> {
    outbound: I,
    tracking: Mutex<Tracking>,
}

struct Tracking {
    pool_size: ValueTracker,
    idle: ValueTracker,
    in_flight: ValueTracker,
    reservations: ValueTracker,
}

impl<I> InstrumentationAggregator<I>
where
    I: Instrumentation + Send + Sync,
{
    pub fn new(instrumentation: I) -> Self {
        InstrumentationAggregator {
            outbound: instrumentation,
            tracking: Mutex::new(Tracking {
                pool_size: ValueTracker::default(),
                idle: ValueTracker::default(),
                in_flight: ValueTracker::default(),
                reservations: ValueTracker::default(),
            }),
        }
    }

    pub fn add_new_pool(&self) {
        let mut tracking = self.tracking.lock();
        tracking.pool_size.add_pool();
        tracking.idle.add_pool();
        tracking.in_flight.add_pool();
        tracking.reservations.add_pool();
    }
}

impl<I> InstrumentationAggregator<I>
where
    I: Instrumentation,
{
    fn checked_out_connection(&self, idle_for: Duration, _pool_idx: usize) {
        self.outbound.checked_out_connection(idle_for)
    }
    fn checked_in_returned_connection(&self, flight_time: Duration, _pool_idx: usize) {
        self.outbound.checked_in_returned_connection(flight_time)
    }
    fn checked_in_new_connection(&self, _pool_idx: usize) {
        self.outbound.checked_in_new_connection()
    }
    fn connection_dropped(&self, flight_time: Duration, lifetime: Duration, _pool_idx: usize) {
        self.outbound.connection_dropped(flight_time, lifetime)
    }
    fn connection_created(
        &self,
        connected_after: Duration,
        total_time: Duration,
        _pool_idx: usize,
    ) {
        self.outbound
            .connection_created(connected_after, total_time)
    }
    fn connection_killed(&self, lifetime: Duration, _pool_idx: usize) {
        self.outbound.connection_killed(lifetime)
    }
    fn reservation_added(&self, _pool_idx: usize) {
        self.outbound.reservation_added()
    }
    fn reservation_fulfilled(&self, after: Duration, _pool_idx: usize) {
        self.outbound.reservation_fulfilled(after)
    }
    fn reservation_not_fulfilled(&self, after: Duration, _pool_idx: usize) {
        self.outbound.reservation_not_fulfilled(after)
    }
    fn reservation_limit_reached(&self, _pool_idx: usize) {
        self.outbound.reservation_limit_reached()
    }
    fn connection_factory_failed(&self, _pool_idx: usize) {
        self.outbound.connection_factory_failed()
    }

    fn stats(&self, stats: PoolStats, pool_idx: usize) {
        let mut tracking = self.tracking.lock();
        let pool_size = tracking.pool_size.update(pool_idx, stats.pool_size);
        let idle = tracking.idle.update(pool_idx, stats.idle);
        let reservations = tracking.reservations.update(pool_idx, stats.reservations);
        let in_flight = tracking.in_flight.update(pool_idx, stats.in_flight);
        let node_count = tracking.pool_size.node_count();
        drop(tracking);

        let stats = PoolStats {
            pool_size,
            reservations,
            idle,
            in_flight,
            node_count,
        };

        self.outbound.stats(stats)
    }
}

struct ValueTracker {
    pool_values: Vec<MinMax>,
}

impl ValueTracker {
    pub fn add_pool(&mut self) {
        self.pool_values.push(MinMax::default())
    }

    pub fn update(&mut self, idx: usize, v: MinMax) -> MinMax {
        self.pool_values[idx] = v;
        let curr_min = self.pool_values.iter().map(MinMax::min).min().unwrap_or(0);
        let curr_max = self.pool_values.iter().map(MinMax::max).max().unwrap_or(0);
        MinMax(curr_min, curr_max)
    }

    pub fn node_count(&self) -> usize {
        self.pool_values.len()
    }
}

impl Default for ValueTracker {
    fn default() -> Self {
        Self {
            pool_values: Vec::new(),
        }
    }
}

pub struct IndexedInstrumentation<I> {
    index: usize,
    aggregator: Arc<InstrumentationAggregator<I>>,
}

impl<I> IndexedInstrumentation<I>
where
    I: Instrumentation,
{
    pub fn new(aggregator: Arc<InstrumentationAggregator<I>>, index: usize) -> Self {
        Self { index, aggregator }
    }
}

impl<I> Instrumentation for IndexedInstrumentation<I>
where
    I: Instrumentation,
{
    fn checked_out_connection(&self, idle_for: Duration) {
        self.aggregator.checked_out_connection(idle_for, self.index)
    }
    fn checked_in_returned_connection(&self, flight_time: Duration) {
        self.aggregator
            .checked_in_returned_connection(flight_time, self.index)
    }
    fn checked_in_new_connection(&self) {
        self.aggregator.checked_in_new_connection(self.index)
    }
    fn connection_dropped(&self, flight_time: Duration, lifetime: Duration) {
        self.aggregator
            .connection_dropped(flight_time, lifetime, self.index)
    }
    fn connection_created(&self, connected_after: Duration, total_time: Duration) {
        self.aggregator
            .connection_created(connected_after, total_time, self.index)
    }
    fn connection_killed(&self, lifetime: Duration) {
        self.aggregator.connection_killed(lifetime, self.index)
    }
    fn reservation_added(&self) {
        self.aggregator.reservation_added(self.index)
    }
    fn reservation_fulfilled(&self, after: Duration) {
        self.aggregator.reservation_fulfilled(after, self.index)
    }
    fn reservation_not_fulfilled(&self, after: Duration) {
        self.aggregator.reservation_not_fulfilled(after, self.index)
    }
    fn reservation_limit_reached(&self) {
        self.aggregator.reservation_limit_reached(self.index)
    }
    fn connection_factory_failed(&self) {
        self.aggregator.connection_factory_failed(self.index)
    }
    fn stats(&self, stats: PoolStats) {
        self.aggregator.stats(stats, self.index)
    }
}