1use std::io::ErrorKind;
2use std::ops::{Deref, DerefMut};
3use std::str::FromStr;
4
5use futures_core::future::BoxFuture;
6use redis::aio::ConnectionLike;
7use redis::{Client, Cmd, Pipeline, RedisFuture, Value};
8
9use poolx::{Connection, ConnectOptions, futures_core, url};
10use poolx::url::Url;
11
12#[derive(Debug, Clone)]
13pub struct RedisConnectionOption {
14 url: Url,
15 client: Client,
16}
17
18impl RedisConnectionOption {}
19
20impl FromStr for RedisConnectionOption {
21 type Err = poolx::Error;
22
23 fn from_str(s: &str) -> Result<Self, Self::Err> {
24 let url = s.parse::<Url>().map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
25 Self::from_url(&url)
26 }
27}
28
29impl ConnectOptions for RedisConnectionOption {
30 type Connection = RedisConnection;
31
32 fn from_url(url: &url::Url) -> Result<Self, poolx::Error> {
33 let client = Client::open(url.clone()).map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
34 Ok(Self {
35 url: url.clone(),
36 client,
37 })
38 }
39
40 fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, poolx::Error>> where Self::Connection: Sized {
41 Box::pin(async move {
42 let conn = self.client.get_async_connection().await.map_err(|e| poolx::Error::Io(std::io::Error::from(ErrorKind::ConnectionReset)))?;
43 Ok(RedisConnection { inner: conn })
44 })
45 }
46}
47
48pub struct RedisConnection {
49 inner: redis::aio::Connection,
50}
51
52impl AsMut<redis::aio::Connection> for RedisConnection {
53 fn as_mut(&mut self) -> &mut redis::aio::Connection {
54 &mut self.inner
55 }
56}
57
58
59impl Connection for RedisConnection {
60 type Options = RedisConnectionOption;
61
62 fn close(mut self) -> BoxFuture<'static, Result<(), poolx::Error>> {
63 Box::pin(async move {
64 self.inner.req_packed_command(&redis::cmd("QUIT")).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
65 Ok(())
66 })
67 }
68
69 fn close_hard(self) -> BoxFuture<'static, Result<(), poolx::Error>> {
70 Box::pin(async move {
71 Ok(())
72 })
73 }
74
75 fn ping(&mut self) -> BoxFuture<'_, Result<(), poolx::Error>> {
76 Box::pin(async move {
77 let pong: String = redis::cmd("PING").query_async(&mut self.inner).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
78 match pong.as_str() {
79 "PONG" => Ok(()),
80 _ => Err(poolx::Error::ResponseError),
81 }
82 })
83 }
84}
85
86impl ConnectionLike for RedisConnection{
87 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
88 self.inner.req_packed_command(cmd)
89 }
90
91 fn req_packed_commands<'a>(&'a mut self, cmd: &'a Pipeline, offset: usize, count: usize) -> RedisFuture<'a, Vec<Value>> {
92 self.inner.req_packed_commands(cmd, offset, count)
93 }
94
95 fn get_db(&self) -> i64 {
96 self.inner.get_db()
97 }
98}
#[cfg(test)]
mod tests {
    use redis::cmd;

    use poolx::{Pool, PoolOptions};

    // `super::` resolves regardless of where this module sits in the crate.
    use super::{RedisConnection, RedisConnectionOption};

    /// Acquires connections from a lazily connected pool and pings the server.
    ///
    /// The URL below points at 127.0.0.1:6379 with the password `foobared`,
    /// so the test needs a Redis server reachable with those settings.
    #[tokio::test]
    async fn test_redis_connection_pool() {
        let url = "redis://:foobared@127.0.0.1:6379";
        let option = url.parse::<RedisConnectionOption>().unwrap();

        let pool: Pool<RedisConnection> = PoolOptions::new()
            .test_before_acquire(true)
            .idle_timeout(std::time::Duration::from_secs(3))
            .min_connections(3)
            .max_connections(100)
            .connect_lazy_with(option);

        for _ in 0..10 {
            let mut conn = pool.acquire().await.unwrap();
            let reply: String = cmd("PING").query_async(conn.as_mut()).await.unwrap();
            println!("reply: {}", reply);
            // Check the reply so the test fails on an unexpected response.
            assert_eq!(reply, "PONG");
        }
    }
}