Skip to main content

br_cache/
lib.rs

1#[cfg(feature = "kafka")]
2pub use crate::config::KafkaConnection;
3pub use crate::config::{CacheMode, Config, Connection};
4#[cfg(feature = "kafka")]
5use crate::kafka::Kafka;
6#[cfg(feature = "redis")]
7use crate::redis::Redis;
8use json::JsonValue;
9use once_cell::sync::Lazy;
10use std::sync::mpsc::Sender;
11use std::sync::RwLock;
12
13pub mod config;
14#[cfg(feature = "kafka")]
15mod kafka;
16#[cfg(feature = "redis")]
17mod redis;
18
19static GLOBAL_CONFIG: Lazy<RwLock<Config>> = Lazy::new(|| RwLock::new(Config::default()));
20
/// Handle bundling the optional cache / messaging backends.
///
/// Each field only exists when the matching Cargo feature is enabled, and is
/// `None` when that backend is not configured or failed to connect.
#[derive(Clone)]
pub struct Cache {
    /// Redis backend, present when a connection was established.
    #[cfg(feature = "redis")]
    redis: Option<Redis>,
    /// Kafka backend, present when a connection was established.
    #[cfg(feature = "kafka")]
    kafka: Option<Kafka>,
}
28
29impl Cache {
30    pub fn new(config: Config) -> Self {
31        {
32            let mut data = GLOBAL_CONFIG.write().unwrap();
33            data.clone_from(&config);
34        }
35        let config = GLOBAL_CONFIG.read().unwrap();
36        let connection = config
37            .connections
38            .get(config.default.as_str())
39            .unwrap()
40            .clone();
41        Self::from_connection(connection)
42    }
43
44    pub fn create(name: &str, connection: Connection) -> Self {
45        {
46            let mut data = GLOBAL_CONFIG.write().unwrap();
47            if !data.connections.contains_key(name) {
48                data.connections.insert(name.to_string(), connection);
49            }
50            data.default = name.to_string();
51        }
52        let config = GLOBAL_CONFIG.read().unwrap();
53        let connection = config
54            .connections
55            .get(config.default.as_str())
56            .unwrap()
57            .clone();
58        Self::from_connection(connection)
59    }
60
61    fn from_connection(connection: Connection) -> Self {
62        #[cfg(feature = "redis")]
63        let redis = match Redis::connect(connection.clone()) {
64            Ok(r) => Some(r),
65            Err(e) => {
66                log::warn!("Redis 连接失败: {}", e);
67                None
68            }
69        };
70
71        #[cfg(feature = "kafka")]
72        let kafka = if let Some(kafka_conn) = connection.kafka {
73            match Kafka::connect(kafka_conn) {
74                Ok(k) => Some(k),
75                Err(e) => {
76                    log::warn!("Kafka 连接失败: {}", e);
77                    None
78                }
79            }
80        } else {
81            None
82        };
83
84        Self {
85            #[cfg(feature = "redis")]
86            redis,
87            #[cfg(feature = "kafka")]
88            kafka,
89        }
90    }
91
92    pub fn connections(&mut self) -> JsonValue {
93        let mut connections = vec![];
94        let data = GLOBAL_CONFIG.read().unwrap();
95        for (item, mut value) in data.connections.clone() {
96            if value.mode.str().is_empty() {
97                continue;
98            }
99            let mut t = value.json();
100            t["name"] = item.into();
101            connections.push(t);
102        }
103        connections.into()
104    }
105
106    pub fn connection(&mut self, name: &str) -> Self {
107        let mut data = GLOBAL_CONFIG.write().unwrap();
108        if data.connections.contains_key(name) {
109            if name == data.default {
110                return self.clone();
111            }
112            data.default = name.to_string();
113            let connection = data.connections.get(data.default.as_str()).unwrap().clone();
114            Self::from_connection(connection)
115        } else {
116            Self::none()
117        }
118    }
119
120    pub fn none() -> Self {
121        Self {
122            #[cfg(feature = "redis")]
123            redis: None,
124            #[cfg(feature = "kafka")]
125            kafka: None,
126        }
127    }
128
129    pub fn is_none(&self) -> bool {
130        #[cfg(all(feature = "redis", feature = "kafka"))]
131        {
132            return self.redis.is_none() && self.kafka.is_none();
133        }
134        #[cfg(all(feature = "redis", not(feature = "kafka")))]
135        {
136            return self.redis.is_none();
137        }
138        #[cfg(all(not(feature = "redis"), feature = "kafka"))]
139        {
140            return self.kafka.is_none();
141        }
142        #[cfg(all(not(feature = "redis"), not(feature = "kafka")))]
143        {
144            true
145        }
146    }
147
148    #[cfg(feature = "redis")]
149    pub fn is_redis(&self) -> bool {
150        self.redis.is_some()
151    }
152
153    #[cfg(not(feature = "redis"))]
154    pub fn is_redis(&self) -> bool {
155        false
156    }
157
158    #[cfg(feature = "kafka")]
159    pub fn is_kafka(&self) -> bool {
160        self.kafka.is_some()
161    }
162
163    #[cfg(not(feature = "kafka"))]
164    pub fn is_kafka(&self) -> bool {
165        false
166    }
167
168    pub fn mode(&self) -> &'static str {
169        #[cfg(feature = "kafka")]
170        if self.kafka.is_some() {
171            return "kafka";
172        }
173        #[cfg(feature = "redis")]
174        if self.redis.is_some() {
175            return "redis";
176        }
177        "none"
178    }
179
180    pub fn ping(&mut self) -> Result<JsonValue, String> {
181        let mut result = json::object! {
182            redis: false,
183            kafka: false
184        };
185
186        #[cfg(feature = "redis")]
187        if let Some(ref mut r) = self.redis {
188            result["redis"] = r.ping().unwrap_or(false).into();
189        }
190
191        #[cfg(feature = "kafka")]
192        if let Some(ref k) = self.kafka {
193            result["kafka"] = k.ping().unwrap_or(false).into();
194        }
195
196        Ok(result)
197    }
198
199    #[cfg(feature = "redis")]
200    pub fn redis_ping(&mut self) -> Result<bool, String> {
201        if let Some(ref mut r) = self.redis {
202            return r.ping();
203        }
204        Err("Redis未初始化".into())
205    }
206
207    #[cfg(feature = "redis")]
208    pub fn redis_publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
209        if let Some(ref mut r) = self.redis {
210            return r.publish(key, value);
211        }
212        Err("Redis未初始化".into())
213    }
214
215    #[cfg(feature = "kafka")]
216    pub fn kafka_ping(&self) -> Result<bool, String> {
217        if let Some(ref k) = self.kafka {
218            return k.ping();
219        }
220        Err("Kafka未初始化".into())
221    }
222
223    #[cfg(feature = "kafka")]
224    fn kafka_ref(&self) -> Result<&Kafka, String> {
225        self.kafka.as_ref().ok_or_else(|| "Kafka未初始化".into())
226    }
227}
228
229impl Cache {
230    pub fn db(&mut self, db: i8) -> &mut Self {
231        #[cfg(feature = "redis")]
232        if let Some(ref mut r) = self.redis {
233            r.db(db);
234        }
235        self
236    }
237
238    pub fn key_exists(&mut self, key: &str) -> Result<bool, String> {
239        #[cfg(feature = "redis")]
240        if let Some(ref mut r) = self.redis {
241            return r.key_exists(key);
242        }
243        Err("缓存未初始化".into())
244    }
245
246    pub fn key_del(&mut self, key: &str) -> Result<bool, String> {
247        #[cfg(feature = "redis")]
248        if let Some(ref mut r) = self.redis {
249            return r.key_del(key);
250        }
251        Err("缓存未初始化".into())
252    }
253
254    pub fn key_ttl(&mut self, key: &str) -> Result<i64, String> {
255        #[cfg(feature = "redis")]
256        if let Some(ref mut r) = self.redis {
257            return r.key_ttl(key);
258        }
259        Err("缓存未初始化".into())
260    }
261
262    pub fn key_set_expireat(&mut self, key: &str, timestamp: i64) -> Result<bool, String> {
263        #[cfg(feature = "redis")]
264        if let Some(ref mut r) = self.redis {
265            return r.key_set_expireat(key, timestamp);
266        }
267        Err("缓存未初始化".into())
268    }
269
270    pub fn key_set_seconds(&mut self, key: &str, s: i64) -> Result<bool, String> {
271        #[cfg(feature = "redis")]
272        if let Some(ref mut r) = self.redis {
273            return r.key_set_seconds(key, s);
274        }
275        Err("缓存未初始化".into())
276    }
277
278    pub fn key_del_expire(&mut self, key: &str) -> Result<bool, String> {
279        #[cfg(feature = "redis")]
280        if let Some(ref mut r) = self.redis {
281            return r.key_del_expire(key);
282        }
283        Err("缓存未初始化".into())
284    }
285
286    pub fn key_query(&mut self, key: &str) -> Result<JsonValue, String> {
287        #[cfg(feature = "redis")]
288        if let Some(ref mut r) = self.redis {
289            return r.key_query(key);
290        }
291        Err("缓存未初始化".into())
292    }
293
294    pub fn add(
295        &mut self,
296        key: &str,
297        value: JsonValue,
298        expiration_date: u64,
299    ) -> Result<bool, String> {
300        #[cfg(feature = "redis")]
301        if let Some(ref mut r) = self.redis {
302            return r.add(key, value, expiration_date);
303        }
304        Err("缓存未初始化".into())
305    }
306
307    pub fn get(&mut self, key: &str) -> Result<JsonValue, String> {
308        #[cfg(feature = "redis")]
309        if let Some(ref mut r) = self.redis {
310            return r.get(key);
311        }
312        Err("缓存未初始化".into())
313    }
314
315    pub fn set_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
316        #[cfg(feature = "redis")]
317        if let Some(ref mut r) = self.redis {
318            return r.set_add(key, value, expiry_s);
319        }
320        Err("缓存未初始化".into())
321    }
322
323    pub fn set_count(&mut self, key: &str) -> Result<usize, String> {
324        #[cfg(feature = "redis")]
325        if let Some(ref mut r) = self.redis {
326            return r.set_count(key);
327        }
328        Err("缓存未初始化".into())
329    }
330
331    pub fn set_get(&mut self, key: &str) -> Result<JsonValue, String> {
332        #[cfg(feature = "redis")]
333        if let Some(ref mut r) = self.redis {
334            return r.set_get(key);
335        }
336        Err("缓存未初始化".into())
337    }
338
339    pub fn set_delete(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
340        #[cfg(feature = "redis")]
341        if let Some(ref mut r) = self.redis {
342            return r.set_delete(key, value);
343        }
344        Err("缓存未初始化".into())
345    }
346
347    pub fn set_get_sinter(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
348        #[cfg(feature = "redis")]
349        if let Some(ref mut r) = self.redis {
350            return r.set_get_sinter(keys);
351        }
352        Err("缓存未初始化".into())
353    }
354
355    pub fn set_get_sunion(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
356        #[cfg(feature = "redis")]
357        if let Some(ref mut r) = self.redis {
358            return r.set_get_sunion(keys);
359        }
360        Err("缓存未初始化".into())
361    }
362
363    pub fn list_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
364        #[cfg(feature = "redis")]
365        if let Some(ref mut r) = self.redis {
366            return r.list_add(key, value, expiry_s);
367        }
368        Err("缓存未初始化".into())
369    }
370
371    pub fn list_del(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
372        #[cfg(feature = "redis")]
373        if let Some(ref mut r) = self.redis {
374            return r.list_del(key, value);
375        }
376        Err("缓存未初始化".into())
377    }
378
379    pub fn list_lpush(
380        &mut self,
381        key: &str,
382        value: JsonValue,
383        expiry_s: i64,
384    ) -> Result<bool, String> {
385        #[cfg(feature = "redis")]
386        if let Some(ref mut r) = self.redis {
387            return r.list_lpush(key, value, expiry_s);
388        }
389        Err("缓存未初始化".into())
390    }
391
392    pub fn list_rpush(
393        &mut self,
394        key: &str,
395        value: JsonValue,
396        expiry_s: i64,
397    ) -> Result<bool, String> {
398        #[cfg(feature = "redis")]
399        if let Some(ref mut r) = self.redis {
400            return r.list_rpush(key, value, expiry_s);
401        }
402        Err("缓存未初始化".into())
403    }
404
405    pub fn list_lpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
406        #[cfg(feature = "redis")]
407        if let Some(ref mut r) = self.redis {
408            return r.list_lpop(key, count);
409        }
410        Err("缓存未初始化".into())
411    }
412
413    pub fn list_rpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
414        #[cfg(feature = "redis")]
415        if let Some(ref mut r) = self.redis {
416            return r.list_rpop(key, count);
417        }
418        Err("缓存未初始化".into())
419    }
420
421    pub fn list_len(&mut self, key: &str) -> Result<usize, String> {
422        #[cfg(feature = "redis")]
423        if let Some(ref mut r) = self.redis {
424            return r.list_len(key);
425        }
426        Err("缓存未初始化".into())
427    }
428
429    pub fn list_range(
430        &mut self,
431        key: &str,
432        start: isize,
433        stop: isize,
434    ) -> Result<Vec<JsonValue>, String> {
435        #[cfg(feature = "redis")]
436        if let Some(ref mut r) = self.redis {
437            return r.list_range(key, start, stop);
438        }
439        Err("缓存未初始化".into())
440    }
441
442    pub fn list_all(&mut self, key: &str) -> Result<Vec<JsonValue>, String> {
443        #[cfg(feature = "redis")]
444        if let Some(ref mut r) = self.redis {
445            return r.list_all(key);
446        }
447        Err("缓存未初始化".into())
448    }
449
450    pub fn list_get(&mut self, key: &str, index: isize) -> Result<JsonValue, String> {
451        #[cfg(feature = "redis")]
452        if let Some(ref mut r) = self.redis {
453            return r.list_get(key, index);
454        }
455        Err("缓存未初始化".into())
456    }
457
458    pub fn list_trim(&mut self, key: &str, start: isize, stop: isize) -> Result<bool, String> {
459        #[cfg(feature = "redis")]
460        if let Some(ref mut r) = self.redis {
461            return r.list_trim(key, start, stop);
462        }
463        Err("缓存未初始化".into())
464    }
465
466    pub fn list_set(&mut self, key: &str, index: isize, value: JsonValue) -> Result<bool, String> {
467        #[cfg(feature = "redis")]
468        if let Some(ref mut r) = self.redis {
469            return r.list_set(key, index, value);
470        }
471        Err("缓存未初始化".into())
472    }
473
474    pub fn list_remove(
475        &mut self,
476        key: &str,
477        value: JsonValue,
478        count: isize,
479    ) -> Result<isize, String> {
480        #[cfg(feature = "redis")]
481        if let Some(ref mut r) = self.redis {
482            return r.list_remove(key, value, count);
483        }
484        Err("缓存未初始化".into())
485    }
486
487    pub fn hash_get(&mut self, key: &str) -> Result<JsonValue, String> {
488        #[cfg(feature = "redis")]
489        if let Some(ref mut r) = self.redis {
490            return r.hash_get(key);
491        }
492        Err("缓存未初始化".into())
493    }
494
495    pub fn hash_add(&mut self, key: &str, field: &str, value: JsonValue) -> Result<bool, String> {
496        #[cfg(feature = "redis")]
497        if let Some(ref mut r) = self.redis {
498            return r.hash_add(key, field, value);
499        }
500        Err("缓存未初始化".into())
501    }
502
503    pub fn hash_get_field_value(&mut self, key: &str, field: &str) -> Result<JsonValue, String> {
504        #[cfg(feature = "redis")]
505        if let Some(ref mut r) = self.redis {
506            return r.hash_get_field_value(key, field);
507        }
508        Err("缓存未初始化".into())
509    }
510
511    pub fn hash_get_fields(&mut self, key: &str) -> Result<JsonValue, String> {
512        #[cfg(feature = "redis")]
513        if let Some(ref mut r) = self.redis {
514            return r.hash_get_fields(key);
515        }
516        Err("缓存未初始化".into())
517    }
518
519    pub fn hash_delete(&mut self, key: &str, field: &str) -> Result<bool, String> {
520        #[cfg(feature = "redis")]
521        if let Some(ref mut r) = self.redis {
522            return r.hash_delete(key, field);
523        }
524        Err("缓存未初始化".into())
525    }
526
527    pub fn hash_get_values(&mut self, key: &str) -> Result<JsonValue, String> {
528        #[cfg(feature = "redis")]
529        if let Some(ref mut r) = self.redis {
530            return r.hash_get_values(key);
531        }
532        Err("缓存未初始化".into())
533    }
534
535    pub fn geo_add(
536        &mut self,
537        key: &str,
538        longitude: f64,
539        latitude: f64,
540        value: JsonValue,
541    ) -> Result<bool, String> {
542        #[cfg(feature = "redis")]
543        if let Some(ref mut r) = self.redis {
544            return r.geo_add(key, longitude, latitude, value);
545        }
546        Err("缓存未初始化".into())
547    }
548
549    pub fn geo_get(&mut self, key: &str, value: JsonValue) -> Result<JsonValue, String> {
550        #[cfg(feature = "redis")]
551        if let Some(ref mut r) = self.redis {
552            return r.geo_get(key, value);
553        }
554        Err("缓存未初始化".into())
555    }
556
557    pub fn geo_dist(
558        &mut self,
559        key: &str,
560        value1: JsonValue,
561        value2: JsonValue,
562    ) -> Result<JsonValue, String> {
563        #[cfg(feature = "redis")]
564        if let Some(ref mut r) = self.redis {
565            return r.geo_dist(key, value1, value2);
566        }
567        Err("缓存未初始化".into())
568    }
569
570    pub fn geo_radius(
571        &mut self,
572        key: &str,
573        value: JsonValue,
574        radius: &str,
575    ) -> Result<JsonValue, String> {
576        #[cfg(feature = "redis")]
577        if let Some(ref mut r) = self.redis {
578            return r.geo_radius(key, value, radius);
579        }
580        Err("缓存未初始化".into())
581    }
582
583    pub fn stream_add(
584        &mut self,
585        key: &str,
586        msg_id: &str,
587        field: &str,
588        value: JsonValue,
589    ) -> Result<String, String> {
590        #[cfg(feature = "redis")]
591        if let Some(ref mut r) = self.redis {
592            return r.stream_add(key, msg_id, field, value);
593        }
594        Err("缓存未初始化".into())
595    }
596
597    pub fn stream_count(&mut self, key: &str) -> Result<usize, String> {
598        #[cfg(feature = "redis")]
599        if let Some(ref mut r) = self.redis {
600            return r.stream_count(key);
601        }
602        Err("缓存未初始化".into())
603    }
604
605    pub fn stream_get(&mut self, key: &str) -> Result<JsonValue, String> {
606        #[cfg(feature = "redis")]
607        if let Some(ref mut r) = self.redis {
608            return r.stream_get(key);
609        }
610        Err("缓存未初始化".into())
611    }
612
613    pub fn stream_del(&mut self, key: &str, id: &str) -> Result<bool, String> {
614        #[cfg(feature = "redis")]
615        if let Some(ref mut r) = self.redis {
616            return r.stream_del(key, id);
617        }
618        Err("缓存未初始化".into())
619    }
620
621    pub fn stream_group_create(&mut self, key: &str, group: &str) -> Result<bool, String> {
622        #[cfg(feature = "redis")]
623        if let Some(ref mut r) = self.redis {
624            return r.stream_group_create(key, group);
625        }
626        Err("缓存未初始化".into())
627    }
628
629    pub fn stream_group_add_user(
630        &mut self,
631        key: &str,
632        group: &str,
633        user: &str,
634    ) -> Result<bool, String> {
635        #[cfg(feature = "redis")]
636        if let Some(ref mut r) = self.redis {
637            return r.stream_group_add_user(key, group, user);
638        }
639        Err("缓存未初始化".into())
640    }
641
642    pub fn stream_group_del_user(
643        &mut self,
644        key: &str,
645        group: &str,
646        user: &str,
647    ) -> Result<bool, String> {
648        #[cfg(feature = "redis")]
649        if let Some(ref mut r) = self.redis {
650            return r.stream_group_del_user(key, group, user);
651        }
652        Err("缓存未初始化".into())
653    }
654
655    pub fn stream_group_del(&mut self, key: &str, group: &str) -> Result<bool, String> {
656        #[cfg(feature = "redis")]
657        if let Some(ref mut r) = self.redis {
658            return r.stream_group_del(key, group);
659        }
660        Err("缓存未初始化".into())
661    }
662
663    pub fn stream_group_msg(
664        &mut self,
665        key: &str,
666        group: &str,
667        user: &str,
668    ) -> Result<JsonValue, String> {
669        #[cfg(feature = "redis")]
670        if let Some(ref mut r) = self.redis {
671            return r.stream_group_msg(key, group, user);
672        }
673        Err("缓存未初始化".into())
674    }
675
676    pub fn stream_get_group(&mut self, key: &str, group: &str) -> Result<bool, String> {
677        #[cfg(feature = "redis")]
678        if let Some(ref mut r) = self.redis {
679            return r.stream_get_group(key, group);
680        }
681        Err("缓存未初始化".into())
682    }
683
684    pub fn stream_get_stream(&mut self, key: &str) -> Result<JsonValue, String> {
685        #[cfg(feature = "redis")]
686        if let Some(ref mut r) = self.redis {
687            return r.stream_get_stream(key);
688        }
689        Err("缓存未初始化".into())
690    }
691
692    pub fn subscribe(&mut self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
693        #[cfg(feature = "kafka")]
694        if let Some(ref k) = self.kafka {
695            return k.subscribe(key, tx);
696        }
697        #[cfg(feature = "redis")]
698        if let Some(ref mut r) = self.redis {
699            return r.subscribe(key, tx);
700        }
701        Err("缓存未初始化".into())
702    }
703
704    pub fn subscribe_with_reconnect(&self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
705        #[cfg(feature = "kafka")]
706        if let Some(ref k) = self.kafka {
707            return k.subscribe(key, tx);
708        }
709        #[cfg(feature = "redis")]
710        if let Some(ref r) = self.redis {
711            return r.subscribe_with_reconnect(key, tx);
712        }
713        Err("缓存未初始化".into())
714    }
715
716    pub fn publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
717        #[cfg(feature = "kafka")]
718        if let Some(ref k) = self.kafka {
719            return k.publish(key, value);
720        }
721        #[cfg(feature = "redis")]
722        if let Some(ref mut r) = self.redis {
723            return r.publish(key, value);
724        }
725        Err("缓存未初始化".into())
726    }
727}
728
// Kafka-only operations. Each one fails with the `kafka_ref` error when no
// Kafka backend is attached, and otherwise returns the backend's own result.
#[cfg(feature = "kafka")]
impl Cache {
    /// Forwards to the Kafka backend's `get_topics`.
    pub fn kafka_get_topics(&self) -> Result<Vec<String>, String> {
        self.kafka_ref().and_then(|kafka| kafka.get_topics())
    }

    /// Forwards to the Kafka backend's `create_topic` for `topic`.
    pub fn kafka_create_topic(&self, topic: &str) -> Result<bool, String> {
        self.kafka_ref().and_then(|kafka| kafka.create_topic(topic))
    }

    /// Forwards to the Kafka backend's `consume`, passing `topic`, `key` and
    /// `only_one` through unchanged.
    pub fn kafka_consume(
        &self,
        topic: &str,
        key: &str,
        only_one: bool,
    ) -> Result<JsonValue, String> {
        self.kafka_ref()
            .and_then(|kafka| kafka.consume(topic, key, only_one))
    }

    /// Forwards to the Kafka backend's `publish_with_key`, passing `topic`,
    /// `key` and `data` through unchanged.
    pub fn kafka_publish_with_key(
        &self,
        topic: &str,
        key: &str,
        data: JsonValue,
    ) -> Result<bool, String> {
        self.kafka_ref()
            .and_then(|kafka| kafka.publish_with_key(topic, key, data))
    }
}