1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//! # Deadpool manager for asynchronous Redis connections
//!
//! redis-async-pool implements a deadpool manager for asynchronous
//! connections of the [redis crate](https://crates.io/crates/redis). Connections returned by  
//! the pool can be used as regular `redis::aio::Connection`.
//!
//! ## Foreword
//!
//! You may not need of a pool of async connections to Redis. Depending on your
//! workload, a multiplexed connection will be way faster. Using the [`ConnectionManager`](https://docs.rs/redis/0.17.0/redis/aio/struct.ConnectionManager.html)
//! provided by the redis crate, you can achieve very high performances without pooling
//! connections.
//!
//! ## Features
//! - runtime agnostic (tested with tokio & async-std)
//! - optional check of connection on recycle
//! - optional ttl on connections
//!
//! ## Example
//!
//! ```rust
//! use redis::AsyncCommands;
//! use redis_async_pool::{RedisConnectionManager, RedisPool};
//!
//! // Create a pool of maximum 5 connections, checked on reuse without ttl.
//!
//! let pool = RedisPool::new(
//!     RedisConnectionManager::new(redis::Client::open("redis://localhost:6379")?, true, None),
//!     5,
//! );
//!
//! // get a connection with the get() async method and use it as regular redis connection
//! let mut con = pool.get().await?;
//! con.set(b"key", b"value").await?;
//! let value: Vec<u8> = con.get(b"key").await?;
//! assert_eq!(value, b"value");
//! ```
//!
//! You can set a ttl for each created connection by the pool,
//! this helps avoiding huge memory consumption when keeping many connections
//! open during a too long time.

use std::{
    ops::{Deref, DerefMut},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use deadpool::managed::RecycleError;
use rand::Rng;
use redis::AsyncCommands;

pub use deadpool;

/// The redis connection pool
///
/// Use the `new` method to create a new pool. You can find
/// more information in the documentation of the `deadpool` crate.
pub type RedisPool = deadpool::managed::Pool<RedisConnection, redis::RedisError>;

/// Time to live of a connection
pub enum Ttl {
    /// Connection will expire after the given duration
    Simple(Duration),
    /// Connection will expire after at least `min` time and at most
    /// `min + fuzz` time.
    ///
    /// Actual ttl is computed at connection creation by adding `min` duration to
    /// a random duration between 0 and `fuzz`.
    Fuzzy { min: Duration, fuzz: Duration },
    /// The connection will not been reused. A new connection will be created
    /// for each `get()` on the pool.
    ///
    /// Enabling Once ttl means the pool will not keep any connection opened.
    /// So it won't really act as a pool of connection.
    Once,
}

/// Manages creation and destruction of redis connections.
///
pub struct RedisConnectionManager {
    client: redis::Client,
    check_on_recycle: bool,
    connection_ttl: Option<Ttl>,
}

impl RedisConnectionManager {
    /// Create a new connection mananager.
    ///
    /// If `check_on_recycle` is true, before each connection reuse, an `exists` command
    /// is issued, if it fails to complete, the connection is dropped and a fresh connection
    /// is created.
    ///
    /// If `connection_ttl` is set, the connection will be recreated after the given duration.
    pub fn new(client: redis::Client, check_on_recycle: bool, connection_ttl: Option<Ttl>) -> Self {
        Self {
            client,
            check_on_recycle,
            connection_ttl,
        }
    }
}

#[async_trait]
impl deadpool::managed::Manager<RedisConnection, redis::RedisError> for RedisConnectionManager {
    async fn create(&self) -> Result<RedisConnection, redis::RedisError> {
        Ok(RedisConnection {
            actual: self.client.get_async_connection().await?,
            expires_at: self
                .connection_ttl
                .as_ref()
                .map(|max_duration| match max_duration {
                    Ttl::Simple(ttl) => Instant::now() + *ttl,
                    Ttl::Fuzzy { min, fuzz } => {
                        Instant::now()
                            + *min
                            + Duration::from_secs_f64(
                                rand::thread_rng().gen_range(0.0, fuzz.as_secs_f64()),
                            )
                    }
                    // already expired ;)
                    Ttl::Once => Instant::now(),
                }),
        })
    }
    async fn recycle(
        &self,
        conn: &mut RedisConnection,
    ) -> deadpool::managed::RecycleResult<redis::RedisError> {
        if self.check_on_recycle {
            let _r: bool = conn.exists(b"key").await?;
        }
        match &conn.expires_at {
            // check if connection is expired
            Some(expires_at) => {
                if &Instant::now() >= expires_at {
                    Err(RecycleError::Message("Connection expired".to_string()))
                } else {
                    Ok(())
                }
            }
            // no expire on connections
            None => Ok(()),
        }
    }
}

/// The connection created by the pool manager.
///
/// It is Deref & DerefMut to `redis::aio::Connection` so it can be used
/// like a regular Redis asynchronous connection.
pub struct RedisConnection {
    actual: redis::aio::Connection,
    expires_at: Option<Instant>,
}

// Impl Deref & DefrefMut so the RedisConnection can be used as the real
// redis::aio::Connection

impl Deref for RedisConnection {
    type Target = redis::aio::Connection;
    fn deref(&self) -> &Self::Target {
        &self.actual
    }
}

impl DerefMut for RedisConnection {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.actual
    }
}

impl AsMut<redis::aio::Connection> for RedisConnection {
    fn as_mut(&mut self) -> &mut redis::aio::Connection {
        &mut self.actual
    }
}

impl AsRef<redis::aio::Connection> for RedisConnection {
    fn as_ref(&self) -> &redis::aio::Connection {
        &self.actual
    }
}