1use bb8::{CustomizeConnection, ManageConnection, Pool};
2use redis::aio::MultiplexedConnection as Connection;
3use redis::AsyncCommands;
4pub use redis::RedisError;
5use redis::ToRedisArgs;
6pub use redis::Value as RedisValue;
7use redis::{Client, IntoConnectionInfo};
8use std::future::Future;
9use std::ops::DerefMut;
10use std::pin::Pin;
11
/// A `bb8` connection pool whose connections are created and health-checked
/// by [`RedisConnectionManager`].
pub type RedisPool = Pool<RedisConnectionManager>;
13
/// Pool customizer that applies a fixed key namespace to every connection
/// handed out by the pool (see its `CustomizeConnection` implementation).
#[derive(Debug)]
pub struct NamespaceCustomizer {
    /// Namespace copied onto each acquired connection.
    namespace: String,
}
18
19impl CustomizeConnection<RedisConnection, RedisError> for NamespaceCustomizer {
20 fn on_acquire<'a>(
21 &'a self,
22 connection: &'a mut RedisConnection,
23 ) -> Pin<Box<dyn Future<Output = Result<(), RedisError>> + Send + 'a>> {
24 Box::pin(async {
25 connection.set_namespace(self.namespace.clone());
27
28 Ok(())
29 })
30 }
31}
32
33#[must_use]
34pub fn with_custom_namespace(namespace: String) -> Box<NamespaceCustomizer> {
35 Box::new(NamespaceCustomizer { namespace })
36}
37
/// `bb8` connection manager that opens [`RedisConnection`]s from a single
/// `redis::Client`.
#[derive(Clone, Debug)]
pub struct RedisConnectionManager {
    /// Client used to open every new multiplexed connection.
    client: Client,
}
44
45impl RedisConnectionManager {
46 pub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError> {
49 Ok(Self {
50 client: Client::open(info.into_connection_info()?)?,
51 })
52 }
53}
54
55impl ManageConnection for RedisConnectionManager {
56 type Connection = RedisConnection;
57 type Error = RedisError;
58
59 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
60 let config = redis::AsyncConnectionConfig::new().set_response_timeout(None);
63 Ok(RedisConnection::new(
64 self.client
65 .get_multiplexed_async_connection_with_config(&config)
66 .await?,
67 ))
68 }
69
70 async fn is_valid(&self, mut conn: &mut Self::Connection) -> Result<(), Self::Error> {
71 let pong: String = redis::cmd("PING")
72 .query_async(&mut conn.deref_mut().connection)
73 .await?;
74 match pong.as_str() {
75 "PONG" => Ok(()),
76 _ => Err(redis::RedisError::from((
77 redis::ErrorKind::Server(redis::ServerErrorKind::ResponseError),
78 "ping request",
79 ))),
80 }
81 }
82
83 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
84 false
85 }
86}
87
/// A multiplexed Redis connection paired with an optional key namespace.
///
/// The helper methods on this type prefix keys with `"{namespace}:"` when a
/// namespace is set.
pub struct RedisConnection {
    /// The underlying multiplexed connection; commands sent on it directly are not namespaced.
    connection: Connection,
    /// Prefix applied to keys by the helper methods, if any.
    namespace: Option<String>,
}
93
94impl RedisConnection {
95 #[must_use]
96 pub fn new(connection: Connection) -> Self {
97 Self {
98 connection,
99 namespace: None,
100 }
101 }
102
103 pub fn set_namespace(&mut self, namespace: String) {
104 self.namespace = Some(namespace);
105 }
106
107 #[must_use]
108 pub fn with_namespace(self, namespace: String) -> Self {
109 Self {
110 connection: self.connection,
111 namespace: Some(namespace),
112 }
113 }
114
115 fn namespaced_key(&self, key: String) -> String {
116 if let Some(ref namespace) = self.namespace {
117 return format!("{namespace}:{key}");
118 }
119
120 key
121 }
122
123 fn namespaced_keys(&self, keys: Vec<String>) -> Vec<String> {
124 if let Some(ref namespace) = self.namespace {
125 let keys: Vec<String> = keys
126 .iter()
127 .map(|key| format!("{namespace}:{key}"))
128 .collect();
129
130 return keys;
131 }
132
133 keys
134 }
135
136 pub fn unnamespaced_borrow_mut(&mut self) -> &mut Connection {
138 &mut self.connection
139 }
140
141 pub async fn brpop(
142 &mut self,
143 keys: Vec<String>,
144 timeout: usize,
145 ) -> Result<Option<(String, String)>, RedisError> {
146 self.connection
147 .brpop(self.namespaced_keys(keys), timeout as f64)
148 .await
149 }
150
151 pub fn cmd_with_key(&mut self, cmd: &str, key: String) -> redis::Cmd {
152 let mut c = redis::cmd(cmd);
153 c.arg(self.namespaced_key(key));
154 c
155 }
156
157 pub async fn del(&mut self, key: String) -> Result<usize, RedisError> {
158 self.connection.del(self.namespaced_key(key)).await
159 }
160
161 pub async fn expire(&mut self, key: String, value: usize) -> Result<usize, RedisError> {
162 self.connection
163 .expire(self.namespaced_key(key), value as i64)
164 .await
165 }
166
167 pub async fn lpush<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
168 where
169 V: ToRedisArgs + Send + Sync,
170 {
171 self.connection.lpush(self.namespaced_key(key), value).await
172 }
173
174 pub async fn sadd<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
175 where
176 V: ToRedisArgs + Send + Sync,
177 {
178 self.connection.sadd(self.namespaced_key(key), value).await
179 }
180
181 pub async fn set_nx_ex<V>(
182 &mut self,
183 key: String,
184 value: V,
185 ttl_in_seconds: usize,
186 ) -> Result<RedisValue, RedisError>
187 where
188 V: ToRedisArgs + Send + Sync,
189 {
190 redis::cmd("SET")
191 .arg(self.namespaced_key(key))
192 .arg(value)
193 .arg("NX")
194 .arg("EX")
195 .arg(ttl_in_seconds)
196 .query_async(self.unnamespaced_borrow_mut())
197 .await
198 }
199
200 pub async fn zrange(
201 &mut self,
202 key: String,
203 lower: isize,
204 upper: isize,
205 ) -> Result<Vec<String>, RedisError> {
206 self.connection
207 .zrange(self.namespaced_key(key), lower, upper)
208 .await
209 }
210
211 pub async fn zrangebyscore_limit<
212 L: redis::ToSingleRedisArg + Send + Sync,
213 U: redis::ToSingleRedisArg + Sync + Send,
214 >(
215 &mut self,
216 key: String,
217 lower: L,
218 upper: U,
219 offset: isize,
220 limit: isize,
221 ) -> Result<Vec<String>, RedisError> {
222 self.connection
223 .zrangebyscore_limit(self.namespaced_key(key), lower, upper, offset, limit)
224 .await
225 }
226
227 pub async fn zadd<
228 V: redis::ToSingleRedisArg + Send + Sync,
229 S: redis::ToSingleRedisArg + Send + Sync,
230 >(
231 &mut self,
232 key: String,
233 value: V,
234 score: S,
235 ) -> Result<usize, RedisError> {
236 self.connection
237 .zadd(self.namespaced_key(key), value, score)
238 .await
239 }
240
241 pub async fn zadd_ch<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
242 &mut self,
243 key: String,
244 value: V,
245 score: S,
246 ) -> Result<bool, RedisError> {
247 redis::cmd("ZADD")
248 .arg(self.namespaced_key(key))
249 .arg("CH")
250 .arg(score)
251 .arg(value)
252 .query_async(self.unnamespaced_borrow_mut())
253 .await
254 }
255
256 pub async fn zrem<V>(&mut self, key: String, value: V) -> Result<usize, RedisError>
257 where
258 V: ToRedisArgs + Send + Sync,
259 {
260 self.connection.zrem(self.namespaced_key(key), value).await
261 }
262}