Skip to main content

sidekiq/
redis.rs

1use bb8::{CustomizeConnection, ManageConnection, Pool};
2use redis::aio::MultiplexedConnection as Connection;
3use redis::AsyncCommands;
4pub use redis::RedisError;
5use redis::ToRedisArgs;
6pub use redis::Value as RedisValue;
7use redis::{Client, IntoConnectionInfo};
8use std::future::Future;
9use std::ops::DerefMut;
10use std::pin::Pin;
11
12pub type RedisPool = Pool<RedisConnectionManager>;
13
14#[derive(Debug)]
15pub struct NamespaceCustomizer {
16    namespace: String,
17}
18
19impl CustomizeConnection<RedisConnection, RedisError> for NamespaceCustomizer {
20    fn on_acquire<'a>(
21        &'a self,
22        connection: &'a mut RedisConnection,
23    ) -> Pin<Box<dyn Future<Output = Result<(), RedisError>> + Send + 'a>> {
24        Box::pin(async {
25            // All redis operations used by the sidekiq lib will use this as a prefix.
26            connection.set_namespace(self.namespace.clone());
27
28            Ok(())
29        })
30    }
31}
32
33#[must_use]
34pub fn with_custom_namespace(namespace: String) -> Box<NamespaceCustomizer> {
35    Box::new(NamespaceCustomizer { namespace })
36}
37
38/// A `bb8::ManageConnection` for `redis::Client::get_async_connection` wrapped in a helper type
39/// for namespacing.
40#[derive(Clone, Debug)]
41pub struct RedisConnectionManager {
42    client: Client,
43}
44
45impl RedisConnectionManager {
46    /// Create a new `RedisConnectionManager`.
47    /// See `redis::Client::open` for a description of the parameter types.
48    pub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError> {
49        Ok(Self {
50            client: Client::open(info.into_connection_info()?)?,
51        })
52    }
53}
54
55impl ManageConnection for RedisConnectionManager {
56    type Connection = RedisConnection;
57    type Error = RedisError;
58
59    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
60        // Disable response timeout because this connection is used for blocking
61        // commands like BRPOP which can legitimately block for seconds.
62        let config = redis::AsyncConnectionConfig::new().set_response_timeout(None);
63        Ok(RedisConnection::new(
64            self.client
65                .get_multiplexed_async_connection_with_config(&config)
66                .await?,
67        ))
68    }
69
70    async fn is_valid(&self, mut conn: &mut Self::Connection) -> Result<(), Self::Error> {
71        let pong: String = redis::cmd("PING")
72            .query_async(&mut conn.deref_mut().connection)
73            .await?;
74        match pong.as_str() {
75            "PONG" => Ok(()),
76            _ => Err(redis::RedisError::from((
77                redis::ErrorKind::Server(redis::ServerErrorKind::ResponseError),
78                "ping request",
79            ))),
80        }
81    }
82
83    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
84        false
85    }
86}
87
88/// A wrapper type for making the redis crate compatible with namespacing.
89pub struct RedisConnection {
90    connection: Connection,
91    namespace: Option<String>,
92}
93
94impl RedisConnection {
95    #[must_use]
96    pub fn new(connection: Connection) -> Self {
97        Self {
98            connection,
99            namespace: None,
100        }
101    }
102
103    pub fn set_namespace(&mut self, namespace: String) {
104        self.namespace = Some(namespace);
105    }
106
107    #[must_use]
108    pub fn with_namespace(self, namespace: String) -> Self {
109        Self {
110            connection: self.connection,
111            namespace: Some(namespace),
112        }
113    }
114
115    fn namespaced_key(&self, key: String) -> String {
116        if let Some(ref namespace) = self.namespace {
117            return format!("{namespace}:{key}");
118        }
119
120        key
121    }
122
123    fn namespaced_keys(&self, keys: Vec<String>) -> Vec<String> {
124        if let Some(ref namespace) = self.namespace {
125            let keys: Vec<String> = keys
126                .iter()
127                .map(|key| format!("{namespace}:{key}"))
128                .collect();
129
130            return keys;
131        }
132
133        keys
134    }
135
136    /// This allows you to borrow the raw redis connection without any namespacing support.
137    pub fn unnamespaced_borrow_mut(&mut self) -> &mut Connection {
138        &mut self.connection
139    }
140
141    pub async fn brpop(
142        &mut self,
143        keys: Vec<String>,
144        timeout: usize,
145    ) -> Result<Option<(String, String)>, RedisError> {
146        self.connection
147            .brpop(self.namespaced_keys(keys), timeout as f64)
148            .await
149    }
150
151    pub fn cmd_with_key(&mut self, cmd: &str, key: String) -> redis::Cmd {
152        let mut c = redis::cmd(cmd);
153        c.arg(self.namespaced_key(key));
154        c
155    }
156
157    pub async fn del(&mut self, key: String) -> Result<usize, RedisError> {
158        self.connection.del(self.namespaced_key(key)).await
159    }
160
161    pub async fn expire(&mut self, key: String, value: usize) -> Result<usize, RedisError> {
162        self.connection
163            .expire(self.namespaced_key(key), value as i64)
164            .await
165    }
166
167    pub async fn lpush<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
168    where
169        V: ToRedisArgs + Send + Sync,
170    {
171        self.connection.lpush(self.namespaced_key(key), value).await
172    }
173
174    pub async fn sadd<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
175    where
176        V: ToRedisArgs + Send + Sync,
177    {
178        self.connection.sadd(self.namespaced_key(key), value).await
179    }
180
181    pub async fn set_nx_ex<V>(
182        &mut self,
183        key: String,
184        value: V,
185        ttl_in_seconds: usize,
186    ) -> Result<RedisValue, RedisError>
187    where
188        V: ToRedisArgs + Send + Sync,
189    {
190        redis::cmd("SET")
191            .arg(self.namespaced_key(key))
192            .arg(value)
193            .arg("NX")
194            .arg("EX")
195            .arg(ttl_in_seconds)
196            .query_async(self.unnamespaced_borrow_mut())
197            .await
198    }
199
200    pub async fn zrange(
201        &mut self,
202        key: String,
203        lower: isize,
204        upper: isize,
205    ) -> Result<Vec<String>, RedisError> {
206        self.connection
207            .zrange(self.namespaced_key(key), lower, upper)
208            .await
209    }
210
211    pub async fn zrangebyscore_limit<
212        L: redis::ToSingleRedisArg + Send + Sync,
213        U: redis::ToSingleRedisArg + Sync + Send,
214    >(
215        &mut self,
216        key: String,
217        lower: L,
218        upper: U,
219        offset: isize,
220        limit: isize,
221    ) -> Result<Vec<String>, RedisError> {
222        self.connection
223            .zrangebyscore_limit(self.namespaced_key(key), lower, upper, offset, limit)
224            .await
225    }
226
227    pub async fn zadd<
228        V: redis::ToSingleRedisArg + Send + Sync,
229        S: redis::ToSingleRedisArg + Send + Sync,
230    >(
231        &mut self,
232        key: String,
233        value: V,
234        score: S,
235    ) -> Result<usize, RedisError> {
236        self.connection
237            .zadd(self.namespaced_key(key), value, score)
238            .await
239    }
240
241    pub async fn zadd_ch<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
242        &mut self,
243        key: String,
244        value: V,
245        score: S,
246    ) -> Result<bool, RedisError> {
247        redis::cmd("ZADD")
248            .arg(self.namespaced_key(key))
249            .arg("CH")
250            .arg(score)
251            .arg(value)
252            .query_async(self.unnamespaced_borrow_mut())
253            .await
254    }
255
256    pub async fn zrem<V>(&mut self, key: String, value: V) -> Result<usize, RedisError>
257    where
258        V: ToRedisArgs + Send + Sync,
259    {
260        self.connection.zrem(self.namespaced_key(key), value).await
261    }
262}