redis_cmd/
cmd.rs

1use redis::aio::ConnectionManager;
2#[cfg(feature = "cluster")]
3use redis::cluster::ClusterClient;
4use redis::streams::StreamReadOptions;
5use redis::{from_redis_value, AsyncCommands, ToRedisArgs, Value};
6use std::fmt::{Debug, Formatter};
7
/// Handle used by the command helpers in this file to talk to Redis.
///
/// The type of `client` is selected at compile time by Cargo feature. Both
/// variants declare a field with the same name, so enabling `cluster` and
/// `single` together would define `client` twice.
#[derive(Clone)]
pub struct RedisClient {
    /// Cluster client; present when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    pub client: ClusterClient,
    /// Connection manager; present when the `single` feature is enabled.
    #[cfg(feature = "single")]
    pub client: ConnectionManager,
}
15
16impl Debug for RedisClient {
17    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
18        f.write_fmt(format_args!(""))
19    }
20}
21
22impl RedisClient {
23    #[tracing::instrument]
24    pub async fn get<K: ToRedisArgs + Debug + Send + Sync>(
25        &mut self,
26        key: K,
27    ) -> redis::RedisResult<String> {
28        #[cfg(feature = "cluster")]
29        let data = self.client.get_connection()?.get(key)?;
30
31        #[cfg(feature = "single")]
32        let data = self.client.get(key).await?;
33        Ok(data)
34    }
35
36    #[tracing::instrument]
37    pub async fn mget<K: ToRedisArgs + Debug + Send + Sync>(
38        &mut self,
39        key: K,
40    ) -> redis::RedisResult<Vec<String>> {
41        #[cfg(feature = "cluster")]
42        let data = self.client.get_connection()?.mget(key)?;
43
44        #[cfg(feature = "single")]
45        let data = self.client.mget(key).await?;
46        Ok(data)
47    }
48
49    #[tracing::instrument]
50    pub async fn exists<K: ToRedisArgs + Debug + Send + Sync>(
51        &mut self,
52        key: K,
53    ) -> redis::RedisResult<u8> {
54        #[cfg(feature = "cluster")]
55        let data = self.client.get_connection()?.exists(key)?;
56
57        #[cfg(feature = "single")]
58        let data = self.client.exists(key).await?;
59        Ok(data)
60    }
61
62    #[tracing::instrument]
63    pub async fn set<K: ToRedisArgs + Debug + Send + Sync, V: ToRedisArgs + Debug + Send + Sync>(
64        &mut self,
65        key: K,
66        value: V,
67    ) -> redis::RedisResult<bool> {
68        #[cfg(feature = "cluster")]
69        let ok = self.client.get_connection()?.set(key, value)?;
70
71        #[cfg(feature = "single")]
72        let ok = self.client.set(key, value).await?;
73        Ok(ok)
74    }
75
76    #[tracing::instrument]
77    pub async fn expire<K: ToRedisArgs + Debug + Send + Sync>(
78        &mut self,
79        key: K,
80        expire: usize,
81    ) -> redis::RedisResult<i8> {
82        #[cfg(feature = "cluster")]
83        let ok = self.client.get_connection()?.expire(key, expire)?;
84
85        #[cfg(feature = "single")]
86        let ok = self.client.expire(key, expire).await?;
87        Ok(ok)
88    }
89
90    #[tracing::instrument]
91    pub async fn del<K: ToRedisArgs + Debug + Send + Sync>(
92        &mut self,
93        key: K,
94    ) -> redis::RedisResult<u8> {
95        #[cfg(feature = "cluster")]
96        let ok = self.client.get_connection()?.del(key)?;
97
98        #[cfg(feature = "single")]
99        let ok = self.client.del(key).await?;
100        Ok(ok)
101    }
102
103    #[tracing::instrument]
104    pub async fn xadd<
105        K: ToRedisArgs + Debug + Send + Sync,
106        F: ToRedisArgs + Debug + Send + Sync,
107        V: ToRedisArgs + Debug + Send + Sync,
108    >(
109        &mut self,
110        key: K,
111        items: &[(F, V)],
112    ) -> redis::RedisResult<String> {
113        #[cfg(feature = "cluster")]
114        let ok = self.client.get_connection()?.xadd(key, "*", items)?;
115
116        #[cfg(feature = "single")]
117        let ok = self.client.xadd(key, "*", items).await?;
118        Ok(ok)
119    }
120
121    /// return OK
122    #[tracing::instrument]
123    pub async fn xgroup_create<
124        K: ToRedisArgs + Debug + Send + Sync,
125        G: ToRedisArgs + Debug + Send + Sync,
126    >(
127        &mut self,
128        key: K,
129        group: G,
130    ) -> redis::RedisResult<()> {
131        let ok = self.is_exist_group_name(&key, &group).await?;
132
133        if !ok {
134            #[cfg(feature = "cluster")]
135            self.client
136                .get_connection()?
137                .xgroup_create_mkstream(key, group, 0)?;
138
139            #[cfg(feature = "single")]
140            self.client.xgroup_create_mkstream(key, group, 0).await?;
141        }
142        Ok(())
143    }
144
145    /// return OK
146    #[tracing::instrument]
147    pub async fn xread_group(
148        &mut self,
149        key: &str,
150        group: &str,
151        consumer: &str,
152    ) -> redis::RedisResult<Value> {
153        let keys = vec![key];
154        let ids = vec![">"];
155        let mut opts = StreamReadOptions::default();
156        opts = opts.count(1);
157        opts = opts.group(group, consumer);
158        opts = opts.block(3000);
159        // opts = opts.noack();
160        #[cfg(feature = "cluster")]
161        let data = self
162            .client
163            .get_connection()?
164            .xread_options(&keys, &ids, &opts)?;
165
166        #[cfg(feature = "single")]
167        let data = self.client.xread_options(&keys, &ids, &opts).await?;
168        Ok(data)
169    }
170
171    #[tracing::instrument]
172    pub async fn xinfo_groups<K: ToRedisArgs + Debug + Send + Sync>(
173        &mut self,
174        key: K,
175    ) -> redis::RedisResult<Value> {
176        #[cfg(feature = "cluster")]
177        let v = self.client.get_connection()?.xinfo_groups(key)?;
178
179        #[cfg(feature = "single")]
180        let v = self.client.xinfo_groups(key).await?;
181        Ok(v)
182    }
183
184    #[tracing::instrument]
185    pub async fn is_exist_group_name<
186        K: ToRedisArgs + Debug + Send + Sync,
187        G: ToRedisArgs + Debug + Send + Sync,
188    >(
189        &mut self,
190        key: K,
191        group: G,
192    ) -> redis::RedisResult<bool> {
193        let groups = self.xinfo_groups(&key).await?;
194        let groups: Vec<Vec<Value>> = from_redis_value(&groups)?;
195        let groupname = group.to_redis_args().get(0).unwrap().to_owned();
196        let groupname = String::from_utf8(groupname).unwrap();
197        for group in groups {
198            let name = group.get(1).unwrap();
199            let name: String = from_redis_value(name).unwrap();
200            if name == groupname {
201                return Ok(true);
202            }
203        }
204        Ok(false)
205    }
206
207    #[tracing::instrument]
208    pub async fn xinfo_consumers<
209        K: ToRedisArgs + Debug + Send + Sync,
210        G: ToRedisArgs + Debug + Send + Sync,
211    >(
212        &mut self,
213        key: K,
214        group: G,
215    ) -> redis::RedisResult<Value> {
216        #[cfg(feature = "cluster")]
217        let v = self.client.get_connection()?.xinfo_consumers(key, group)?;
218
219        #[cfg(feature = "single")]
220        let v = self.client.xinfo_consumers(key, group).await?;
221        Ok(v)
222    }
223
224    #[tracing::instrument]
225    pub async fn xclaim_auto<
226        K: ToRedisArgs + Debug + Send + Sync,
227        G: ToRedisArgs + Debug + Send + Sync,
228        C: ToRedisArgs + Debug + Send + Sync,
229        MIT: ToRedisArgs + Debug + Send + Sync,
230    >(
231        &mut self,
232        key: K,
233        group: G,
234        consumer: C,
235        min_idle_time: MIT,
236    ) -> redis::RedisResult<Value> {
237        let data = self.xpending_one(&key, &group).await?;
238        let id = self.get_id(&data).await?;
239        let ids = vec![id];
240        let mut opts = redis::streams::StreamClaimOptions::default();
241        opts = opts.time(60000);
242        opts = opts.idle(60000);
243
244        #[cfg(feature = "cluster")]
245        let v = self.client.get_connection()?.xclaim_options(
246            key,
247            group,
248            consumer,
249            min_idle_time,
250            &ids,
251            opts,
252        )?;
253
254        #[cfg(feature = "single")]
255        let v = self
256            .client
257            .xclaim_options(key, group, consumer, min_idle_time, &ids, opts)
258            .await?;
259        Ok(v)
260    }
261
262    #[tracing::instrument]
263    pub async fn xpending_one<
264        K: ToRedisArgs + Debug + Send + Sync,
265        G: ToRedisArgs + Debug + Send + Sync,
266    >(
267        &mut self,
268        key: K,
269        group: G,
270    ) -> redis::RedisResult<Value> {
271        #[cfg(feature = "cluster")]
272        let d: Value = self
273            .client
274            .get_connection()?
275            .xpending_count(key, group, "-", "+", 1)?;
276
277        #[cfg(feature = "single")]
278        let d: Value = self.client.xpending_count(key, group, "-", "+", 1).await?;
279        Ok(d)
280    }
281
282    #[tracing::instrument]
283    pub async fn get_id(&self, data: &Value) -> redis::RedisResult<String> {
284        let data: Vec<Vec<Value>> = from_redis_value(&data)?;
285        if data.len() > 0 {
286            let data = data.get(0).unwrap();
287            if data.len() > 0 {
288                let data = data.get(0).unwrap();
289                let data = from_redis_value(data)?;
290                return Ok(data);
291            }
292        }
293        Ok(String::new())
294    }
295
296    #[tracing::instrument]
297    pub async fn xack<
298        K: ToRedisArgs + Debug + Send + Sync,
299        G: ToRedisArgs + Debug + Send + Sync,
300        I: ToRedisArgs + Debug + Send + Sync,
301    >(
302        &mut self,
303        key: K,
304        group: G,
305        ids: &Vec<I>,
306    ) -> redis::RedisResult<u64> {
307        #[cfg(feature = "cluster")]
308        let ack_count = self.client.get_connection()?.xack(key, group, ids)?;
309
310        #[cfg(feature = "single")]
311        let ack_count = self.client.xack(key, group, ids).await?;
312        Ok(ack_count)
313    }
314
315    #[tracing::instrument]
316    pub async fn xack2del<
317        K: ToRedisArgs + Debug + Send + Sync,
318        G: ToRedisArgs + Debug + Send + Sync,
319        I: ToRedisArgs + Debug + Send + Sync,
320    >(
321        &mut self,
322        key: K,
323        group: G,
324        ids: &Vec<I>,
325    ) -> redis::RedisResult<(u64, u64)> {
326        #[cfg(feature = "cluster")]
327        let ack_count = self.client.get_connection()?.xack(&key, group, ids)?;
328
329        #[cfg(feature = "single")]
330        let ack_count = self.client.xack(&key, group, ids).await?;
331        let del_count = self.xdel(key, ids).await?;
332        Ok((ack_count, del_count))
333    }
334
335    ///
336    #[tracing::instrument]
337    pub async fn xdel<
338        K: ToRedisArgs + Debug + Send + Sync,
339        I: ToRedisArgs + Debug + Send + Sync,
340    >(
341        &mut self,
342        key: K,
343        ids: &Vec<I>,
344    ) -> redis::RedisResult<u64> {
345        #[cfg(feature = "cluster")]
346        let del_count = self.client.get_connection()?.xdel(key, ids)?;
347
348        #[cfg(feature = "single")]
349        let del_count = self.client.xdel(key, ids).await?;
350        Ok(del_count)
351    }
352
353    #[cfg(feature = "cluster")]
354    pub async fn ping(&mut self) -> redis::RedisResult<bool> {
355        let ok = self.client.get_connection()?.check_connection();
356        Ok(ok)
357    }
358}