1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! # Deadpool for Redis [![Latest Version](https://img.shields.io/crates/v/deadpool-redis.svg)](https://crates.io/crates/deadpool-redis)
//!
//! Deadpool is a dead simple async pool for connections and objects
//! of any type.
//!
//! This crate implements a [`deadpool`](https://crates.io/crates/deadpool)
//! manager for [`redis`](https://crates.io/crates/redis).
//!
//! ## Features
//!
//! | Feature | Description | Extra dependencies | Default |
//! | ------- | ----------- | ------------------ | ------- |
//! | `config` | Enable support for [config](https://crates.io/crates/config) crate | `config`, `serde/derive` | yes |
//!
//! ## Example
//!
//! ```rust
//! use deadpool_redis::{cmd, Config};
//! use redis::FromRedisValue;
//!
//! #[tokio::main]
//! async fn main() {
//!     let cfg = Config::from_env("REDIS").unwrap();
//!     let pool = cfg.create_pool().unwrap();
//!     {
//!         let mut conn = pool.get().await.unwrap();
//!         cmd("SET")
//!             .arg(&["deadpool/test_key", "42"])
//!             .execute_async(&mut conn)
//!             .await.unwrap();
//!     }
//!     {
//!         let mut conn = pool.get().await.unwrap();
//!         let value: String = cmd("GET")
//!             .arg(&["deadpool/test_key"])
//!             .query_async(&mut conn)
//!             .await.unwrap();
//!         assert_eq!(value, "42".to_string());
//!     }
//! }
//! ```
//!
//! ## License
//!
//! Licensed under either of
//!
//! - Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0>)
//! - MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
//!
//! at your option.
#![warn(missing_docs)]

use std::ops::{Deref, DerefMut};

use async_trait::async_trait;
use redis::{
    aio::Connection as RedisConnection, Client, IntoConnectionInfo, RedisError, RedisResult,
};

/// A type alias for using `deadpool::Pool` with `redis`
pub type Pool = deadpool::managed::Pool<ConnectionWrapper, RedisError>;

/// A type alias for using `deadpool::PoolError` with `redis`
pub type PoolError = deadpool::managed::PoolError<RedisError>;

/// A type alias for using `deadpool::Object` with `redis`
pub type Connection = deadpool::managed::Object<ConnectionWrapper, RedisError>;

type RecycleResult = deadpool::managed::RecycleResult<RedisError>;

mod config;
pub use config::Config;
mod cmd_wrapper;
pub use cmd_wrapper::{cmd, Cmd};
mod pipeline_wrapper;
pub use pipeline_wrapper::{pipe, Pipeline};

/// A wrapper for `redis::Connection`. The `query_async` and `execute_async`
/// functions of `redis::Cmd` and `redis::Pipeline` consume the connection.
/// This wrapper makes it possible to replace the internal connection after
/// executing a query.
pub struct ConnectionWrapper {
    conn: RedisConnection,
}

impl Deref for ConnectionWrapper {
    type Target = RedisConnection;
    fn deref(&self) -> &RedisConnection {
        &self.conn
    }
}

impl DerefMut for ConnectionWrapper {
    fn deref_mut(&mut self) -> &mut RedisConnection {
        &mut self.conn
    }
}

/// The manager for creating and recyling lapin connections
pub struct Manager {
    client: Client,
}

impl Manager {
    /// Create manager using `PgConfig` and a `TlsConnector`
    pub fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
        Ok(Self {
            client: Client::open(params)?,
        })
    }
}

#[async_trait]
impl deadpool::managed::Manager<ConnectionWrapper, RedisError> for Manager {
    async fn create(&self) -> Result<ConnectionWrapper, RedisError> {
        let conn = self.client.get_async_connection().await?;
        Ok(ConnectionWrapper { conn })
    }

    async fn recycle(&self, conn: &mut ConnectionWrapper) -> RecycleResult {
        match cmd("PING").execute_async(conn).await {
            Ok(_) => Ok(()),
            Err(e) => Err(e.into()),
        }
    }
}