#[cfg(feature = "kafka")]
pub use crate::config::KafkaConnection;
pub use crate::config::{CacheMode, Config, Connection};
#[cfg(feature = "kafka")]
use crate::kafka::Kafka;
#[cfg(feature = "redis")]
use crate::redis::Redis;
use json::JsonValue;
use once_cell::sync::Lazy;
use std::sync::mpsc::Sender;
use std::sync::RwLock;
pub mod config;
#[cfg(feature = "kafka")]
mod kafka;
#[cfg(feature = "redis")]
mod redis;
/// Process-wide configuration shared by every `Cache` constructor.
///
/// Lazily initialised to `Config::default()` on first access and guarded by an
/// `RwLock`; `Cache::new`, `Cache::create` and `Cache::connection` write to it.
static GLOBAL_CONFIG: Lazy<RwLock<Config>> = Lazy::new(|| RwLock::new(Config::default()));
/// Handle over the optional Redis and Kafka clients.
///
/// Each field only exists when its cargo feature is enabled, and holds `None`
/// when that backend is not connected.
#[derive(Clone)]
pub struct Cache {
/// Redis client; present only when built with the `redis` feature.
#[cfg(feature = "redis")]
redis: Option<Redis>,
/// Kafka client; present only when built with the `kafka` feature.
#[cfg(feature = "kafka")]
kafka: Option<Kafka>,
}
impl Cache {
pub fn new(config: Config) -> Self {
{
let mut data = GLOBAL_CONFIG.write().unwrap();
data.clone_from(&config);
}
let config = GLOBAL_CONFIG.read().unwrap();
let connection = config
.connections
.get(config.default.as_str())
.unwrap()
.clone();
Self::from_connection(connection)
}
/// Registers `connection` under `name` (unless that name is already
/// registered), makes `name` the global default and connects to it.
///
/// When `name` already exists, the stored connection is kept and used; the
/// `connection` argument is ignored in that case.
///
/// Registration, default selection and lookup all happen under a single
/// write lock, so a concurrent writer cannot swap the entry between the
/// insert and the lookup. The lock is released before connecting.
///
/// # Panics
/// Panics if the global configuration lock is poisoned.
pub fn create(name: &str, connection: Connection) -> Self {
    let mut data = GLOBAL_CONFIG.write().unwrap();
    let connection = match data.connections.get(name) {
        Some(existing) => existing.clone(),
        None => {
            data.connections
                .insert(name.to_string(), connection.clone());
            connection
        }
    };
    data.default = name.to_string();
    // Do not keep the global configuration locked during network I/O.
    drop(data);
    Self::from_connection(connection)
}
/// Builds a `Cache` by connecting every enabled backend described by
/// `connection`.
///
/// A failed connection is logged with `log::warn!` and leaves that backend as
/// `None`. Kafka is only attempted when `connection.kafka` is `Some`.
fn from_connection(connection: Connection) -> Self {
    #[cfg(feature = "redis")]
    let redis = Redis::connect(connection.clone())
        .map_err(|err| log::warn!("Redis 连接失败: {}", err))
        .ok();
    #[cfg(feature = "kafka")]
    let kafka = connection.kafka.and_then(|kafka_conn| {
        Kafka::connect(kafka_conn)
            .map_err(|err| log::warn!("Kafka 连接失败: {}", err))
            .ok()
    });
    Self {
        #[cfg(feature = "redis")]
        redis,
        #[cfg(feature = "kafka")]
        kafka,
    }
}
/// Lists the globally registered connections as a JSON array.
///
/// Entries whose `mode.str()` is empty are skipped. Every remaining entry is
/// the connection's `json()` value with an extra `"name"` key holding the
/// name it is registered under.
///
/// # Panics
/// Panics if the global configuration lock is poisoned.
pub fn connections(&mut self) -> JsonValue {
    let guard = GLOBAL_CONFIG.read().unwrap();
    let listed: Vec<_> = guard
        .connections
        .clone()
        .into_iter()
        .filter_map(|(name, mut conn)| {
            if conn.mode.str().is_empty() {
                return None;
            }
            let mut entry = conn.json();
            entry["name"] = name.into();
            Some(entry)
        })
        .collect();
    listed.into()
}
/// Switches the global default connection to `name` and returns a `Cache`
/// connected to it.
///
/// * `name` is not registered: returns [`Cache::none`].
/// * `name` is already the default: returns a clone of `self` without
///   reconnecting.
/// * otherwise: records `name` as the new default and opens fresh connections.
///
/// The global write lock is released before connecting, so a slow or
/// unreachable backend does not block other users of the configuration.
///
/// # Panics
/// Panics if the global configuration lock is poisoned.
pub fn connection(&mut self, name: &str) -> Self {
    let mut data = GLOBAL_CONFIG.write().unwrap();
    let connection = match data.connections.get(name) {
        None => return Self::none(),
        Some(_) if name == data.default => return self.clone(),
        Some(found) => found.clone(),
    };
    data.default = name.to_string();
    // Release the lock before the blocking connection attempt.
    drop(data);
    Self::from_connection(connection)
}
pub fn none() -> Self {
Self {
#[cfg(feature = "redis")]
redis: None,
#[cfg(feature = "kafka")]
kafka: None,
}
}
/// Returns `true` when neither a Redis nor a Kafka client is held.
///
/// A backend whose feature is compiled out counts as absent, so the result
/// is always `true` when both features are disabled.
pub fn is_none(&self) -> bool {
    !(self.is_redis() || self.is_kafka())
}
/// Returns `true` when a Redis client is held; always `false` when the
/// `redis` feature is disabled.
pub fn is_redis(&self) -> bool {
    #[cfg(feature = "redis")]
    {
        self.redis.is_some()
    }
    #[cfg(not(feature = "redis"))]
    {
        false
    }
}
/// Returns `true` when a Kafka client is held; always `false` when the
/// `kafka` feature is disabled.
pub fn is_kafka(&self) -> bool {
    #[cfg(feature = "kafka")]
    {
        self.kafka.is_some()
    }
    #[cfg(not(feature = "kafka"))]
    {
        false
    }
}
/// Names the active backend: `"kafka"` when a Kafka client is held, else
/// `"redis"` when a Redis client is held, else `"none"`.
///
/// Kafka takes precedence when both clients are present.
pub fn mode(&self) -> &'static str {
    if self.is_kafka() {
        "kafka"
    } else if self.is_redis() {
        "redis"
    } else {
        "none"
    }
}
/// Pings every held backend and reports the outcome as
/// `{ "redis": bool, "kafka": bool }`.
///
/// A backend that is absent, compiled out, or whose ping returns an error is
/// reported as `false`. This method itself always returns `Ok`.
pub fn ping(&mut self) -> Result<JsonValue, String> {
    let mut status = json::object! {
        redis: false,
        kafka: false
    };
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        let alive = client.ping().unwrap_or(false);
        status["redis"] = alive.into();
    }
    #[cfg(feature = "kafka")]
    if let Some(broker) = self.kafka.as_ref() {
        let alive = broker.ping().unwrap_or(false);
        status["kafka"] = alive.into();
    }
    Ok(status)
}
/// Forwards to the Redis client's `ping`.
///
/// Fails with the fixed "Redis not initialised" message when no Redis client
/// is held.
#[cfg(feature = "redis")]
pub fn redis_ping(&mut self) -> Result<bool, String> {
    match self.redis.as_mut() {
        Some(client) => client.ping(),
        None => Err(String::from("Redis未初始化")),
    }
}
/// Forwards to the Redis client's `publish` with `key` and `value`,
/// regardless of whether a Kafka client is also held.
///
/// Fails with the fixed "Redis not initialised" message when no Redis client
/// is held.
#[cfg(feature = "redis")]
pub fn redis_publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    match self.redis.as_mut() {
        Some(client) => client.publish(key, value),
        None => Err(String::from("Redis未初始化")),
    }
}
/// Forwards to the Kafka client's `ping`.
///
/// Fails with the fixed "Kafka not initialised" message when no Kafka client
/// is held.
#[cfg(feature = "kafka")]
pub fn kafka_ping(&self) -> Result<bool, String> {
    match self.kafka.as_ref() {
        Some(broker) => broker.ping(),
        None => Err(String::from("Kafka未初始化")),
    }
}
/// Borrows the Kafka client, or fails with the fixed "Kafka not initialised"
/// message when none is held.
#[cfg(feature = "kafka")]
fn kafka_ref(&self) -> Result<&Kafka, String> {
    match &self.kafka {
        Some(broker) => Ok(broker),
        None => Err(String::from("Kafka未初始化")),
    }
}
}
impl Cache {
/// Passes `db` to the Redis client's `db` method when a Redis client is held,
/// then returns `self` so calls can be chained.
///
/// Does nothing when no Redis client is held or the `redis` feature is off.
pub fn db(&mut self, db: i8) -> &mut Self {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        client.db(db);
    }
    self
}
/// Forwards to the Redis client's `key_exists` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_exists(&mut self, key: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_exists(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_del` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_del(&mut self, key: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_del(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_ttl` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_ttl(&mut self, key: &str) -> Result<i64, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_ttl(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_set_expireat` with `key` and
/// `timestamp`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_set_expireat(&mut self, key: &str, timestamp: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_set_expireat(key, timestamp);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_set_seconds` with `key` and `s`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_set_seconds(&mut self, key: &str, s: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_set_seconds(key, s);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_del_expire` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_del_expire(&mut self, key: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_del_expire(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `key_query` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn key_query(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.key_query(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `add` with `key`, `value` and
/// `expiration_date`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn add(&mut self, key: &str, value: JsonValue, expiration_date: u64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.add(key, value, expiration_date);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `get` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn get(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.get(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_add` with `key`, `value` and
/// `expiry_s`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_add(key, value, expiry_s);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_count` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_count(&mut self, key: &str) -> Result<usize, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_count(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_get` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_get(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_get(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_delete` with `key` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_delete(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_delete(key, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_get_sinter` with `keys`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_get_sinter(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_get_sinter(keys);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `set_get_sunion` with `keys`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn set_get_sunion(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.set_get_sunion(keys);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_add` with `key`, `value` and
/// `expiry_s`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_add(key, value, expiry_s);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_del` with `key` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_del(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_del(key, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_lpush` with `key`, `value` and
/// `expiry_s`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_lpush(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_lpush(key, value, expiry_s);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_rpush` with `key`, `value` and
/// `expiry_s`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_rpush(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_rpush(key, value, expiry_s);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_lpop` with `key` and `count`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_lpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_lpop(key, count);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_rpop` with `key` and `count`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_rpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_rpop(key, count);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_len` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_len(&mut self, key: &str) -> Result<usize, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_len(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_range` with `key`, `start` and
/// `stop`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_range(&mut self, key: &str, start: isize, stop: isize) -> Result<Vec<JsonValue>, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_range(key, start, stop);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_all` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_all(&mut self, key: &str) -> Result<Vec<JsonValue>, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_all(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_get` with `key` and `index`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_get(&mut self, key: &str, index: isize) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_get(key, index);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_trim` with `key`, `start` and `stop`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_trim(&mut self, key: &str, start: isize, stop: isize) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_trim(key, start, stop);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_set` with `key`, `index` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_set(&mut self, key: &str, index: isize, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_set(key, index, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `list_remove` with `key`, `value` and
/// `count`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn list_remove(&mut self, key: &str, value: JsonValue, count: isize) -> Result<isize, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.list_remove(key, value, count);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_get` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_get(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_get(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_add` with `key`, `field` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_add(&mut self, key: &str, field: &str, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_add(key, field, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_get_field_value` with `key` and
/// `field`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_get_field_value(&mut self, key: &str, field: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_get_field_value(key, field);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_get_fields` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_get_fields(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_get_fields(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_delete` with `key` and `field`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_delete(&mut self, key: &str, field: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_delete(key, field);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `hash_get_values` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn hash_get_values(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.hash_get_values(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `geo_add` with `key`, `longitude`,
/// `latitude` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn geo_add(&mut self, key: &str, longitude: f64, latitude: f64, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.geo_add(key, longitude, latitude, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `geo_get` with `key` and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn geo_get(&mut self, key: &str, value: JsonValue) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.geo_get(key, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `geo_dist` with `key`, `value1` and
/// `value2`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn geo_dist(&mut self, key: &str, value1: JsonValue, value2: JsonValue) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.geo_dist(key, value1, value2);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `geo_radius` with `key`, `value` and
/// `radius`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn geo_radius(&mut self, key: &str, value: JsonValue, radius: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.geo_radius(key, value, radius);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_add` with `key`, `msg_id`, `field`
/// and `value`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_add(&mut self, key: &str, msg_id: &str, field: &str, value: JsonValue) -> Result<String, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_add(key, msg_id, field, value);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_count` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_count(&mut self, key: &str) -> Result<usize, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_count(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_get` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_get(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_get(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_del` with `key` and `id`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_del(&mut self, key: &str, id: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_del(key, id);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_group_create` with `key` and
/// `group`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_group_create(&mut self, key: &str, group: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_group_create(key, group);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_group_add_user` with `key`, `group`
/// and `user`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_group_add_user(&mut self, key: &str, group: &str, user: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_group_add_user(key, group, user);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_group_del_user` with `key`, `group`
/// and `user`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_group_del_user(&mut self, key: &str, group: &str, user: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_group_del_user(key, group, user);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_group_del` with `key` and `group`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_group_del(&mut self, key: &str, group: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_group_del(key, group);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_group_msg` with `key`, `group` and
/// `user`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_group_msg(&mut self, key: &str, group: &str, user: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_group_msg(key, group, user);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_get_group` with `key` and `group`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_get_group(&mut self, key: &str, group: &str) -> Result<bool, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_get_group(key, group);
    }
    Err(String::from("缓存未初始化"))
}
/// Forwards to the Redis client's `stream_get_stream` for `key`.
///
/// Fails with the fixed "cache not initialised" message when no Redis client
/// is held or the `redis` feature is compiled out.
pub fn stream_get_stream(&mut self, key: &str) -> Result<JsonValue, String> {
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.stream_get_stream(key);
    }
    Err(String::from("缓存未初始化"))
}
/// Subscribes to `key`, handing `tx` to the backend client.
///
/// The Kafka client's `subscribe` is used when a Kafka client is held;
/// otherwise the Redis client's `subscribe` is used.
///
/// Fails with the fixed "cache not initialised" message when neither client
/// is available.
pub fn subscribe(&mut self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
    #[cfg(feature = "kafka")]
    if let Some(broker) = self.kafka.as_ref() {
        return broker.subscribe(key, tx);
    }
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.subscribe(key, tx);
    }
    Err(String::from("缓存未初始化"))
}
/// Subscribes to `key`, handing `tx` to the backend client.
///
/// When a Kafka client is held this calls its plain `subscribe`; otherwise
/// the Redis client's `subscribe_with_reconnect` is used.
///
/// Fails with the fixed "cache not initialised" message when neither client
/// is available.
pub fn subscribe_with_reconnect(&self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
    #[cfg(feature = "kafka")]
    if let Some(broker) = self.kafka.as_ref() {
        return broker.subscribe(key, tx);
    }
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_ref() {
        return client.subscribe_with_reconnect(key, tx);
    }
    Err(String::from("缓存未初始化"))
}
/// Publishes `value` under `key`.
///
/// The Kafka client's `publish` is used when a Kafka client is held;
/// otherwise the Redis client's `publish` is used.
///
/// Fails with the fixed "cache not initialised" message when neither client
/// is available.
pub fn publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    #[cfg(feature = "kafka")]
    if let Some(broker) = self.kafka.as_ref() {
        return broker.publish(key, value);
    }
    #[cfg(feature = "redis")]
    if let Some(client) = self.redis.as_mut() {
        return client.publish(key, value);
    }
    Err(String::from("缓存未初始化"))
}
}
/// Kafka-only operations, compiled in with the `kafka` feature.
///
/// Every method fails with the error from `kafka_ref` when no Kafka client is
/// held, and otherwise returns whatever the Kafka client returns.
#[cfg(feature = "kafka")]
impl Cache {
    /// Forwards to the Kafka client's `get_topics`.
    pub fn kafka_get_topics(&self) -> Result<Vec<String>, String> {
        self.kafka_ref().and_then(|broker| broker.get_topics())
    }

    /// Forwards to the Kafka client's `create_topic` for `topic`.
    pub fn kafka_create_topic(&self, topic: &str) -> Result<bool, String> {
        self.kafka_ref()
            .and_then(|broker| broker.create_topic(topic))
    }

    /// Forwards to the Kafka client's `consume` with `topic`, `key` and
    /// `only_one`.
    pub fn kafka_consume(&self, topic: &str, key: &str, only_one: bool) -> Result<JsonValue, String> {
        self.kafka_ref()
            .and_then(|broker| broker.consume(topic, key, only_one))
    }

    /// Forwards to the Kafka client's `publish_with_key` with `topic`, `key`
    /// and `data`.
    pub fn kafka_publish_with_key(&self, topic: &str, key: &str, data: JsonValue) -> Result<bool, String> {
        self.kafka_ref()
            .and_then(|broker| broker.publish_with_key(topic, key, data))
    }
}