1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//! Deadpool simple async pool for Redis connections.
//!
//! This crate implements a [`deadpool`](https://crates.io/crates/deadpool)
//! manager for [`redis`](https://crates.io/crates/redis).
//!
//! You should not need to use `deadpool` directly. Use the `Pool` type
//! provided by this crate instead.
//!
//! # Example
//!
//! ```rust
//! use std::env;
//!
//! use deadpool_redis::{Manager, Pool};
//! use futures::compat::Future01CompatExt;
//! use redis::FromRedisValue;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mgr = Manager::new("redis://127.0.0.1/").unwrap();
//!     let pool = Pool::new(mgr, 16);
//!     {
//!         let mut conn = pool.get().await.unwrap();
//!         let mut cmd = redis::cmd("SET");
//!         cmd.arg(&["deadpool/test_key", "42"]);
//!         conn.query(&cmd).await.unwrap();
//!     }
//!     {
//!         let mut conn = pool.get().await.unwrap();
//!         let mut cmd = redis::cmd("GET");
//!         cmd.arg(&["deadpool/test_key"]);
//!         let value = conn.query(&cmd).await.unwrap();
//!         assert_eq!(String::from_redis_value(&value).unwrap(), "42".to_string());
//!     }
//! }
//! ```
#![warn(missing_docs)]

use async_trait::async_trait;
use futures::compat::Future01CompatExt;
use redis::{
    aio::Connection as RedisConnection,
    Client,
    IntoConnectionInfo,
    RedisError,
    RedisResult,
};

/// A type alias for using `deadpool::Pool` with `redis`.
///
/// The pool hands out [`Connection`] objects and reports failures as
/// `redis::RedisError`.
pub type Pool = deadpool::Pool<Connection, RedisError>;

/// A pooled wrapper around an asynchronous `redis` connection.
///
/// This is the object type managed by [`Pool`].
pub struct Connection {
    // `Option` because running a command moves the connection out of this
    // slot; it is `None` while a command is in flight and stays `None` if
    // the connection was not handed back.
    conn: Option<RedisConnection>
}

impl Connection {
    pub async fn query(&mut self, cmd: &redis::Cmd) -> RedisResult<redis::Value>
    {
        // FIXME how to handle if the connection is no longer part of this
        let conn = self.conn.take().unwrap();
        let (conn, result) = cmd.query_async(conn).compat().await?;
        self.conn.replace(conn);
        Ok(result)
    }
}


/// The manager for creating and recycling `redis` connections.
pub struct Manager {
    // Client used to open every new asynchronous connection.
    client: Client,
}

impl Manager {
    /// Creates a manager from any value that implements
    /// `redis::IntoConnectionInfo`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Client::open` for `params`.
    pub fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
        let client = Client::open(params)?;
        Ok(Self { client })
    }
}

#[async_trait]
impl deadpool::Manager<Connection, RedisError> for Manager {
    /// Opens a new asynchronous connection through the stored client and
    /// wraps it in a [`Connection`].
    async fn create(&self) -> Result<Connection, RedisError> {
        self.client
            .get_async_connection()
            .compat()
            .await
            .map(|raw| Connection { conn: Some(raw) })
    }

    /// Returns `conn` unchanged when it still holds its inner connection;
    /// otherwise creates a replacement via [`create`](Self::create).
    async fn recycle(&self, conn: Connection) -> Result<Connection, RedisError> {
        // An empty wrapper has no inner connection left, so build a new one.
        if conn.conn.is_none() {
            return self.create().await;
        }
        Ok(conn)
    }
}