1#[cfg(feature = "kafka")]
2pub use crate::config::KafkaConnection;
3pub use crate::config::{CacheMode, Config, Connection};
4#[cfg(feature = "kafka")]
5use crate::kafka::Kafka;
6#[cfg(feature = "redis")]
7use crate::redis::Redis;
8use json::JsonValue;
9use once_cell::sync::Lazy;
10use std::sync::mpsc::Sender;
11use std::sync::RwLock;
12
13pub mod config;
14#[cfg(feature = "kafka")]
15mod kafka;
16#[cfg(feature = "redis")]
17mod redis;
18
19static GLOBAL_CONFIG: Lazy<RwLock<Config>> = Lazy::new(|| RwLock::new(Config::default()));
20
/// Handle over the optional backends compiled into this crate.
///
/// Each field only exists when the matching Cargo feature is enabled, and is
/// `None` when that backend is not connected.
#[derive(Clone)]
pub struct Cache {
    /// Redis backend; `None` when not connected.
    #[cfg(feature = "redis")]
    redis: Option<Redis>,
    /// Kafka backend; `None` when not connected.
    #[cfg(feature = "kafka")]
    kafka: Option<Kafka>,
}
28
29impl Cache {
30 pub fn new(config: Config) -> Self {
31 {
32 let mut data = GLOBAL_CONFIG.write().unwrap();
33 data.clone_from(&config);
34 }
35 let config = GLOBAL_CONFIG.read().unwrap();
36 let connection = config
37 .connections
38 .get(config.default.as_str())
39 .unwrap()
40 .clone();
41 Self::from_connection(connection)
42 }
43
44 pub fn create(name: &str, connection: Connection) -> Self {
45 {
46 let mut data = GLOBAL_CONFIG.write().unwrap();
47 if !data.connections.contains_key(name) {
48 data.connections.insert(name.to_string(), connection);
49 }
50 data.default = name.to_string();
51 }
52 let config = GLOBAL_CONFIG.read().unwrap();
53 let connection = config
54 .connections
55 .get(config.default.as_str())
56 .unwrap()
57 .clone();
58 Self::from_connection(connection)
59 }
60
61 fn from_connection(connection: Connection) -> Self {
62 #[cfg(feature = "redis")]
63 let redis = match Redis::connect(connection.clone()) {
64 Ok(r) => Some(r),
65 Err(e) => {
66 log::warn!("Redis 连接失败: {}", e);
67 None
68 }
69 };
70
71 #[cfg(feature = "kafka")]
72 let kafka = if let Some(kafka_conn) = connection.kafka {
73 match Kafka::connect(kafka_conn) {
74 Ok(k) => Some(k),
75 Err(e) => {
76 log::warn!("Kafka 连接失败: {}", e);
77 None
78 }
79 }
80 } else {
81 None
82 };
83
84 Self {
85 #[cfg(feature = "redis")]
86 redis,
87 #[cfg(feature = "kafka")]
88 kafka,
89 }
90 }
91
92 pub fn connections(&mut self) -> JsonValue {
93 let mut connections = vec![];
94 let data = GLOBAL_CONFIG.read().unwrap();
95 for (item, mut value) in data.connections.clone() {
96 if value.mode.str().is_empty() {
97 continue;
98 }
99 let mut t = value.json();
100 t["name"] = item.into();
101 connections.push(t);
102 }
103 connections.into()
104 }
105
106 pub fn connection(&mut self, name: &str) -> Self {
107 let mut data = GLOBAL_CONFIG.write().unwrap();
108 if data.connections.contains_key(name) {
109 if name == data.default {
110 return self.clone();
111 }
112 data.default = name.to_string();
113 let connection = data.connections.get(data.default.as_str()).unwrap().clone();
114 Self::from_connection(connection)
115 } else {
116 Self::none()
117 }
118 }
119
120 pub fn none() -> Self {
121 Self {
122 #[cfg(feature = "redis")]
123 redis: None,
124 #[cfg(feature = "kafka")]
125 kafka: None,
126 }
127 }
128
129 pub fn is_none(&self) -> bool {
130 #[cfg(all(feature = "redis", feature = "kafka"))]
131 {
132 return self.redis.is_none() && self.kafka.is_none();
133 }
134 #[cfg(all(feature = "redis", not(feature = "kafka")))]
135 {
136 return self.redis.is_none();
137 }
138 #[cfg(all(not(feature = "redis"), feature = "kafka"))]
139 {
140 return self.kafka.is_none();
141 }
142 #[cfg(all(not(feature = "redis"), not(feature = "kafka")))]
143 {
144 true
145 }
146 }
147
148 #[cfg(feature = "redis")]
149 pub fn is_redis(&self) -> bool {
150 self.redis.is_some()
151 }
152
153 #[cfg(not(feature = "redis"))]
154 pub fn is_redis(&self) -> bool {
155 false
156 }
157
158 #[cfg(feature = "kafka")]
159 pub fn is_kafka(&self) -> bool {
160 self.kafka.is_some()
161 }
162
163 #[cfg(not(feature = "kafka"))]
164 pub fn is_kafka(&self) -> bool {
165 false
166 }
167
168 pub fn mode(&self) -> &'static str {
169 #[cfg(feature = "kafka")]
170 if self.kafka.is_some() {
171 return "kafka";
172 }
173 #[cfg(feature = "redis")]
174 if self.redis.is_some() {
175 return "redis";
176 }
177 "none"
178 }
179
180 pub fn ping(&mut self) -> Result<JsonValue, String> {
181 let mut result = json::object! {
182 redis: false,
183 kafka: false
184 };
185
186 #[cfg(feature = "redis")]
187 if let Some(ref mut r) = self.redis {
188 result["redis"] = r.ping().unwrap_or(false).into();
189 }
190
191 #[cfg(feature = "kafka")]
192 if let Some(ref k) = self.kafka {
193 result["kafka"] = k.ping().unwrap_or(false).into();
194 }
195
196 Ok(result)
197 }
198
199 #[cfg(feature = "redis")]
200 pub fn redis_ping(&mut self) -> Result<bool, String> {
201 if let Some(ref mut r) = self.redis {
202 return r.ping();
203 }
204 Err("Redis未初始化".into())
205 }
206
207 #[cfg(feature = "redis")]
208 pub fn redis_publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
209 if let Some(ref mut r) = self.redis {
210 return r.publish(key, value);
211 }
212 Err("Redis未初始化".into())
213 }
214
215 #[cfg(feature = "kafka")]
216 pub fn kafka_ping(&self) -> Result<bool, String> {
217 if let Some(ref k) = self.kafka {
218 return k.ping();
219 }
220 Err("Kafka未初始化".into())
221 }
222
223 #[cfg(feature = "kafka")]
224 fn kafka_ref(&self) -> Result<&Kafka, String> {
225 self.kafka.as_ref().ok_or_else(|| "Kafka未初始化".into())
226 }
227}
228
229impl Cache {
230 pub fn db(&mut self, db: i8) -> &mut Self {
231 #[cfg(feature = "redis")]
232 if let Some(ref mut r) = self.redis {
233 r.db(db);
234 }
235 self
236 }
237
238 pub fn key_exists(&mut self, key: &str) -> Result<bool, String> {
239 #[cfg(feature = "redis")]
240 if let Some(ref mut r) = self.redis {
241 return r.key_exists(key);
242 }
243 Err("缓存未初始化".into())
244 }
245
246 pub fn key_del(&mut self, key: &str) -> Result<bool, String> {
247 #[cfg(feature = "redis")]
248 if let Some(ref mut r) = self.redis {
249 return r.key_del(key);
250 }
251 Err("缓存未初始化".into())
252 }
253
254 pub fn key_ttl(&mut self, key: &str) -> Result<i64, String> {
255 #[cfg(feature = "redis")]
256 if let Some(ref mut r) = self.redis {
257 return r.key_ttl(key);
258 }
259 Err("缓存未初始化".into())
260 }
261
262 pub fn key_set_expireat(&mut self, key: &str, timestamp: i64) -> Result<bool, String> {
263 #[cfg(feature = "redis")]
264 if let Some(ref mut r) = self.redis {
265 return r.key_set_expireat(key, timestamp);
266 }
267 Err("缓存未初始化".into())
268 }
269
270 pub fn key_set_seconds(&mut self, key: &str, s: i64) -> Result<bool, String> {
271 #[cfg(feature = "redis")]
272 if let Some(ref mut r) = self.redis {
273 return r.key_set_seconds(key, s);
274 }
275 Err("缓存未初始化".into())
276 }
277
278 pub fn key_del_expire(&mut self, key: &str) -> Result<bool, String> {
279 #[cfg(feature = "redis")]
280 if let Some(ref mut r) = self.redis {
281 return r.key_del_expire(key);
282 }
283 Err("缓存未初始化".into())
284 }
285
286 pub fn key_query(&mut self, key: &str) -> Result<JsonValue, String> {
287 #[cfg(feature = "redis")]
288 if let Some(ref mut r) = self.redis {
289 return r.key_query(key);
290 }
291 Err("缓存未初始化".into())
292 }
293
294 pub fn add(
295 &mut self,
296 key: &str,
297 value: JsonValue,
298 expiration_date: u64,
299 ) -> Result<bool, String> {
300 #[cfg(feature = "redis")]
301 if let Some(ref mut r) = self.redis {
302 return r.add(key, value, expiration_date);
303 }
304 Err("缓存未初始化".into())
305 }
306
307 pub fn get(&mut self, key: &str) -> Result<JsonValue, String> {
308 #[cfg(feature = "redis")]
309 if let Some(ref mut r) = self.redis {
310 return r.get(key);
311 }
312 Err("缓存未初始化".into())
313 }
314
315 pub fn set_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
316 #[cfg(feature = "redis")]
317 if let Some(ref mut r) = self.redis {
318 return r.set_add(key, value, expiry_s);
319 }
320 Err("缓存未初始化".into())
321 }
322
323 pub fn set_count(&mut self, key: &str) -> Result<usize, String> {
324 #[cfg(feature = "redis")]
325 if let Some(ref mut r) = self.redis {
326 return r.set_count(key);
327 }
328 Err("缓存未初始化".into())
329 }
330
331 pub fn set_get(&mut self, key: &str) -> Result<JsonValue, String> {
332 #[cfg(feature = "redis")]
333 if let Some(ref mut r) = self.redis {
334 return r.set_get(key);
335 }
336 Err("缓存未初始化".into())
337 }
338
339 pub fn set_delete(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
340 #[cfg(feature = "redis")]
341 if let Some(ref mut r) = self.redis {
342 return r.set_delete(key, value);
343 }
344 Err("缓存未初始化".into())
345 }
346
347 pub fn set_get_sinter(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
348 #[cfg(feature = "redis")]
349 if let Some(ref mut r) = self.redis {
350 return r.set_get_sinter(keys);
351 }
352 Err("缓存未初始化".into())
353 }
354
355 pub fn set_get_sunion(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
356 #[cfg(feature = "redis")]
357 if let Some(ref mut r) = self.redis {
358 return r.set_get_sunion(keys);
359 }
360 Err("缓存未初始化".into())
361 }
362
363 pub fn list_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
364 #[cfg(feature = "redis")]
365 if let Some(ref mut r) = self.redis {
366 return r.list_add(key, value, expiry_s);
367 }
368 Err("缓存未初始化".into())
369 }
370
371 pub fn list_del(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
372 #[cfg(feature = "redis")]
373 if let Some(ref mut r) = self.redis {
374 return r.list_del(key, value);
375 }
376 Err("缓存未初始化".into())
377 }
378
379 pub fn list_lpush(
380 &mut self,
381 key: &str,
382 value: JsonValue,
383 expiry_s: i64,
384 ) -> Result<bool, String> {
385 #[cfg(feature = "redis")]
386 if let Some(ref mut r) = self.redis {
387 return r.list_lpush(key, value, expiry_s);
388 }
389 Err("缓存未初始化".into())
390 }
391
392 pub fn list_rpush(
393 &mut self,
394 key: &str,
395 value: JsonValue,
396 expiry_s: i64,
397 ) -> Result<bool, String> {
398 #[cfg(feature = "redis")]
399 if let Some(ref mut r) = self.redis {
400 return r.list_rpush(key, value, expiry_s);
401 }
402 Err("缓存未初始化".into())
403 }
404
405 pub fn list_lpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
406 #[cfg(feature = "redis")]
407 if let Some(ref mut r) = self.redis {
408 return r.list_lpop(key, count);
409 }
410 Err("缓存未初始化".into())
411 }
412
413 pub fn list_rpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
414 #[cfg(feature = "redis")]
415 if let Some(ref mut r) = self.redis {
416 return r.list_rpop(key, count);
417 }
418 Err("缓存未初始化".into())
419 }
420
421 pub fn list_len(&mut self, key: &str) -> Result<usize, String> {
422 #[cfg(feature = "redis")]
423 if let Some(ref mut r) = self.redis {
424 return r.list_len(key);
425 }
426 Err("缓存未初始化".into())
427 }
428
429 pub fn list_range(
430 &mut self,
431 key: &str,
432 start: isize,
433 stop: isize,
434 ) -> Result<Vec<JsonValue>, String> {
435 #[cfg(feature = "redis")]
436 if let Some(ref mut r) = self.redis {
437 return r.list_range(key, start, stop);
438 }
439 Err("缓存未初始化".into())
440 }
441
442 pub fn list_all(&mut self, key: &str) -> Result<Vec<JsonValue>, String> {
443 #[cfg(feature = "redis")]
444 if let Some(ref mut r) = self.redis {
445 return r.list_all(key);
446 }
447 Err("缓存未初始化".into())
448 }
449
450 pub fn list_get(&mut self, key: &str, index: isize) -> Result<JsonValue, String> {
451 #[cfg(feature = "redis")]
452 if let Some(ref mut r) = self.redis {
453 return r.list_get(key, index);
454 }
455 Err("缓存未初始化".into())
456 }
457
458 pub fn list_trim(&mut self, key: &str, start: isize, stop: isize) -> Result<bool, String> {
459 #[cfg(feature = "redis")]
460 if let Some(ref mut r) = self.redis {
461 return r.list_trim(key, start, stop);
462 }
463 Err("缓存未初始化".into())
464 }
465
466 pub fn list_set(&mut self, key: &str, index: isize, value: JsonValue) -> Result<bool, String> {
467 #[cfg(feature = "redis")]
468 if let Some(ref mut r) = self.redis {
469 return r.list_set(key, index, value);
470 }
471 Err("缓存未初始化".into())
472 }
473
474 pub fn list_remove(
475 &mut self,
476 key: &str,
477 value: JsonValue,
478 count: isize,
479 ) -> Result<isize, String> {
480 #[cfg(feature = "redis")]
481 if let Some(ref mut r) = self.redis {
482 return r.list_remove(key, value, count);
483 }
484 Err("缓存未初始化".into())
485 }
486
487 pub fn hash_get(&mut self, key: &str) -> Result<JsonValue, String> {
488 #[cfg(feature = "redis")]
489 if let Some(ref mut r) = self.redis {
490 return r.hash_get(key);
491 }
492 Err("缓存未初始化".into())
493 }
494
495 pub fn hash_add(&mut self, key: &str, field: &str, value: JsonValue) -> Result<bool, String> {
496 #[cfg(feature = "redis")]
497 if let Some(ref mut r) = self.redis {
498 return r.hash_add(key, field, value);
499 }
500 Err("缓存未初始化".into())
501 }
502
503 pub fn hash_get_field_value(&mut self, key: &str, field: &str) -> Result<JsonValue, String> {
504 #[cfg(feature = "redis")]
505 if let Some(ref mut r) = self.redis {
506 return r.hash_get_field_value(key, field);
507 }
508 Err("缓存未初始化".into())
509 }
510
511 pub fn hash_get_fields(&mut self, key: &str) -> Result<JsonValue, String> {
512 #[cfg(feature = "redis")]
513 if let Some(ref mut r) = self.redis {
514 return r.hash_get_fields(key);
515 }
516 Err("缓存未初始化".into())
517 }
518
519 pub fn hash_delete(&mut self, key: &str, field: &str) -> Result<bool, String> {
520 #[cfg(feature = "redis")]
521 if let Some(ref mut r) = self.redis {
522 return r.hash_delete(key, field);
523 }
524 Err("缓存未初始化".into())
525 }
526
527 pub fn hash_get_values(&mut self, key: &str) -> Result<JsonValue, String> {
528 #[cfg(feature = "redis")]
529 if let Some(ref mut r) = self.redis {
530 return r.hash_get_values(key);
531 }
532 Err("缓存未初始化".into())
533 }
534
535 pub fn geo_add(
536 &mut self,
537 key: &str,
538 longitude: f64,
539 latitude: f64,
540 value: JsonValue,
541 ) -> Result<bool, String> {
542 #[cfg(feature = "redis")]
543 if let Some(ref mut r) = self.redis {
544 return r.geo_add(key, longitude, latitude, value);
545 }
546 Err("缓存未初始化".into())
547 }
548
549 pub fn geo_get(&mut self, key: &str, value: JsonValue) -> Result<JsonValue, String> {
550 #[cfg(feature = "redis")]
551 if let Some(ref mut r) = self.redis {
552 return r.geo_get(key, value);
553 }
554 Err("缓存未初始化".into())
555 }
556
557 pub fn geo_dist(
558 &mut self,
559 key: &str,
560 value1: JsonValue,
561 value2: JsonValue,
562 ) -> Result<JsonValue, String> {
563 #[cfg(feature = "redis")]
564 if let Some(ref mut r) = self.redis {
565 return r.geo_dist(key, value1, value2);
566 }
567 Err("缓存未初始化".into())
568 }
569
570 pub fn geo_radius(
571 &mut self,
572 key: &str,
573 value: JsonValue,
574 radius: &str,
575 ) -> Result<JsonValue, String> {
576 #[cfg(feature = "redis")]
577 if let Some(ref mut r) = self.redis {
578 return r.geo_radius(key, value, radius);
579 }
580 Err("缓存未初始化".into())
581 }
582
583 pub fn stream_add(
584 &mut self,
585 key: &str,
586 msg_id: &str,
587 field: &str,
588 value: JsonValue,
589 ) -> Result<String, String> {
590 #[cfg(feature = "redis")]
591 if let Some(ref mut r) = self.redis {
592 return r.stream_add(key, msg_id, field, value);
593 }
594 Err("缓存未初始化".into())
595 }
596
597 pub fn stream_count(&mut self, key: &str) -> Result<usize, String> {
598 #[cfg(feature = "redis")]
599 if let Some(ref mut r) = self.redis {
600 return r.stream_count(key);
601 }
602 Err("缓存未初始化".into())
603 }
604
605 pub fn stream_get(&mut self, key: &str) -> Result<JsonValue, String> {
606 #[cfg(feature = "redis")]
607 if let Some(ref mut r) = self.redis {
608 return r.stream_get(key);
609 }
610 Err("缓存未初始化".into())
611 }
612
613 pub fn stream_del(&mut self, key: &str, id: &str) -> Result<bool, String> {
614 #[cfg(feature = "redis")]
615 if let Some(ref mut r) = self.redis {
616 return r.stream_del(key, id);
617 }
618 Err("缓存未初始化".into())
619 }
620
621 pub fn stream_group_create(&mut self, key: &str, group: &str) -> Result<bool, String> {
622 #[cfg(feature = "redis")]
623 if let Some(ref mut r) = self.redis {
624 return r.stream_group_create(key, group);
625 }
626 Err("缓存未初始化".into())
627 }
628
629 pub fn stream_group_add_user(
630 &mut self,
631 key: &str,
632 group: &str,
633 user: &str,
634 ) -> Result<bool, String> {
635 #[cfg(feature = "redis")]
636 if let Some(ref mut r) = self.redis {
637 return r.stream_group_add_user(key, group, user);
638 }
639 Err("缓存未初始化".into())
640 }
641
642 pub fn stream_group_del_user(
643 &mut self,
644 key: &str,
645 group: &str,
646 user: &str,
647 ) -> Result<bool, String> {
648 #[cfg(feature = "redis")]
649 if let Some(ref mut r) = self.redis {
650 return r.stream_group_del_user(key, group, user);
651 }
652 Err("缓存未初始化".into())
653 }
654
655 pub fn stream_group_del(&mut self, key: &str, group: &str) -> Result<bool, String> {
656 #[cfg(feature = "redis")]
657 if let Some(ref mut r) = self.redis {
658 return r.stream_group_del(key, group);
659 }
660 Err("缓存未初始化".into())
661 }
662
663 pub fn stream_group_msg(
664 &mut self,
665 key: &str,
666 group: &str,
667 user: &str,
668 ) -> Result<JsonValue, String> {
669 #[cfg(feature = "redis")]
670 if let Some(ref mut r) = self.redis {
671 return r.stream_group_msg(key, group, user);
672 }
673 Err("缓存未初始化".into())
674 }
675
676 pub fn stream_get_group(&mut self, key: &str, group: &str) -> Result<bool, String> {
677 #[cfg(feature = "redis")]
678 if let Some(ref mut r) = self.redis {
679 return r.stream_get_group(key, group);
680 }
681 Err("缓存未初始化".into())
682 }
683
684 pub fn stream_get_stream(&mut self, key: &str) -> Result<JsonValue, String> {
685 #[cfg(feature = "redis")]
686 if let Some(ref mut r) = self.redis {
687 return r.stream_get_stream(key);
688 }
689 Err("缓存未初始化".into())
690 }
691
692 pub fn subscribe(&mut self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
693 #[cfg(feature = "kafka")]
694 if let Some(ref k) = self.kafka {
695 return k.subscribe(key, tx);
696 }
697 #[cfg(feature = "redis")]
698 if let Some(ref mut r) = self.redis {
699 return r.subscribe(key, tx);
700 }
701 Err("缓存未初始化".into())
702 }
703
704 pub fn subscribe_with_reconnect(&self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
705 #[cfg(feature = "kafka")]
706 if let Some(ref k) = self.kafka {
707 return k.subscribe(key, tx);
708 }
709 #[cfg(feature = "redis")]
710 if let Some(ref r) = self.redis {
711 return r.subscribe_with_reconnect(key, tx);
712 }
713 Err("缓存未初始化".into())
714 }
715
716 pub fn publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
717 #[cfg(feature = "kafka")]
718 if let Some(ref k) = self.kafka {
719 return k.publish(key, value);
720 }
721 #[cfg(feature = "redis")]
722 if let Some(ref mut r) = self.redis {
723 return r.publish(key, value);
724 }
725 Err("缓存未初始化".into())
726 }
727}
728
// Kafka-only operations. Each method returns the `kafka_ref` error when no
// Kafka backend is connected, and otherwise forwards to the Kafka client.
#[cfg(feature = "kafka")]
impl Cache {
    /// Forwards to the Kafka client's `get_topics`.
    pub fn kafka_get_topics(&self) -> Result<Vec<String>, String> {
        self.kafka_ref().and_then(|broker| broker.get_topics())
    }

    /// Forwards to the Kafka client's `create_topic`.
    pub fn kafka_create_topic(&self, topic: &str) -> Result<bool, String> {
        self.kafka_ref()
            .and_then(|broker| broker.create_topic(topic))
    }

    /// Forwards to the Kafka client's `consume`.
    pub fn kafka_consume(
        &self,
        topic: &str,
        key: &str,
        only_one: bool,
    ) -> Result<JsonValue, String> {
        self.kafka_ref()
            .and_then(|broker| broker.consume(topic, key, only_one))
    }

    /// Forwards to the Kafka client's `publish_with_key`.
    pub fn kafka_publish_with_key(
        &self,
        topic: &str,
        key: &str,
        data: JsonValue,
    ) -> Result<bool, String> {
        self.kafka_ref()
            .and_then(|broker| broker.publish_with_key(topic, key, data))
    }
}