redis_dumbpool/
lib.rs

1#![deny(warnings)]
2#![deny(missing_docs)]
3
4//! # Redis-Dumbpool
5//!
6//! Really dumb implementation of a Redis Connection Pool
7
8use std::ops::{Deref, DerefMut};
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11
12use redis::{Client, cmd, aio::Connection, RedisResult};
13
14use async_lock::Mutex;
15
16use tokio::time::{sleep, interval_at, Instant};
17
18use futures_util::stream::{iter, StreamExt, TryStreamExt};
19
20pub use redis;
21
22struct PoolInner {
23    client: Client,
24    max: u8,
25    sleep: u64,
26    count: u8,
27    pool: Vec<Connection>,
28}
29
30/// Redis connection pool
31#[derive(Clone)]
32pub struct Pool {
33    inner: Arc<Mutex<PoolInner>>
34}
35
36impl Pool {
37    /// Create a new Pool, pointing to a single Redis server, with minimum and maximum number of active connections,
38    /// the amount of nanoseconds to wait between retries when the connection pool is at maximum capacity
39    /// and the amount of nanoseconds between every connection keepalive (0 for no keepalive)
40    pub async fn factory(addr: &str, min: u8, max: u8, sleep: u64, refresh: u64) -> RedisResult<Self> {
41        let client = Client::open(addr)?;
42        let pool = iter(0..min.min(max))
43            .then(|_| async { client.get_async_connection().await })
44            .try_collect()
45            .await?;
46
47        let inner = Arc::new(Mutex::new(PoolInner {
48            client,
49            max,
50            sleep,
51            count: min.min(max),
52            pool,
53        }));
54
55        if refresh > 0 {
56            let inner2 = Arc::clone(&inner);
57            tokio::spawn(async move {
58                let duration = Duration::from_nanos(refresh);
59                let mut interval = interval_at(Instant::now() + duration, duration);
60                loop {
61                    interval.tick().await;
62
63                    let mut lock = inner2.lock().await;
64
65                    #[cfg(test)]
66                    log::info!("Connections keepalive ({})", lock.pool.len());
67
68                    while let Some(mut c) = lock.pool.pop() {
69                        let inner3 = Arc::clone(&inner2);
70                        tokio::spawn(async move {
71                            if cmd("PING").query_async::<_, ()>(&mut c).await.is_ok() {
72                                #[cfg(test)]
73                                log::info!("Connection alive");
74            
75                                let mut lock = inner3.lock().await;
76                                lock.pool.push(c);
77                            }
78                            else {
79                                #[cfg(test)]
80                                log::info!("Connection dead");
81            
82                                let mut lock = inner3.lock().await;
83                                lock.count -= 1;
84                            }
85                        });
86                    }
87                }
88            });
89        }
90
91        Ok(Pool {
92            inner
93        })
94    }
95
96    /// Retrieve a connection from the pool
97    /// The connection is tested before being released, if test fails a new connection is generated
98    pub async fn get_conn(&self) -> RedisResult<ConnWrapper> {
99        loop {
100            let mut lock = self.inner.lock().await;
101
102            let conn = if let Some(mut c) = lock.pool.pop() {
103                //test connection
104                if cmd("PING").query_async::<_, ()>(&mut c).await.is_err() {
105                    #[cfg(test)]
106                    log::info!("Invalid connection, re-created");
107
108                    lock.client.get_async_connection().await?
109                }
110                else {
111                    #[cfg(test)]
112                    log::info!("Recycled connection");
113
114                    c
115                }
116            }
117            else {
118                if lock.count >= lock.max {
119                    let duration = Duration::from_nanos(lock.sleep);
120                    drop(lock);
121                    sleep(duration).await;
122                    continue;
123                }
124
125                #[cfg(test)]
126                log::info!("New connection created");
127
128                lock.count += 1;
129                lock.client.get_async_connection().await?
130            };
131            drop(lock);
132
133            return Ok(ConnWrapper::new(Arc::downgrade(&self.inner), conn));
134        }
135    }
136}
137
138///Redis connection wrapper
139///Ensures the connection is returned to the pool on drop
140pub struct ConnWrapper {
141    pool: Weak<Mutex<PoolInner>>,
142    conn: Option<Connection>,
143}
144
145impl ConnWrapper {
146    fn new(pool: Weak<Mutex<PoolInner>>, conn: Connection) -> Self {
147        ConnWrapper {
148            pool,
149            conn: Some(conn),
150        }
151    }
152}
153
154impl Deref for ConnWrapper {
155    type Target = Connection;
156
157    fn deref(&self) -> &Self::Target {
158        self.conn.as_ref().unwrap()
159    }
160}
161
162impl DerefMut for ConnWrapper {
163    fn deref_mut(&mut self) -> &mut Self::Target {
164        self.conn.as_mut().unwrap()
165    }
166}
167
168impl Drop for ConnWrapper {
169    fn drop(&mut self) {
170        if let Some(pool) = self.pool.upgrade() {
171            if let Some(conn) = self.conn.take() {
172                tokio::spawn(async move {
173                    #[cfg(test)]
174                    log::info!("Connection returned to pool");
175
176                    let mut lock = pool.lock().await;
177                    lock.pool.push(conn);
178                });
179            }
180        }
181    }
182}
183
184#[cfg(test)]
185mod test {
186    use super::Pool;
187
188    use std::env;
189    use std::time::Duration;
190
191    use tokio::time::sleep;
192
193    use rand::{thread_rng, Rng};
194
195    use redis::AsyncCommands;
196
197    #[tokio::test]
198    async fn main() {
199        env_logger::init();
200
201        let pool = Pool::factory(&env::var("REDIS_SERVER").unwrap_or_else(|_| String::from("redis://127.0.0.1:6379/")), 10, 10, 1, 1000000000).await.unwrap();
202        let mut rng = thread_rng();
203        loop {
204            let mut conn = pool.get_conn().await.unwrap();
205            let wait = rng.gen_range(0..10);
206            tokio::spawn(async move {
207                sleep(Duration::from_secs(wait)).await;
208                conn.get::<_, Option<String>>(&env::var("REDIS_KEY").unwrap_or_else(|_| String::from("key"))).await.unwrap();
209            });
210        }
211    }
212}