poolx_redis/
lib.rs

1use std::io::ErrorKind;
2use std::ops::{Deref, DerefMut};
3use std::str::FromStr;
4
5use futures_core::future::BoxFuture;
6use redis::aio::ConnectionLike;
7use redis::{Client, Cmd, Pipeline, RedisFuture, Value};
8
9use poolx::{Connection, ConnectOptions, futures_core, url};
10use poolx::url::Url;
11
12#[derive(Debug, Clone)]
13pub struct RedisConnectionOption {
14    url: Url,
15    client: Client,
16}
17
18impl RedisConnectionOption {}
19
20impl FromStr for RedisConnectionOption {
21    type Err = poolx::Error;
22
23    fn from_str(s: &str) -> Result<Self, Self::Err> {
24        let url = s.parse::<Url>().map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
25        Self::from_url(&url)
26    }
27}
28
29impl ConnectOptions for RedisConnectionOption {
30    type Connection = RedisConnection;
31
32    fn from_url(url: &url::Url) -> Result<Self, poolx::Error> {
33        let client = Client::open(url.clone()).map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
34        Ok(Self {
35            url: url.clone(),
36            client,
37        })
38    }
39
40    fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, poolx::Error>> where Self::Connection: Sized {
41        Box::pin(async move {
42            let conn = self.client.get_async_connection().await.map_err(|e| poolx::Error::Io(std::io::Error::from(ErrorKind::ConnectionReset)))?;
43            Ok(RedisConnection { inner: conn })
44        })
45    }
46}
47
48pub struct RedisConnection {
49    inner: redis::aio::Connection,
50}
51
52impl AsMut<redis::aio::Connection> for RedisConnection {
53    fn as_mut(&mut self) -> &mut redis::aio::Connection {
54        &mut self.inner
55    }
56}
57
58
59impl Connection for RedisConnection {
60    type Options = RedisConnectionOption;
61
62    fn close(mut self) -> BoxFuture<'static, Result<(), poolx::Error>> {
63        Box::pin(async move {
64            self.inner.req_packed_command(&redis::cmd("QUIT")).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
65            Ok(())
66        })
67    }
68
69    fn close_hard(self) -> BoxFuture<'static, Result<(), poolx::Error>> {
70        Box::pin(async move {
71            Ok(())
72        })
73    }
74
75    fn ping(&mut self) -> BoxFuture<'_, Result<(), poolx::Error>> {
76        Box::pin(async move {
77            let pong: String = redis::cmd("PING").query_async(&mut self.inner).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
78            match pong.as_str() {
79                "PONG" => Ok(()),
80                _ => Err(poolx::Error::ResponseError),
81            }
82        })
83    }
84}
85
86impl ConnectionLike for RedisConnection{
87    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
88        self.inner.req_packed_command(cmd)
89    }
90
91    fn req_packed_commands<'a>(&'a mut self, cmd: &'a Pipeline, offset: usize, count: usize) -> RedisFuture<'a, Vec<Value>> {
92        self.inner.req_packed_commands(cmd, offset, count)
93    }
94
95    fn get_db(&self) -> i64 {
96        self.inner.get_db()
97    }
98}
#[cfg(test)]
mod tests {
    use redis::cmd;

    use poolx::{Pool, PoolOptions};

    use crate::RedisConnection;

    /// Acquires a pooled connection ten times and checks each one answers `PING`.
    ///
    /// Needs a Redis server reachable at the URL below.
    #[tokio::test]
    async fn test_redis_connection_pool() {
        let url = "redis://:foobared@127.0.0.1:6379";
        let option = url.parse::<super::RedisConnectionOption>().unwrap();

        let pool: Pool<RedisConnection> = PoolOptions::new()
            .test_before_acquire(true)
            .idle_timeout(std::time::Duration::from_secs(3))
            .min_connections(3)
            .max_connections(100)
            .connect_lazy_with(option);

        for _ in 0..10 {
            let mut conn = pool.acquire().await.unwrap();
            let reply: String = cmd("PING").query_async(conn.as_mut()).await.unwrap();
            // Assert instead of printing so a wrong reply actually fails the test.
            assert_eq!(reply, "PONG");
        }
    }
}