1use bb8::{CustomizeConnection, ManageConnection, Pool};
2use redis::AsyncCommands;
3pub use redis::RedisError;
4use redis::ToRedisArgs;
5pub use redis::Value as RedisValue;
6use redis::{aio::MultiplexedConnection as Connection, ErrorKind};
7use redis::{Client, IntoConnectionInfo};
8use std::future::Future;
9use std::ops::DerefMut;
10use std::pin::Pin;
11
12pub type RedisPool = Pool<RedisConnectionManager>;
13
/// Pool customizer that remembers a key namespace.
///
/// Its `CustomizeConnection` implementation copies this namespace onto a
/// `RedisConnection` via `set_namespace`.
#[derive(Debug)]
pub struct NamespaceCustomizer {
    /// Namespace handed to the connection; stored as an owned string.
    namespace: String,
}
18
19impl CustomizeConnection<RedisConnection, RedisError> for NamespaceCustomizer {
20 fn on_acquire<'a>(
21 &'a self,
22 connection: &'a mut RedisConnection,
23 ) -> Pin<Box<dyn Future<Output = Result<(), RedisError>> + Send + 'a>> {
24 Box::pin(async {
25 connection.set_namespace(self.namespace.clone());
27
28 Ok(())
29 })
30 }
31}
32
33#[must_use]
34pub fn with_custom_namespace(namespace: String) -> Box<NamespaceCustomizer> {
35 Box::new(NamespaceCustomizer { namespace })
36}
37
38#[derive(Clone, Debug)]
41pub struct RedisConnectionManager {
42 client: Client,
43}
44
45impl RedisConnectionManager {
46 pub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError> {
49 Ok(Self {
50 client: Client::open(info.into_connection_info()?)?,
51 })
52 }
53}
54
55impl ManageConnection for RedisConnectionManager {
56 type Connection = RedisConnection;
57 type Error = RedisError;
58
59 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
60 Ok(RedisConnection::new(
61 self.client.get_multiplexed_async_connection().await?,
62 ))
63 }
64
65 async fn is_valid(&self, mut conn: &mut Self::Connection) -> Result<(), Self::Error> {
66 let pong: String = redis::cmd("PING")
67 .query_async(&mut conn.deref_mut().connection)
68 .await?;
69 match pong.as_str() {
70 "PONG" => Ok(()),
71 _ => Err((ErrorKind::ResponseError, "ping request").into()),
72 }
73 }
74
75 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
76 false
77 }
78}
79
80pub struct RedisConnection {
82 connection: Connection,
83 namespace: Option<String>,
84}
85
86impl RedisConnection {
87 #[must_use]
88 pub fn new(connection: Connection) -> Self {
89 Self {
90 connection,
91 namespace: None,
92 }
93 }
94
95 pub fn set_namespace(&mut self, namespace: String) {
96 self.namespace = Some(namespace);
97 }
98
99 #[must_use]
100 pub fn with_namespace(self, namespace: String) -> Self {
101 Self {
102 connection: self.connection,
103 namespace: Some(namespace),
104 }
105 }
106
107 fn namespaced_key(&self, key: String) -> String {
108 if let Some(ref namespace) = self.namespace {
109 return format!("{namespace}:{key}");
110 }
111
112 key
113 }
114
115 fn namespaced_keys(&self, keys: Vec<String>) -> Vec<String> {
116 if let Some(ref namespace) = self.namespace {
117 let keys: Vec<String> = keys
118 .iter()
119 .map(|key| format!("{namespace}:{key}"))
120 .collect();
121
122 return keys;
123 }
124
125 keys
126 }
127
128 pub fn unnamespaced_borrow_mut(&mut self) -> &mut Connection {
130 &mut self.connection
131 }
132
133 pub async fn brpop(
134 &mut self,
135 keys: Vec<String>,
136 timeout: usize,
137 ) -> Result<Option<(String, String)>, RedisError> {
138 self.connection
139 .brpop(self.namespaced_keys(keys), timeout as f64)
140 .await
141 }
142
143 pub fn cmd_with_key(&mut self, cmd: &str, key: String) -> redis::Cmd {
144 let mut c = redis::cmd(cmd);
145 c.arg(self.namespaced_key(key));
146 c
147 }
148
149 pub async fn del(&mut self, key: String) -> Result<usize, RedisError> {
150 self.connection.del(self.namespaced_key(key)).await
151 }
152
153 pub async fn expire(&mut self, key: String, value: usize) -> Result<usize, RedisError> {
154 self.connection
155 .expire(self.namespaced_key(key), value as i64)
156 .await
157 }
158
159 pub async fn lpush<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
160 where
161 V: ToRedisArgs + Send + Sync,
162 {
163 self.connection.lpush(self.namespaced_key(key), value).await
164 }
165
166 pub async fn sadd<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
167 where
168 V: ToRedisArgs + Send + Sync,
169 {
170 self.connection.sadd(self.namespaced_key(key), value).await
171 }
172
173 pub async fn set_nx_ex<V>(
174 &mut self,
175 key: String,
176 value: V,
177 ttl_in_seconds: usize,
178 ) -> Result<RedisValue, RedisError>
179 where
180 V: ToRedisArgs + Send + Sync,
181 {
182 redis::cmd("SET")
183 .arg(self.namespaced_key(key))
184 .arg(value)
185 .arg("NX")
186 .arg("EX")
187 .arg(ttl_in_seconds)
188 .query_async(self.unnamespaced_borrow_mut())
189 .await
190 }
191
192 pub async fn zrange(
193 &mut self,
194 key: String,
195 lower: isize,
196 upper: isize,
197 ) -> Result<Vec<String>, RedisError> {
198 self.connection
199 .zrange(self.namespaced_key(key), lower, upper)
200 .await
201 }
202
203 pub async fn zrangebyscore_limit<L: ToRedisArgs + Send + Sync, U: ToRedisArgs + Sync + Send>(
204 &mut self,
205 key: String,
206 lower: L,
207 upper: U,
208 offset: isize,
209 limit: isize,
210 ) -> Result<Vec<String>, RedisError> {
211 self.connection
212 .zrangebyscore_limit(self.namespaced_key(key), lower, upper, offset, limit)
213 .await
214 }
215
216 pub async fn zadd<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
217 &mut self,
218 key: String,
219 value: V,
220 score: S,
221 ) -> Result<usize, RedisError> {
222 self.connection
223 .zadd(self.namespaced_key(key), value, score)
224 .await
225 }
226
227 pub async fn zadd_ch<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
228 &mut self,
229 key: String,
230 value: V,
231 score: S,
232 ) -> Result<bool, RedisError> {
233 redis::cmd("ZADD")
234 .arg(self.namespaced_key(key))
235 .arg("CH")
236 .arg(score)
237 .arg(value)
238 .query_async(self.unnamespaced_borrow_mut())
239 .await
240 }
241
242 pub async fn zrem<V>(&mut self, key: String, value: V) -> Result<usize, RedisError>
243 where
244 V: ToRedisArgs + Send + Sync,
245 {
246 self.connection.zrem(self.namespaced_key(key), value).await
247 }
248}