sidekiq/
redis.rs

1use bb8::{CustomizeConnection, ManageConnection, Pool};
2use redis::AsyncCommands;
3pub use redis::RedisError;
4use redis::ToRedisArgs;
5pub use redis::Value as RedisValue;
6use redis::{aio::MultiplexedConnection as Connection, ErrorKind};
7use redis::{Client, IntoConnectionInfo};
8use std::future::Future;
9use std::ops::DerefMut;
10use std::pin::Pin;
11
12pub type RedisPool = Pool<RedisConnectionManager>;
13
14#[derive(Debug)]
15pub struct NamespaceCustomizer {
16    namespace: String,
17}
18
19impl CustomizeConnection<RedisConnection, RedisError> for NamespaceCustomizer {
20    fn on_acquire<'a>(
21        &'a self,
22        connection: &'a mut RedisConnection,
23    ) -> Pin<Box<dyn Future<Output = Result<(), RedisError>> + Send + 'a>> {
24        Box::pin(async {
25            // All redis operations used by the sidekiq lib will use this as a prefix.
26            connection.set_namespace(self.namespace.clone());
27
28            Ok(())
29        })
30    }
31}
32
33#[must_use]
34pub fn with_custom_namespace(namespace: String) -> Box<NamespaceCustomizer> {
35    Box::new(NamespaceCustomizer { namespace })
36}
37
38/// A `bb8::ManageConnection` for `redis::Client::get_async_connection` wrapped in a helper type
39/// for namespacing.
40#[derive(Clone, Debug)]
41pub struct RedisConnectionManager {
42    client: Client,
43}
44
45impl RedisConnectionManager {
46    /// Create a new `RedisConnectionManager`.
47    /// See `redis::Client::open` for a description of the parameter types.
48    pub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError> {
49        Ok(Self {
50            client: Client::open(info.into_connection_info()?)?,
51        })
52    }
53}
54
55impl ManageConnection for RedisConnectionManager {
56    type Connection = RedisConnection;
57    type Error = RedisError;
58
59    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
60        Ok(RedisConnection::new(
61            self.client.get_multiplexed_async_connection().await?,
62        ))
63    }
64
65    async fn is_valid(&self, mut conn: &mut Self::Connection) -> Result<(), Self::Error> {
66        let pong: String = redis::cmd("PING")
67            .query_async(&mut conn.deref_mut().connection)
68            .await?;
69        match pong.as_str() {
70            "PONG" => Ok(()),
71            _ => Err((ErrorKind::ResponseError, "ping request").into()),
72        }
73    }
74
75    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
76        false
77    }
78}
79
80/// A wrapper type for making the redis crate compatible with namespacing.
81pub struct RedisConnection {
82    connection: Connection,
83    namespace: Option<String>,
84}
85
86impl RedisConnection {
87    #[must_use]
88    pub fn new(connection: Connection) -> Self {
89        Self {
90            connection,
91            namespace: None,
92        }
93    }
94
95    pub fn set_namespace(&mut self, namespace: String) {
96        self.namespace = Some(namespace);
97    }
98
99    #[must_use]
100    pub fn with_namespace(self, namespace: String) -> Self {
101        Self {
102            connection: self.connection,
103            namespace: Some(namespace),
104        }
105    }
106
107    fn namespaced_key(&self, key: String) -> String {
108        if let Some(ref namespace) = self.namespace {
109            return format!("{namespace}:{key}");
110        }
111
112        key
113    }
114
115    fn namespaced_keys(&self, keys: Vec<String>) -> Vec<String> {
116        if let Some(ref namespace) = self.namespace {
117            let keys: Vec<String> = keys
118                .iter()
119                .map(|key| format!("{namespace}:{key}"))
120                .collect();
121
122            return keys;
123        }
124
125        keys
126    }
127
128    /// This allows you to borrow the raw redis connection without any namespacing support.
129    pub fn unnamespaced_borrow_mut(&mut self) -> &mut Connection {
130        &mut self.connection
131    }
132
133    pub async fn brpop(
134        &mut self,
135        keys: Vec<String>,
136        timeout: usize,
137    ) -> Result<Option<(String, String)>, RedisError> {
138        self.connection
139            .brpop(self.namespaced_keys(keys), timeout as f64)
140            .await
141    }
142
143    pub fn cmd_with_key(&mut self, cmd: &str, key: String) -> redis::Cmd {
144        let mut c = redis::cmd(cmd);
145        c.arg(self.namespaced_key(key));
146        c
147    }
148
149    pub async fn del(&mut self, key: String) -> Result<usize, RedisError> {
150        self.connection.del(self.namespaced_key(key)).await
151    }
152
153    pub async fn expire(&mut self, key: String, value: usize) -> Result<usize, RedisError> {
154        self.connection
155            .expire(self.namespaced_key(key), value as i64)
156            .await
157    }
158
159    pub async fn lpush<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
160    where
161        V: ToRedisArgs + Send + Sync,
162    {
163        self.connection.lpush(self.namespaced_key(key), value).await
164    }
165
166    pub async fn sadd<V>(&mut self, key: String, value: V) -> Result<(), RedisError>
167    where
168        V: ToRedisArgs + Send + Sync,
169    {
170        self.connection.sadd(self.namespaced_key(key), value).await
171    }
172
173    pub async fn set_nx_ex<V>(
174        &mut self,
175        key: String,
176        value: V,
177        ttl_in_seconds: usize,
178    ) -> Result<RedisValue, RedisError>
179    where
180        V: ToRedisArgs + Send + Sync,
181    {
182        redis::cmd("SET")
183            .arg(self.namespaced_key(key))
184            .arg(value)
185            .arg("NX")
186            .arg("EX")
187            .arg(ttl_in_seconds)
188            .query_async(self.unnamespaced_borrow_mut())
189            .await
190    }
191
192    pub async fn zrange(
193        &mut self,
194        key: String,
195        lower: isize,
196        upper: isize,
197    ) -> Result<Vec<String>, RedisError> {
198        self.connection
199            .zrange(self.namespaced_key(key), lower, upper)
200            .await
201    }
202
203    pub async fn zrangebyscore_limit<L: ToRedisArgs + Send + Sync, U: ToRedisArgs + Sync + Send>(
204        &mut self,
205        key: String,
206        lower: L,
207        upper: U,
208        offset: isize,
209        limit: isize,
210    ) -> Result<Vec<String>, RedisError> {
211        self.connection
212            .zrangebyscore_limit(self.namespaced_key(key), lower, upper, offset, limit)
213            .await
214    }
215
216    pub async fn zadd<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
217        &mut self,
218        key: String,
219        value: V,
220        score: S,
221    ) -> Result<usize, RedisError> {
222        self.connection
223            .zadd(self.namespaced_key(key), value, score)
224            .await
225    }
226
227    pub async fn zadd_ch<V: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
228        &mut self,
229        key: String,
230        value: V,
231        score: S,
232    ) -> Result<bool, RedisError> {
233        redis::cmd("ZADD")
234            .arg(self.namespaced_key(key))
235            .arg("CH")
236            .arg(score)
237            .arg(value)
238            .query_async(self.unnamespaced_borrow_mut())
239            .await
240    }
241
242    pub async fn zrem<V>(&mut self, key: String, value: V) -> Result<usize, RedisError>
243    where
244        V: ToRedisArgs + Send + Sync,
245    {
246        self.connection.zrem(self.namespaced_key(key), value).await
247    }
248}