br_cache/
redis.rs

1use std::collections::{HashSet};
2use std::num::NonZeroUsize;
3use std::sync::mpsc::{Sender};
4use crate::{CacheBase, Connection as CacheConnection};
5use json::{array, object, JsonValue};
6use log::error;
7use r2d2::{Pool, PooledConnection};
8use redis::{geo, Client, Commands, FromRedisValue, RedisResult};
9use redis::streams::{StreamInfoGroupsReply, StreamInfoStreamReply, StreamRangeReply, StreamReadReply};
10
/// Redis-backed cache handle.
///
/// `Clone` is derived, so a clone carries copies of all three fields.
#[derive(Clone)]
pub struct Redis {
    /// Connection pool that `con` checks connections out of for regular commands.
    client: Pool<Client>,
    /// Separate client from which `subscribe` opens its own connection.
    pubsub_client: Client,
    /// Database index sent with `SELECT` every time `con` hands out a connection.
    db: i8,
}
17
18impl Redis {
19    pub fn connect(connection: CacheConnection) -> Result<Self, String> {
20        let dsn = if connection.userpass.is_empty() {
21            format!("redis://{}:{}/", connection.hostname, connection.hostport)
22        } else {
23            format!("redis://:{}@{}:{}/", connection.userpass, connection.hostname, connection.hostport)
24        };
25        let client = Client::open(dsn.clone()).map_err(|e| format!("Redis Client error: {e}"))?;
26        let pool = Pool::builder().max_size(64).connection_timeout(std::time::Duration::from_secs(2)).build(client).map_err(|e| format!("Redis Pool build error: {e}"))?;
27        let pubsub_client = Client::open(dsn).map_err(|e| format!("Redis PubSub Client error: {e}"))?;
28        Ok(Self { client: pool, pubsub_client, db: 0 })
29    }
30    pub fn con(&mut self) -> Result<PooledConnection<Client>, String> {
31        let mut conn = match self.client.get() {
32            Ok(e) => e,
33            Err(err) => return Err(format!("Redis Connection failed: {err}")),
34        };
35        redis::pipe().cmd("SELECT").arg(self.db).exec(&mut conn).unwrap();
36        Ok(conn)
37    }
38}
39
40impl CacheBase for Redis {
    /// Switches to the given database.
    ///
    /// Only stores the index on this handle and returns `self` so calls can be chained.
    fn db(&mut self, db: i8) -> &mut Self {
        self.db = db;
        self
    }
46
47    fn key_exists(&mut self, key: &str) -> Result<bool, String> {
48        let data: RedisResult<bool> = Commands::exists(&mut self.con()?, key);
49        match data {
50            Ok(data) => Ok(data),
51            Err(e) => Err(format!("判断是否存在失败: {e}")),
52        }
53    }
54
55    fn key_del(&mut self, key: &str) -> Result<bool, String> {
56        let data: RedisResult<bool> = Commands::del(&mut self.con()?, key);
57        match data {
58            Ok(e) => Ok(e),
59            Err(e) => Err(format!("删除: {e}"))
60        }
61    }
62
63    fn key_ttl(&mut self, key: &str) -> Result<i64, String> {
64        let data: RedisResult<i64> = Commands::ttl(&mut self.con()?, key);
65        match data {
66            Ok(e) => Ok(e),
67            Err(e) => Err(format!("获取剩余时间失败: {e}"))
68        }
69    }
70
71    fn key_set_expireat(&mut self, key: &str, timestamp: i64) -> Result<bool, String> {
72        let data: RedisResult<bool> = Commands::expire_at(&mut self.con()?, key, timestamp);
73        match data {
74            Ok(e) => Ok(e),
75            Err(e) => Err(format!("key_set_expireat: {e}"))
76        }
77    }
78
79    fn key_set_seconds(&mut self, key: &str, s: i64) -> Result<bool, String> {
80        let data: RedisResult<bool> = Commands::expire(&mut self.con()?, key, s);
81        match data {
82            Ok(e) => Ok(e),
83            Err(e) => Err(format!("key_set_expireat: {e}"))
84        }
85    }
86
87    fn key_del_expire(&mut self, key: &str) -> Result<bool, String> {
88        let data: RedisResult<bool> = Commands::persist(&mut self.con()?, key);
89        match data {
90            Ok(e) => Ok(e),
91            Err(e) => Err(format!("key_del_expire: {e}"))
92        }
93    }
94    fn key_query(&mut self, key: &str) -> Result<JsonValue, String> {
95        let data: RedisResult<Vec<String>> = Commands::keys(&mut self.con()?, key);
96        match data {
97            Ok(e) => Ok(JsonValue::from(e)),
98            Err(e) => Err(format!("查询KEYS失败: {e}")),
99        }
100    }
101    /// 设置
102    ///
103    /// * expiration_date 过期时间 s 秒
104    fn add(&mut self, key: &str, value: JsonValue, expiration_date: u64) -> Result<bool, String> {
105        let data: RedisResult<bool> = {
106            if expiration_date > 0 {
107                Commands::set_ex(&mut self.con()?, key, value.to_string(), expiration_date)
108            } else {
109                Commands::set(&mut self.con()?, key, value.to_string())
110            }
111        };
112        match data {
113            Ok(e) => Ok(e),
114            Err(e) => Err(format!("设置缓存失败: {e}"))
115        }
116    }
117    /// 获取
118    fn get(&mut self, key: &str) -> Result<JsonValue, String> {
119        let data: RedisResult<String> = Commands::get(&mut self.con()?, key);
120        match data {
121            Ok(e) => {
122                match json::parse(&e) {
123                    Ok(json) => Ok(json),
124                    Err(_) => Ok(JsonValue::from(e)),
125                }
126            }
127            Err(e) => Err(format!("获取失败: {e}")),
128        }
129    }
130
131
132    fn set_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
133        let data: RedisResult<bool> = Commands::sadd(&mut self.con()?, key, value.to_string());
134        match data {
135            Ok(e) => {
136                if e && expiry_s > 0 {
137                    return self.key_set_seconds(key, expiry_s);
138                }
139                Ok(e)
140            }
141            Err(e) => Err(format!("集合添加: {e}")),
142        }
143    }
144    fn set_count(&mut self, key: &str) -> Result<usize, String> {
145        let data: RedisResult<usize> = Commands::scard(&mut self.con()?, key);
146        match data {
147            Ok(e) => Ok(e),
148            Err(e) => Err(format!("集合: {e}")),
149        }
150    }
151
152    /// 集合 获取
153    fn set_get(&mut self, key: &str) -> Result<JsonValue, String> {
154        let data: RedisResult<Vec<String>> = Commands::smembers(&mut self.con()?, key);
155        match data {
156            Ok(e) => {
157                let mut list = array![];
158                for item in e.iter() {
159                    let data = JsonValue::from(item.clone());
160                    let json = json::parse(item).unwrap_or(data);
161                    let _ = list.push(json);
162                }
163                Ok(list)
164            }
165            Err(e) => Err(format!("集合查询: {e}")),
166        }
167    }
168    /// 集合 删除
169    fn set_delete(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
170        let data: RedisResult<bool> = Commands::srem(&mut self.con()?, key, value.to_string());
171        match data {
172            Ok(e) => Ok(e),
173            Err(e) => Err(format!("集合删除: {e}")),
174        }
175    }
176
177    fn set_get_sinter(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
178        let data: RedisResult<HashSet<String>> = Commands::sinter(&mut self.con()?, keys);
179        match data {
180            Ok(e) => {
181                let mut list = array![];
182                for item in e {
183                    let _ = list.push(json::parse(&item).unwrap_or(JsonValue::Null));
184                }
185                Ok(list)
186            }
187            Err(e) => Err(format!("集合删除: {e}")),
188        }
189    }
190
191    fn set_get_sunion(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
192        let data: RedisResult<HashSet<String>> = Commands::sunion(&mut self.con()?, keys);
193        match data {
194            Ok(e) => {
195                let mut list = array![];
196                for item in e {
197                    let _ = list.push(json::parse(&item).unwrap_or(JsonValue::Null));
198                }
199                Ok(list)
200            }
201            Err(e) => Err(format!("集合删除: {e}")),
202        }
203    }
204
205    fn list_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
206        let data: RedisResult<bool> = Commands::lpush(&mut self.con()?, key, value.to_string());
207        match data {
208            Ok(e) => {
209                if e && expiry_s > 0 {
210                    return self.key_set_seconds(key, expiry_s);
211                }
212                Ok(e)
213            }
214            Err(e) => Err(format!("集合删除: {e}")),
215        }
216    }
217
218    fn list_del(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
219        let data: RedisResult<bool> = Commands::lrem(&mut self.con()?, key, 0, value.to_string());
220        match data {
221            Ok(e) => Ok(e),
222            Err(e) => Err(format!("集合删除: {e}")),
223        }
224    }
225
226    /// 从列表左侧添加元素
227    fn list_lpush(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
228        let data: RedisResult<bool> = Commands::lpush(&mut self.con()?, key, value.to_string());
229        match data {
230            Ok(e) => {
231                if e && expiry_s > 0 {
232                    return self.key_set_seconds(key, expiry_s);
233                }
234                Ok(e)
235            }
236            Err(e) => Err(format!("列表左侧添加失败: {e}")),
237        }
238    }
239
240    /// 从列表右侧添加元素
241    fn list_rpush(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
242        let data: RedisResult<bool> = Commands::rpush(&mut self.con()?, key, value.to_string());
243        match data {
244            Ok(e) => {
245                if e && expiry_s > 0 {
246                    return self.key_set_seconds(key, expiry_s);
247                }
248                Ok(e)
249            }
250            Err(e) => Err(format!("列表右侧添加失败: {e}")),
251        }
252    }
253
254    /// 从列表左侧弹出元素
255    fn list_lpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
256        let strings = if count <= 1 {
257            // 弹出单个元素
258            let data: RedisResult<Option<String>> = Commands::lpop(&mut self.con()?, key, None::<NonZeroUsize>);
259            match data {
260                Ok(Some(value)) => vec![value],
261                Ok(None) => vec![],
262                Err(e) => return Err(format!("列表左侧弹出失败: {e}")),
263            }
264        } else {
265            // 弹出多个元素
266            let non_zero = NonZeroUsize::new(count).unwrap();
267            let data: RedisResult<Vec<String>> = Commands::lpop(&mut self.con()?, key, Some(non_zero));
268            match data {
269                Ok(values) => values,
270                Err(e) => return Err(format!("列表左侧弹出多个失败: {e}")),
271            }
272        };
273
274        // 统一进行JSON解析
275        strings.into_iter().map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}"))).collect()
276    }
277
278    /// 从列表右侧弹出元素
279    fn list_rpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
280        let strings = if count <= 1 {
281            // 弹出单个元素
282            let data: RedisResult<Option<String>> = Commands::rpop(&mut self.con()?, key, None::<NonZeroUsize>);
283            match data {
284                Ok(Some(value)) => vec![value],
285                Ok(None) => vec![],
286                Err(e) => return Err(format!("列表右侧弹出失败: {e}")),
287            }
288        } else {
289            // 弹出多个元素
290            let non_zero = NonZeroUsize::new(count).unwrap();
291            let data: RedisResult<Vec<String>> = Commands::rpop(&mut self.con()?, key, Some(non_zero));
292            match data {
293                Ok(values) => values,
294                Err(e) => return Err(format!("列表右侧弹出多个失败: {e}")),
295            }
296        };
297
298        // 统一进行JSON解析
299        strings.into_iter().map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}"))).collect()
300    }
301
302    /// 获取列表长度
303    fn list_len(&mut self, key: &str) -> Result<usize, String> {
304        let data: RedisResult<usize> = Commands::llen(&mut self.con()?, key);
305        match data {
306            Ok(e) => Ok(e),
307            Err(e) => Err(format!("获取列表长度失败: {e}")),
308        }
309    }
310
311    /// 获取列表指定范围的元素
312    fn list_range(&mut self, key: &str, start: isize, stop: isize) -> Result<Vec<JsonValue>, String> {
313        let data: RedisResult<Vec<String>> = Commands::lrange(&mut self.con()?, key, start, stop);
314        match data {
315            Ok(e) => {
316                // 统一进行JSON解析
317                e.into_iter().map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}"))).collect()
318            }
319            Err(e) => Err(format!("获取列表范围失败: {e}")),
320        }
321    }
322
    /// Returns every element of the list at `key`.
    ///
    /// Delegates to `list_range` with the range `0..=-1`, so it shares that method's
    /// parsing and error behaviour.
    fn list_all(&mut self, key: &str) -> Result<Vec<JsonValue>, String> {
        self.list_range(key, 0, -1)
    }
327
328    /// 根据索引获取列表元素
329    fn list_get(&mut self, key: &str, index: isize) -> Result<JsonValue, String> {
330        let data: RedisResult<Option<String>> = Commands::lindex(&mut self.con()?, key, index);
331
332        match data {
333            Ok(Some(s)) => {
334                match json::parse(&s) {
335                    Ok(val) => Ok(val),
336                    Err(e) => Err(format!("解析 JSON 失败: {e}")),
337                }
338            }
339            Ok(None) => Ok(JsonValue::Null), // 没有值,返回 JSON 的 `null`
340            Err(e) => Err(format!("获取列表元素失败: {e}")),
341        }
342    }
343
344    /// 修剪列表,只保留指定范围内的元素
345    fn list_trim(&mut self, key: &str, start: isize, stop: isize) -> Result<bool, String> {
346        let data: RedisResult<()> = Commands::ltrim(&mut self.con()?, key, start, stop);
347        match data {
348            Ok(()) => Ok(true),
349            Err(e) => Err(format!("修剪列表失败: {e}")),
350        }
351    }
352
353    /// 设置指定索引位置的元素值
354    fn list_set(&mut self, key: &str, index: isize, value: JsonValue) -> Result<bool, String> {
355        let data: RedisResult<()> = Commands::lset(&mut self.con()?, key, index, value.to_string());
356        match data {
357            Ok(()) => Ok(true),
358            Err(e) => Err(format!("设置列表元素失败: {e}")),
359        }
360    }
361
362    /// 删除列表中指定值的元素(count=0表示删除所有匹配项)
363    fn list_remove(&mut self, key: &str, value: JsonValue, count: isize) -> Result<isize, String> {
364        let data: RedisResult<isize> = Commands::lrem(&mut self.con()?, key, count, value.to_string());
365        match data {
366            Ok(e) => Ok(e),
367            Err(e) => Err(format!("删除列表元素失败: {e}")),
368        }
369    }
370
371    /// 哈希 获取对象集合
372    fn hash_get(&mut self, key: &str) -> Result<JsonValue, String> {
373        let data: RedisResult<Vec<String>> = Commands::hgetall(&mut self.con()?, key);
374        match data {
375            Ok(e) => {
376                let mut list = object! {};
377                let mut index = 0;
378                while index < e.len() {
379                    list[e[index].to_string()] = e[index + 1].clone().into();
380                    index += 2;
381                }
382                Ok(list)
383            }
384            Err(e) => Err(format!("{e}")),
385        }
386    }
387
388    /// 哈希-添加
389    fn hash_add(&mut self, key: &str, field: &str, value: JsonValue) -> Result<bool, String> {
390        let res: RedisResult<bool> = Commands::hexists(&mut self.con()?, key, field);
391        let ists = res.unwrap_or(false);
392        if ists {
393            let res = self.hash_delete(key, field)?;
394            if res {
395                let data: RedisResult<bool> = Commands::hset(&mut self.con()?, key, field, value.to_string());
396                match data {
397                    Ok(e) => Ok(e),
398                    Err(e) => {
399                        error!("{e}");
400                        Err(format!("设置哈希类型缓存失败: {e}"))
401                    }
402                }
403            } else {
404                Ok(false)
405            }
406        } else {
407            let data: RedisResult<bool> = Commands::hset_nx(&mut self.con()?, key, field, value.to_string());
408            match data {
409                Ok(e) => Ok(e),
410                Err(e) => {
411                    error!("{e}");
412                    Err(format!("设置哈希类型缓存失败: {e}"))
413                }
414            }
415        }
416    }
417
418    fn hash_get_field_value(&mut self, key: &str, field: &str) -> Result<JsonValue, String> {
419        let data: RedisResult<String> = Commands::hget(&mut self.con()?, key, field);
420        match data {
421            Ok(e) => Ok(json::parse(e.as_str()).unwrap_or(JsonValue::Null)),
422            Err(e) => {
423                error!("{e}");
424                Err(format!("设置哈希类型缓存失败: {e}"))
425            }
426        }
427    }
428
429    fn hash_get_fields(&mut self, key: &str) -> Result<JsonValue, String> {
430        let data: RedisResult<Vec<String>> = Commands::hkeys(&mut self.con()?, key);
431        match data {
432            Ok(e) => Ok(e.into()),
433            Err(e) => {
434                error!("{e}");
435                Err(format!("获取哈希类型缓存失败: {e}"))
436            }
437        }
438    }
439
440    fn hash_delete(&mut self, key: &str, field: &str) -> Result<bool, String> {
441        let data: RedisResult<bool> = Commands::hdel(&mut self.con()?, key, field);
442        match data {
443            Ok(e) => Ok(e),
444            Err(e) => {
445                error!("{e}");
446                Err(format!("设置哈希类型缓存失败: {e}"))
447            }
448        }
449    }
450
451    fn hash_get_values(&mut self, key: &str) -> Result<JsonValue, String> {
452        let data: RedisResult<Vec<String>> = Commands::hvals(&mut self.con()?, key);
453        match data {
454            Ok(e) => {
455                let mut list = array![];
456                for item in e.iter() {
457                    list.push(json::parse(item).unwrap_or(object! {})).unwrap();
458                }
459                Ok(list)
460            }
461            Err(e) => {
462                error!("{e}");
463                Err(format!("获取哈希类型缓存失败: {e}"))
464            }
465        }
466    }
467
468    fn geo_add(&mut self, key: &str, longitude: f64, latitude: f64, value: JsonValue) -> Result<bool, String> {
469        let data: RedisResult<bool> = Commands::geo_add(&mut self.con()?, key, &[
470            (longitude.to_string(), latitude.to_string(), value.to_string())
471        ]);
472        match data {
473            Ok(e) => Ok(e),
474            Err(e) => {
475                error!("{e}");
476                Err(format!("设置哈希类型缓存失败: {e}"))
477            }
478        }
479    }
480
481    fn geo_get(&mut self, key: &str, value: JsonValue) -> Result<JsonValue, String> {
482        let data: RedisResult<Vec<Option<geo::Coord<f64>>>> = Commands::geo_pos(&mut self.con()?, key, &[
483            value.to_string().clone()
484        ]);
485        match data {
486            Ok(e) => {
487                let mut data = object! {};
488                for item in e.into_iter().flatten() {
489                    let t = item;
490                    data["latitude"] = t.latitude.into();
491                    data["longitude"] = t.longitude.into();
492                    data["name"] = value.clone();
493                }
494                Ok(data)
495            }
496            Err(e) => {
497                error!("{e}");
498                Err(format!("设置哈希类型缓存失败: {e}"))
499            }
500        }
501    }
502
    /// Not implemented yet: the body is `todo!()`, so calling this method panics.
    ///
    /// NOTE(review): presumably meant to return the distance between two members of the
    /// geo index at `_key`; confirm the expected unit and result shape before implementing.
    fn geo_dist(&mut self, _key: &str, _value1: JsonValue, _value2: JsonValue) -> Result<JsonValue, String> {
        todo!()
    }
506
    /// Not implemented yet: the body is `todo!()`, so calling this method panics.
    ///
    /// NOTE(review): presumably meant to list members within `_radius` of `_value`;
    /// confirm the format of `_radius` and the result shape before implementing.
    fn geo_radius(&mut self, _key: &str, _value: JsonValue, _radius: &str) -> Result<JsonValue, String> {
        todo!()
    }
510
511    /// 设置消息队列
512    fn stream_add(&mut self, key: &str, msg_id: &str, field: &str, value: JsonValue) -> Result<String, String> {
513        let data: RedisResult<Option<String>> = Commands::xadd(&mut self.con()?, key, msg_id, &[(field, value.to_string())]);
514        match data {
515            Ok(e) => Ok(e.unwrap().to_string()),
516            Err(e) => Err(format!("{e}")),
517        }
518    }
519
520    fn stream_count(&mut self, key: &str) -> Result<usize, String> {
521        let data: RedisResult<usize> = Commands::xlen(&mut self.con()?, key);
522        match data {
523            Ok(e) => Ok(e),
524            Err(e) => Err(format!("{e}")),
525        }
526    }
527
528    /// 消息队列获取
529    fn stream_get(&mut self, key: &str) -> Result<JsonValue, String> {
530        let data: RedisResult<StreamRangeReply> = Commands::xrange(&mut self.con()?, key, "-", "+");
531        match data {
532            Ok(e) => {
533                let mut list = array![];
534                for id in e.ids.iter() {
535                    for (_, value) in id.map.clone() {
536                        let t = String::from_redis_value(&value).unwrap();
537                        let ids = object! {
538                                id:id.id.clone(),
539                                value:json::parse(t.as_str()).unwrap_or(t.into())
540                            };
541                        let _ = list.push(ids);
542                    }
543                }
544                Ok(list)
545            }
546            Err(e) => Err(format!("{e}")),
547        }
548    }
549
550
551    fn stream_del(&mut self, key: &str, id: &str) -> Result<bool, String> {
552        let data: RedisResult<bool> = Commands::xdel(&mut self.con()?, key, &[id]);
553        match data {
554            Ok(e) => Ok(e),
555            Err(e) => Err(format!("{e}")),
556        }
557    }
558
559
560    fn stream_group_create(&mut self, key: &str, group: &str) -> Result<bool, String> {
561        let data: RedisResult<bool> = Commands::xgroup_create(&mut self.con()?, key, group, "$");
562        match data {
563            Ok(e) => Ok(e),
564            Err(e) => Err(format!("{e}")),
565        }
566    }
567
568    fn stream_group_add_user(&mut self, key: &str, group: &str, user: &str) -> Result<bool, String> {
569        let data: RedisResult<StreamReadReply> = redis::cmd("XREADGROUP").arg("GROUP").arg(group).arg(user).arg("COUNT").arg(0) // 只注册,不消费
570                                                                         .arg("STREAMS").arg(key).arg(">").query(&mut self.con()?);
571        match data {
572            Ok(_) => Ok(true),
573            Err(e) => Err(format!("{e}")),
574        }
575    }
576
    /// Not implemented yet: the body is `todo!()`, so calling this method panics.
    ///
    /// NOTE(review): presumably meant to remove consumer `_user` from `_group` on the
    /// stream at `_key`; confirm what the `bool` result should mean before implementing.
    fn stream_group_del_user(&mut self, _key: &str, _group: &str, _user: &str) -> Result<bool, String> {
        todo!()
    }
580
581    fn stream_group_del(&mut self, key: &str, group: &str) -> Result<bool, String> {
582        let data: RedisResult<bool> = Commands::xgroup_destroy(&mut self.con()?, key, group);
583        match data {
584            Ok(e) => Ok(e),
585            Err(e) => Err(format!("{e}")),
586        }
587    }
588
589    fn stream_group_msg(&mut self, key: &str, group: &str, user: &str) -> Result<JsonValue, String> {
590        let data: RedisResult<StreamReadReply> = redis::cmd("XREADGROUP").arg("GROUP").arg(group).arg(user).arg("COUNT").arg(2) // 只注册,不消费
591                                                                         .arg("STREAMS").arg(key).arg(">").query(&mut self.con()?);
592        match data {
593            Ok(e) => {
594                let mut list = array![];
595                for item in e.keys.iter().cloned() {
596                    for id in item.ids.iter() {
597                        for (_key, value) in id.map.clone() {
598                            let t = String::from_redis_value(&value).unwrap();
599                            let ids = object! {
600                                key:item.key.clone(),
601                                id:id.id.clone(),
602                                value:json::parse(t.as_str()).unwrap_or(t.into())
603                        };
604                            let _ = list.push(ids);
605                        }
606                    }
607                }
608                Ok(list)
609            }
610            Err(e) => Err(format!("{e}")),
611        }
612    }
613
614    fn stream_get_group(&mut self, key: &str, group: &str) -> Result<bool, String> {
615        let data: RedisResult<StreamInfoGroupsReply> = Commands::xinfo_groups(&mut self.con()?, key);
616        match data {
617            Ok(e) => {
618                for item in e.groups {
619                    let name = item.name.clone();
620                    if name == group {
621                        return Ok(true);
622                    }
623                }
624                Ok(false)
625            }
626            Err(e) => Err(format!("{e}")),
627        }
628    }
629
630
631    fn stream_get_stream(&mut self, key: &str) -> Result<JsonValue, String> {
632        let data: RedisResult<StreamInfoStreamReply> = Commands::xinfo_stream(&mut self.con()?, key);
633        match data {
634            Ok(e) => {
635                let info = object! {
636                    length:e.length,
637                    groups:e.groups,
638                    last_generated_id:e.last_generated_id.clone(),
639                    first_entry:e.first_entry.id.clone(),
640                    last_entry:e.last_entry.id.clone(),
641                    radix_tree_keys:e.radix_tree_keys,
642                };
643                Ok(info)
644            }
645            Err(e) => Err(format!("{e}")),
646        }
647    }
648
649    fn subscribe(&mut self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
650        let mut con = self.pubsub_client
651                           .get_connection()
652                           .map_err(|e| format!("PubSub connect error: {e}"))?;
653
654        let mut pubsub = con.as_pubsub();
655        let res = pubsub.subscribe(&[key]);
656        match res {
657            Ok(()) => {
658                loop {
659                    let msg = match pubsub.get_message() {
660                        Ok(e) => e,
661                        Err(e) => return Err(e.to_string())
662                    };
663                    let payload: String = msg.get_payload().unwrap_or("".to_string());
664                    match tx.send(json::parse(&payload).unwrap_or(JsonValue::new_object())) {
665                        Ok(_) => {}
666                        Err(e) => return Err(e.to_string())
667                    }
668                }
669            }
670            Err(e) => Err(format!("{e}")),
671        }
672    }
673
674    fn publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
675        let data: RedisResult<usize> = Commands::publish(&mut self.con()?, key, value.to_string());
676        match data {
677            Ok(_) => Ok(true),
678            Err(e) => Err(format!("???{e}")),
679        }
680    }
681}