1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
//! # Deadpool manager for asynchronous Redis connections //! //! redis-async-pool implements a deadpool manager for asynchronous //! connections of the [redis crate](https://crates.io/crates/redis). Pooled connections can be used //! as regular `redis::aio::Connection`. //! //! ## Features //! - runtime agnostic (tested with tokio or async-std) //! - optional check of connection on recycle //! - optional ttl on connections //! //! ## Example //! //! ```rust //! use redis::AsyncCommands; //! use redis_async_pool::{RedisConnectionManager, RedisPool}; //! //! // Create a pool of maximum 5, checked on reuse without ttl. //! let pool = RedisPool::new( //! RedisConnectionManager::new(redis::Client::open("redis://localhost:6379")?, true, None), //! 5, //! ); //! //! // get a connection with the get() asyncc method and use it as regular redis connection //! let mut con = pool.get().await?; //! con.set(b"key", b"value").await?; //! let value: Vec<u8> = con.get(b"key").await?; //! assert_eq!(value, b"value"); //! ``` //! //! You can set a ttl for each created connection by the pool, //! this helps avoiding huge memory consumption when keeping many connections //! open during a too long time. use std::{ ops::{Deref, DerefMut}, time::{Duration, Instant}, }; use async_trait::async_trait; use deadpool::managed::RecycleError; use redis::AsyncCommands; pub use deadpool; /// The redis connection pool /// /// Use the `new` method to create a new pool. You can find /// more information in the documentation of the `deadpool` crate. pub type RedisPool = deadpool::managed::Pool<RedisConnection, redis::RedisError>; /// Manages creation and destruction of redis connections. /// pub struct RedisConnectionManager { client: redis::Client, check_on_recycle: bool, connection_ttl: Option<Duration>, } impl RedisConnectionManager { /// Create a new connection mananager. /// /// If `check_on_recycle` is true, before each connection reuse, an `exists` command /// is issued, if it fails to complete, the connection is dropped and a fresh connection. /// will be created. /// /// If `connection_ttl` is set, the connection will be recreated after the given duration. pub fn new( client: redis::Client, check_on_recycle: bool, connection_ttl: Option<Duration>, ) -> Self { Self { client, check_on_recycle, connection_ttl, } } } #[async_trait] impl deadpool::managed::Manager<RedisConnection, redis::RedisError> for RedisConnectionManager { async fn create(&self) -> Result<RedisConnection, redis::RedisError> { Ok(RedisConnection { actual: self.client.get_async_connection().await?, expires_at: self .connection_ttl .as_ref() .map(|max_duration| Instant::now() + *max_duration), }) } async fn recycle( &self, conn: &mut RedisConnection, ) -> deadpool::managed::RecycleResult<redis::RedisError> { if self.check_on_recycle { let _r: bool = conn.exists(b"key").await?; } match &conn.expires_at { // check if connection is expired Some(expires_at) => { if &Instant::now() >= expires_at { Err(RecycleError::Message("Connection expired".to_string())) } else { Ok(()) } } // no expire on connections None => Ok(()), } } } /// The connection created by the pool manager. /// /// It is Deref & DerefMut to `redis::aio::Connection` so it can be used /// like a regular Redis asynchronous connection. pub struct RedisConnection { actual: redis::aio::Connection, expires_at: Option<Instant>, } // Impl Deref & DefrefMut so the RedisConnection can be used as the real // redis::aio::Connection impl Deref for RedisConnection { type Target = redis::aio::Connection; fn deref(&self) -> &Self::Target { &self.actual } } impl DerefMut for RedisConnection { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.actual } }