1use redis::aio::ConnectionManager;
2#[cfg(feature = "cluster")]
3use redis::cluster::ClusterClient;
4use redis::streams::StreamReadOptions;
5use redis::{from_redis_value, AsyncCommands, ToRedisArgs, Value};
6use std::fmt::{Debug, Formatter};
7
8#[derive(Clone)]
9pub struct RedisClient {
10 #[cfg(feature = "cluster")]
11 pub client: ClusterClient,
12 #[cfg(feature = "single")]
13 pub client: ConnectionManager,
14}
15
16impl Debug for RedisClient {
17 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
18 f.write_fmt(format_args!(""))
19 }
20}
21
22impl RedisClient {
23 #[tracing::instrument]
24 pub async fn get<K: ToRedisArgs + Debug + Send + Sync>(
25 &mut self,
26 key: K,
27 ) -> redis::RedisResult<String> {
28 #[cfg(feature = "cluster")]
29 let data = self.client.get_connection()?.get(key)?;
30
31 #[cfg(feature = "single")]
32 let data = self.client.get(key).await?;
33 Ok(data)
34 }
35
36 #[tracing::instrument]
37 pub async fn mget<K: ToRedisArgs + Debug + Send + Sync>(
38 &mut self,
39 key: K,
40 ) -> redis::RedisResult<Vec<String>> {
41 #[cfg(feature = "cluster")]
42 let data = self.client.get_connection()?.mget(key)?;
43
44 #[cfg(feature = "single")]
45 let data = self.client.mget(key).await?;
46 Ok(data)
47 }
48
49 #[tracing::instrument]
50 pub async fn exists<K: ToRedisArgs + Debug + Send + Sync>(
51 &mut self,
52 key: K,
53 ) -> redis::RedisResult<u8> {
54 #[cfg(feature = "cluster")]
55 let data = self.client.get_connection()?.exists(key)?;
56
57 #[cfg(feature = "single")]
58 let data = self.client.exists(key).await?;
59 Ok(data)
60 }
61
62 #[tracing::instrument]
63 pub async fn set<K: ToRedisArgs + Debug + Send + Sync, V: ToRedisArgs + Debug + Send + Sync>(
64 &mut self,
65 key: K,
66 value: V,
67 ) -> redis::RedisResult<bool> {
68 #[cfg(feature = "cluster")]
69 let ok = self.client.get_connection()?.set(key, value)?;
70
71 #[cfg(feature = "single")]
72 let ok = self.client.set(key, value).await?;
73 Ok(ok)
74 }
75
76 #[tracing::instrument]
77 pub async fn expire<K: ToRedisArgs + Debug + Send + Sync>(
78 &mut self,
79 key: K,
80 expire: usize,
81 ) -> redis::RedisResult<i8> {
82 #[cfg(feature = "cluster")]
83 let ok = self.client.get_connection()?.expire(key, expire)?;
84
85 #[cfg(feature = "single")]
86 let ok = self.client.expire(key, expire).await?;
87 Ok(ok)
88 }
89
90 #[tracing::instrument]
91 pub async fn del<K: ToRedisArgs + Debug + Send + Sync>(
92 &mut self,
93 key: K,
94 ) -> redis::RedisResult<u8> {
95 #[cfg(feature = "cluster")]
96 let ok = self.client.get_connection()?.del(key)?;
97
98 #[cfg(feature = "single")]
99 let ok = self.client.del(key).await?;
100 Ok(ok)
101 }
102
103 #[tracing::instrument]
104 pub async fn xadd<
105 K: ToRedisArgs + Debug + Send + Sync,
106 F: ToRedisArgs + Debug + Send + Sync,
107 V: ToRedisArgs + Debug + Send + Sync,
108 >(
109 &mut self,
110 key: K,
111 items: &[(F, V)],
112 ) -> redis::RedisResult<String> {
113 #[cfg(feature = "cluster")]
114 let ok = self.client.get_connection()?.xadd(key, "*", items)?;
115
116 #[cfg(feature = "single")]
117 let ok = self.client.xadd(key, "*", items).await?;
118 Ok(ok)
119 }
120
121 #[tracing::instrument]
123 pub async fn xgroup_create<
124 K: ToRedisArgs + Debug + Send + Sync,
125 G: ToRedisArgs + Debug + Send + Sync,
126 >(
127 &mut self,
128 key: K,
129 group: G,
130 ) -> redis::RedisResult<()> {
131 let ok = self.is_exist_group_name(&key, &group).await?;
132
133 if !ok {
134 #[cfg(feature = "cluster")]
135 self.client
136 .get_connection()?
137 .xgroup_create_mkstream(key, group, 0)?;
138
139 #[cfg(feature = "single")]
140 self.client.xgroup_create_mkstream(key, group, 0).await?;
141 }
142 Ok(())
143 }
144
145 #[tracing::instrument]
147 pub async fn xread_group(
148 &mut self,
149 key: &str,
150 group: &str,
151 consumer: &str,
152 ) -> redis::RedisResult<Value> {
153 let keys = vec![key];
154 let ids = vec![">"];
155 let mut opts = StreamReadOptions::default();
156 opts = opts.count(1);
157 opts = opts.group(group, consumer);
158 opts = opts.block(3000);
159 #[cfg(feature = "cluster")]
161 let data = self
162 .client
163 .get_connection()?
164 .xread_options(&keys, &ids, &opts)?;
165
166 #[cfg(feature = "single")]
167 let data = self.client.xread_options(&keys, &ids, &opts).await?;
168 Ok(data)
169 }
170
171 #[tracing::instrument]
172 pub async fn xinfo_groups<K: ToRedisArgs + Debug + Send + Sync>(
173 &mut self,
174 key: K,
175 ) -> redis::RedisResult<Value> {
176 #[cfg(feature = "cluster")]
177 let v = self.client.get_connection()?.xinfo_groups(key)?;
178
179 #[cfg(feature = "single")]
180 let v = self.client.xinfo_groups(key).await?;
181 Ok(v)
182 }
183
184 #[tracing::instrument]
185 pub async fn is_exist_group_name<
186 K: ToRedisArgs + Debug + Send + Sync,
187 G: ToRedisArgs + Debug + Send + Sync,
188 >(
189 &mut self,
190 key: K,
191 group: G,
192 ) -> redis::RedisResult<bool> {
193 let groups = self.xinfo_groups(&key).await?;
194 let groups: Vec<Vec<Value>> = from_redis_value(&groups)?;
195 let groupname = group.to_redis_args().get(0).unwrap().to_owned();
196 let groupname = String::from_utf8(groupname).unwrap();
197 for group in groups {
198 let name = group.get(1).unwrap();
199 let name: String = from_redis_value(name).unwrap();
200 if name == groupname {
201 return Ok(true);
202 }
203 }
204 Ok(false)
205 }
206
207 #[tracing::instrument]
208 pub async fn xinfo_consumers<
209 K: ToRedisArgs + Debug + Send + Sync,
210 G: ToRedisArgs + Debug + Send + Sync,
211 >(
212 &mut self,
213 key: K,
214 group: G,
215 ) -> redis::RedisResult<Value> {
216 #[cfg(feature = "cluster")]
217 let v = self.client.get_connection()?.xinfo_consumers(key, group)?;
218
219 #[cfg(feature = "single")]
220 let v = self.client.xinfo_consumers(key, group).await?;
221 Ok(v)
222 }
223
224 #[tracing::instrument]
225 pub async fn xclaim_auto<
226 K: ToRedisArgs + Debug + Send + Sync,
227 G: ToRedisArgs + Debug + Send + Sync,
228 C: ToRedisArgs + Debug + Send + Sync,
229 MIT: ToRedisArgs + Debug + Send + Sync,
230 >(
231 &mut self,
232 key: K,
233 group: G,
234 consumer: C,
235 min_idle_time: MIT,
236 ) -> redis::RedisResult<Value> {
237 let data = self.xpending_one(&key, &group).await?;
238 let id = self.get_id(&data).await?;
239 let ids = vec![id];
240 let mut opts = redis::streams::StreamClaimOptions::default();
241 opts = opts.time(60000);
242 opts = opts.idle(60000);
243
244 #[cfg(feature = "cluster")]
245 let v = self.client.get_connection()?.xclaim_options(
246 key,
247 group,
248 consumer,
249 min_idle_time,
250 &ids,
251 opts,
252 )?;
253
254 #[cfg(feature = "single")]
255 let v = self
256 .client
257 .xclaim_options(key, group, consumer, min_idle_time, &ids, opts)
258 .await?;
259 Ok(v)
260 }
261
262 #[tracing::instrument]
263 pub async fn xpending_one<
264 K: ToRedisArgs + Debug + Send + Sync,
265 G: ToRedisArgs + Debug + Send + Sync,
266 >(
267 &mut self,
268 key: K,
269 group: G,
270 ) -> redis::RedisResult<Value> {
271 #[cfg(feature = "cluster")]
272 let d: Value = self
273 .client
274 .get_connection()?
275 .xpending_count(key, group, "-", "+", 1)?;
276
277 #[cfg(feature = "single")]
278 let d: Value = self.client.xpending_count(key, group, "-", "+", 1).await?;
279 Ok(d)
280 }
281
282 #[tracing::instrument]
283 pub async fn get_id(&self, data: &Value) -> redis::RedisResult<String> {
284 let data: Vec<Vec<Value>> = from_redis_value(&data)?;
285 if data.len() > 0 {
286 let data = data.get(0).unwrap();
287 if data.len() > 0 {
288 let data = data.get(0).unwrap();
289 let data = from_redis_value(data)?;
290 return Ok(data);
291 }
292 }
293 Ok(String::new())
294 }
295
296 #[tracing::instrument]
297 pub async fn xack<
298 K: ToRedisArgs + Debug + Send + Sync,
299 G: ToRedisArgs + Debug + Send + Sync,
300 I: ToRedisArgs + Debug + Send + Sync,
301 >(
302 &mut self,
303 key: K,
304 group: G,
305 ids: &Vec<I>,
306 ) -> redis::RedisResult<u64> {
307 #[cfg(feature = "cluster")]
308 let ack_count = self.client.get_connection()?.xack(key, group, ids)?;
309
310 #[cfg(feature = "single")]
311 let ack_count = self.client.xack(key, group, ids).await?;
312 Ok(ack_count)
313 }
314
315 #[tracing::instrument]
316 pub async fn xack2del<
317 K: ToRedisArgs + Debug + Send + Sync,
318 G: ToRedisArgs + Debug + Send + Sync,
319 I: ToRedisArgs + Debug + Send + Sync,
320 >(
321 &mut self,
322 key: K,
323 group: G,
324 ids: &Vec<I>,
325 ) -> redis::RedisResult<(u64, u64)> {
326 #[cfg(feature = "cluster")]
327 let ack_count = self.client.get_connection()?.xack(&key, group, ids)?;
328
329 #[cfg(feature = "single")]
330 let ack_count = self.client.xack(&key, group, ids).await?;
331 let del_count = self.xdel(key, ids).await?;
332 Ok((ack_count, del_count))
333 }
334
335 #[tracing::instrument]
337 pub async fn xdel<
338 K: ToRedisArgs + Debug + Send + Sync,
339 I: ToRedisArgs + Debug + Send + Sync,
340 >(
341 &mut self,
342 key: K,
343 ids: &Vec<I>,
344 ) -> redis::RedisResult<u64> {
345 #[cfg(feature = "cluster")]
346 let del_count = self.client.get_connection()?.xdel(key, ids)?;
347
348 #[cfg(feature = "single")]
349 let del_count = self.client.xdel(key, ids).await?;
350 Ok(del_count)
351 }
352
353 #[cfg(feature = "cluster")]
354 pub async fn ping(&mut self) -> redis::RedisResult<bool> {
355 let ok = self.client.get_connection()?.check_connection();
356 Ok(ok)
357 }
358}