use crate::Connection as CacheConnection;
use json::{array, object, JsonValue};
use log::error;
use r2d2::{Pool, PooledConnection};
use redis::streams::{
StreamInfoGroupsReply, StreamInfoStreamReply, StreamRangeReply, StreamReadReply,
};
use redis::{geo, Client, Commands, FromRedisValue, RedisResult};
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::mpsc::Sender;
/// Redis-backed cache handle that bundles a pooled command client with a
/// separate client used for pub/sub connections.
#[derive(Clone)]
pub struct Redis {
/// r2d2 pool from which command connections are checked out.
client: Pool<Client>,
/// Client used to open dedicated connections for subscriptions.
pubsub_client: Client,
/// Database index sent with `SELECT` whenever a pooled connection is handed out.
db: i8,
}
impl Redis {
pub fn connect(connection: CacheConnection) -> Result<Self, String> {
let dsn = if connection.userpass.is_empty() {
format!("redis://{}:{}/", connection.hostname, connection.hostport)
} else {
format!(
"redis://:{}@{}:{}/",
connection.userpass, connection.hostname, connection.hostport
)
};
let client = Client::open(dsn.clone()).map_err(|e| format!("Redis Client error: {e}"))?;
let pool = Pool::builder()
.max_size(64)
.connection_timeout(std::time::Duration::from_secs(2))
.build(client)
.map_err(|e| format!("Redis Pool build error: {e}"))?;
let pubsub_client =
Client::open(dsn).map_err(|e| format!("Redis PubSub Client error: {e}"))?;
Ok(Self {
client: pool,
pubsub_client,
db: 0,
})
}
/// Checks a connection out of the pool and switches it to the currently
/// selected database.
///
/// `SELECT` is issued on every checkout because pooled connections are reused
/// and may have been left on a different database index.
///
/// # Errors
/// Returns a descriptive string if no connection can be obtained or if the
/// `SELECT` command fails (previously the latter caused a panic).
pub fn con(&mut self) -> Result<PooledConnection<Client>, String> {
    let mut conn = self
        .client
        .get()
        .map_err(|err| format!("Redis Connection failed: {err}"))?;
    redis::pipe()
        .cmd("SELECT")
        .arg(self.db)
        .exec(&mut conn)
        .map_err(|err| format!("Redis SELECT {} failed: {err}", self.db))?;
    Ok(conn)
}
}
impl Redis {
/// Sends `PING` and reports whether the server answered with `PONG`.
///
/// # Errors
/// Returns a descriptive string if a connection cannot be obtained or the
/// command fails.
pub fn ping(&mut self) -> Result<bool, String> {
    let mut conn = self.con()?;
    let reply: RedisResult<String> = redis::cmd("PING").query(&mut *conn);
    reply
        .map(|answer| answer == "PONG")
        .map_err(|err| format!("Redis ping failed: {}", err))
}
/// Sets the database index used for subsequent commands and returns `self`
/// so that calls can be chained.
pub fn db(&mut self, db: i8) -> &mut Self {
    self.db = db;
    self
}
/// Reports whether `key` exists.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_exists(&mut self, key: &str) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::exists(&mut self.con()?, key);
    reply.map_err(|err| format!("判断是否存在失败: {err}"))
}
/// Deletes `key` and returns the server reply converted to a boolean.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_del(&mut self, key: &str) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::del(&mut self.con()?, key);
    reply.map_err(|err| format!("删除: {err}"))
}
/// Returns the value reported by the `TTL` command for `key`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_ttl(&mut self, key: &str) -> Result<i64, String> {
    let reply: RedisResult<i64> = Commands::ttl(&mut self.con()?, key);
    reply.map_err(|err| format!("获取剩余时间失败: {err}"))
}
/// Sets an absolute expiry for `key` via `EXPIREAT`, passing `timestamp` through
/// unchanged.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_set_expireat(&mut self, key: &str, timestamp: i64) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::expire_at(&mut self.con()?, key, timestamp);
    reply.map_err(|err| format!("key_set_expireat: {err}"))
}
/// Sets a relative expiry of `s` seconds on `key` via `EXPIRE`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails. The
/// message is prefixed with this function's own name (it previously carried the
/// prefix of `key_set_expireat`, which made failures misleading to trace).
pub fn key_set_seconds(&mut self, key: &str, s: i64) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::expire(&mut self.con()?, key, s);
    reply.map_err(|e| format!("key_set_seconds: {e}"))
}
/// Removes any expiry from `key` via `PERSIST`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_del_expire(&mut self, key: &str) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::persist(&mut self.con()?, key);
    reply.map_err(|err| format!("key_del_expire: {err}"))
}
/// Runs `KEYS` with `key` as the pattern and returns the matching key names as
/// a JSON array of strings.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn key_query(&mut self, key: &str) -> Result<JsonValue, String> {
    let names: RedisResult<Vec<String>> = Commands::keys(&mut self.con()?, key);
    names
        .map(JsonValue::from)
        .map_err(|err| format!("查询KEYS失败: {err}"))
}
/// Stores `value` (rendered with `to_string`) under `key`.
///
/// When `expiration_date` is non-zero it is passed to `SETEX` as the lifetime;
/// a value of zero stores the key with a plain `SET`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn add(
    &mut self,
    key: &str,
    value: JsonValue,
    expiration_date: u64,
) -> Result<bool, String> {
    let payload = value.to_string();
    let outcome: RedisResult<bool> = match expiration_date {
        0 => Commands::set(&mut self.con()?, key, payload),
        ttl => Commands::set_ex(&mut self.con()?, key, payload, ttl),
    };
    outcome.map_err(|err| format!("设置缓存失败: {err}"))
}
/// Reads the string stored at `key` and parses it as JSON.
///
/// If the stored text is not valid JSON it is returned as a JSON string instead.
///
/// # Errors
/// Returns a descriptive string if the connection fails or the reply cannot be
/// read as a string.
pub fn get(&mut self, key: &str) -> Result<JsonValue, String> {
    let raw: RedisResult<String> = Commands::get(&mut self.con()?, key);
    let text = raw.map_err(|err| format!("获取失败: {err}"))?;
    Ok(json::parse(&text).unwrap_or_else(|_| JsonValue::from(text)))
}
/// Adds `value` (rendered with `to_string`) to the set at `key`.
///
/// When the add reply is truthy and `expiry_s` is positive, the key's expiry is
/// set to `expiry_s` seconds and that call's result is returned instead.
///
/// # Errors
/// Returns a descriptive string if the connection or either command fails.
pub fn set_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::sadd(&mut self.con()?, key, value.to_string());
    let added = reply.map_err(|err| format!("集合添加: {err}"))?;
    if added && expiry_s > 0 {
        self.key_set_seconds(key, expiry_s)
    } else {
        Ok(added)
    }
}
/// Returns the number of members in the set at `key` (`SCARD`).
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn set_count(&mut self, key: &str) -> Result<usize, String> {
    let size: RedisResult<usize> = Commands::scard(&mut self.con()?, key);
    size.map_err(|err| format!("集合: {err}"))
}
/// Returns every member of the set at `key` as a JSON array.
///
/// Each member is parsed as JSON; members that are not valid JSON are kept as
/// JSON strings.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn set_get(&mut self, key: &str) -> Result<JsonValue, String> {
    let reply: RedisResult<Vec<String>> = Commands::smembers(&mut self.con()?, key);
    let members = reply.map_err(|err| format!("集合查询: {err}"))?;
    let parsed = members
        .into_iter()
        .map(|member| json::parse(&member).unwrap_or_else(|_| JsonValue::from(member)))
        .collect();
    Ok(JsonValue::Array(parsed))
}
/// Removes `value` (rendered with `to_string`) from the set at `key`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn set_delete(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::srem(&mut self.con()?, key, value.to_string());
    reply.map_err(|err| format!("集合删除: {err}"))
}
/// Returns the intersection of the sets named in `keys` as a JSON array.
///
/// Each member is parsed as JSON. Members that are not valid JSON are kept as
/// JSON strings rather than being replaced by `null`, matching `set_get`; string
/// values are written raw by `set_add`, so they would otherwise be lost here.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn set_get_sinter(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
    let data: RedisResult<HashSet<String>> = Commands::sinter(&mut self.con()?, keys);
    let members = data.map_err(|e| format!("集合交集查询: {e}"))?;
    let mut list = array![];
    for item in members {
        let parsed = json::parse(&item).unwrap_or_else(|_| JsonValue::from(item));
        let _ = list.push(parsed);
    }
    Ok(list)
}
/// Returns the union of the sets named in `keys` as a JSON array.
///
/// Each member is parsed as JSON. Members that are not valid JSON are kept as
/// JSON strings rather than being replaced by `null`, matching `set_get`; string
/// values are written raw by `set_add`, so they would otherwise be lost here.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn set_get_sunion(&mut self, keys: Vec<&str>) -> Result<JsonValue, String> {
    let data: RedisResult<HashSet<String>> = Commands::sunion(&mut self.con()?, keys);
    let members = data.map_err(|e| format!("集合并集查询: {e}"))?;
    let mut list = array![];
    for item in members {
        let parsed = json::parse(&item).unwrap_or_else(|_| JsonValue::from(item));
        let _ = list.push(parsed);
    }
    Ok(list)
}
/// Pushes `value` (rendered with `to_string`) onto the head of the list at `key`.
///
/// When the push reply is truthy and `expiry_s` is positive, the key's expiry is
/// set to `expiry_s` seconds and that call's result is returned instead.
///
/// # Errors
/// Returns a descriptive string if the connection or either command fails. The
/// message now names the list operation (it previously said "set delete").
pub fn list_add(&mut self, key: &str, value: JsonValue, expiry_s: i64) -> Result<bool, String> {
    let data: RedisResult<bool> = Commands::lpush(&mut self.con()?, key, value.to_string());
    let pushed = data.map_err(|e| format!("列表添加失败: {e}"))?;
    if pushed && expiry_s > 0 {
        return self.key_set_seconds(key, expiry_s);
    }
    Ok(pushed)
}
/// Removes every occurrence of `value` (rendered with `to_string`) from the list
/// at `key` by calling `LREM` with a count of 0.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails. The
/// message now names the list operation (it previously said "set delete").
pub fn list_del(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    let data: RedisResult<bool> = Commands::lrem(&mut self.con()?, key, 0, value.to_string());
    data.map_err(|e| format!("列表删除失败: {e}"))
}
/// Pushes `value` (rendered with `to_string`) onto the head of the list at `key`.
///
/// When the push reply is truthy and `expiry_s` is positive, the key's expiry is
/// set to `expiry_s` seconds and that call's result is returned instead.
///
/// # Errors
/// Returns a descriptive string if the connection or either command fails.
pub fn list_lpush(
    &mut self,
    key: &str,
    value: JsonValue,
    expiry_s: i64,
) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::lpush(&mut self.con()?, key, value.to_string());
    let pushed = reply.map_err(|err| format!("列表左侧添加失败: {err}"))?;
    if pushed && expiry_s > 0 {
        self.key_set_seconds(key, expiry_s)
    } else {
        Ok(pushed)
    }
}
/// Pushes `value` (rendered with `to_string`) onto the tail of the list at `key`.
///
/// When the push reply is truthy and `expiry_s` is positive, the key's expiry is
/// set to `expiry_s` seconds and that call's result is returned instead.
///
/// # Errors
/// Returns a descriptive string if the connection or either command fails.
pub fn list_rpush(
    &mut self,
    key: &str,
    value: JsonValue,
    expiry_s: i64,
) -> Result<bool, String> {
    let reply: RedisResult<bool> = Commands::rpush(&mut self.con()?, key, value.to_string());
    let pushed = reply.map_err(|err| format!("列表右侧添加失败: {err}"))?;
    if pushed && expiry_s > 0 {
        self.key_set_seconds(key, expiry_s)
    } else {
        Ok(pushed)
    }
}
pub fn list_lpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
let strings = if count <= 1 {
let data: RedisResult<Option<String>> =
Commands::lpop(&mut self.con()?, key, None::<NonZeroUsize>);
match data {
Ok(Some(value)) => vec![value],
Ok(None) => vec![],
Err(e) => return Err(format!("列表左侧弹出失败: {e}")),
}
} else {
let non_zero = NonZeroUsize::new(count).unwrap();
let data: RedisResult<Vec<String>> =
Commands::lpop(&mut self.con()?, key, Some(non_zero));
match data {
Ok(values) => values,
Err(e) => return Err(format!("列表左侧弹出多个失败: {e}")),
}
};
strings
.into_iter()
.map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}")))
.collect()
}
pub fn list_rpop(&mut self, key: &str, count: usize) -> Result<Vec<JsonValue>, String> {
let strings = if count <= 1 {
let data: RedisResult<Option<String>> =
Commands::rpop(&mut self.con()?, key, None::<NonZeroUsize>);
match data {
Ok(Some(value)) => vec![value],
Ok(None) => vec![],
Err(e) => return Err(format!("列表右侧弹出失败: {e}")),
}
} else {
let non_zero = NonZeroUsize::new(count).unwrap();
let data: RedisResult<Vec<String>> =
Commands::rpop(&mut self.con()?, key, Some(non_zero));
match data {
Ok(values) => values,
Err(e) => return Err(format!("列表右侧弹出多个失败: {e}")),
}
};
strings
.into_iter()
.map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}")))
.collect()
}
/// Returns the length of the list at `key` (`LLEN`).
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn list_len(&mut self, key: &str) -> Result<usize, String> {
    let length: RedisResult<usize> = Commands::llen(&mut self.con()?, key);
    length.map_err(|err| format!("获取列表长度失败: {err}"))
}
pub fn list_range(
&mut self,
key: &str,
start: isize,
stop: isize,
) -> Result<Vec<JsonValue>, String> {
let data: RedisResult<Vec<String>> = Commands::lrange(&mut self.con()?, key, start, stop);
match data {
Ok(e) => {
e.into_iter()
.map(|s| json::parse(&s).map_err(|e| format!("解析JSON失败: {e}")))
.collect()
}
Err(e) => Err(format!("获取列表范围失败: {e}")),
}
}
/// Returns every element of the list at `key` by requesting the range `0..=-1`
/// from `list_range`.
///
/// # Errors
/// Propagates any error reported by `list_range`.
pub fn list_all(&mut self, key: &str) -> Result<Vec<JsonValue>, String> {
    let (first, last) = (0, -1);
    self.list_range(key, first, last)
}
/// Returns the element at `index` of the list at `key` (`LINDEX`), parsed as JSON.
///
/// A missing element yields `JsonValue::Null`. An element that is not valid JSON
/// is returned as a JSON string instead of an error, because the push helpers
/// write string values raw; this matches the fallback used by `get`.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn list_get(&mut self, key: &str, index: isize) -> Result<JsonValue, String> {
    let data: RedisResult<Option<String>> = Commands::lindex(&mut self.con()?, key, index);
    match data {
        Ok(Some(s)) => Ok(json::parse(&s).unwrap_or_else(|_| JsonValue::from(s))),
        Ok(None) => Ok(JsonValue::Null),
        Err(e) => Err(format!("获取列表元素失败: {e}")),
    }
}
/// Trims the list at `key` to the range `start..=stop` (`LTRIM`) and returns
/// `true` when the command succeeds.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn list_trim(&mut self, key: &str, start: isize, stop: isize) -> Result<bool, String> {
    let reply: RedisResult<()> = Commands::ltrim(&mut self.con()?, key, start, stop);
    reply
        .map(|()| true)
        .map_err(|err| format!("修剪列表失败: {err}"))
}
/// Overwrites the element at `index` of the list at `key` with `value`
/// (rendered with `to_string`) and returns `true` when the command succeeds.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn list_set(&mut self, key: &str, index: isize, value: JsonValue) -> Result<bool, String> {
    let payload = value.to_string();
    let reply: RedisResult<()> = Commands::lset(&mut self.con()?, key, index, payload);
    reply
        .map(|()| true)
        .map_err(|err| format!("设置列表元素失败: {err}"))
}
/// Calls `LREM` on the list at `key` with the given `count` and `value`
/// (rendered with `to_string`), returning the integer reply.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails.
pub fn list_remove(
    &mut self,
    key: &str,
    value: JsonValue,
    count: isize,
) -> Result<isize, String> {
    let removed: RedisResult<isize> =
        Commands::lrem(&mut self.con()?, key, count, value.to_string());
    removed.map_err(|err| format!("删除列表元素失败: {err}"))
}
/// Returns the whole hash at `key` as a JSON object of field → string value.
///
/// The `HGETALL` reply is read as a flat list of alternating field names and
/// values. Pairs are consumed two at a time; a trailing unpaired item is ignored
/// rather than indexed out of bounds (which previously would have panicked).
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn hash_get(&mut self, key: &str) -> Result<JsonValue, String> {
    let data: RedisResult<Vec<String>> = Commands::hgetall(&mut self.con()?, key);
    let flat = data.map_err(|e| format!("{e}"))?;
    let mut fields = object! {};
    let mut items = flat.into_iter();
    while let (Some(field), Some(value)) = (items.next(), items.next()) {
        fields[field] = value.into();
    }
    Ok(fields)
}
/// Stores `value` (rendered with `to_string`) in `field` of the hash at `key`,
/// overwriting any existing value, and returns `true` on success.
///
/// A single `HSET` is used. The previous HEXISTS → HDEL → HSET / HSETNX sequence
/// ignored a failing existence check, briefly removed the field, and could
/// return `Ok(false)` without storing anything when another client touched the
/// field between the steps.
///
/// # Errors
/// Logs and returns a descriptive string if the connection or the command fails.
pub fn hash_add(&mut self, key: &str, field: &str, value: JsonValue) -> Result<bool, String> {
    // The reply only says whether the field was newly created; either way the
    // value is now stored, so success is reported as `true`.
    let data: RedisResult<bool> =
        Commands::hset(&mut self.con()?, key, field, value.to_string());
    match data {
        Ok(_) => Ok(true),
        Err(e) => {
            error!("{e}");
            Err(format!("设置哈希类型缓存失败: {e}"))
        }
    }
}
/// Reads `field` of the hash at `key` and parses it as JSON.
///
/// A value that is not valid JSON is returned as a JSON string rather than
/// `null`, because `hash_add` writes string values raw.
///
/// # Errors
/// Logs and returns a descriptive string if the connection fails or the reply
/// cannot be read as a string. The message now describes a read failure (it
/// previously said "set" failed).
pub fn hash_get_field_value(&mut self, key: &str, field: &str) -> Result<JsonValue, String> {
    let data: RedisResult<String> = Commands::hget(&mut self.con()?, key, field);
    match data {
        Ok(text) => Ok(json::parse(&text).unwrap_or_else(|_| JsonValue::from(text))),
        Err(e) => {
            error!("{e}");
            Err(format!("获取哈希类型缓存失败: {e}"))
        }
    }
}
pub fn hash_get_fields(&mut self, key: &str) -> Result<JsonValue, String> {
let data: RedisResult<Vec<String>> = Commands::hkeys(&mut self.con()?, key);
match data {
Ok(e) => Ok(e.into()),
Err(e) => {
error!("{e}");
Err(format!("获取哈希类型缓存失败: {e}"))
}
}
}
/// Deletes `field` from the hash at `key` (`HDEL`) and returns the reply as a
/// boolean.
///
/// # Errors
/// Logs and returns a descriptive string if the connection or the command fails.
/// The message now describes a delete failure (it previously said "set" failed).
pub fn hash_delete(&mut self, key: &str, field: &str) -> Result<bool, String> {
    let data: RedisResult<bool> = Commands::hdel(&mut self.con()?, key, field);
    data.map_err(|e| {
        error!("{e}");
        format!("删除哈希类型缓存失败: {e}")
    })
}
pub fn hash_get_values(&mut self, key: &str) -> Result<JsonValue, String> {
let data: RedisResult<Vec<String>> = Commands::hvals(&mut self.con()?, key);
match data {
Ok(e) => {
let mut list = array![];
for item in e.iter() {
list.push(json::parse(item).unwrap_or(object! {})).unwrap();
}
Ok(list)
}
Err(e) => {
error!("{e}");
Err(format!("获取哈希类型缓存失败: {e}"))
}
}
}
/// Adds `value` (rendered with `to_string`) as a member of the geo index at
/// `key`, located at the given `longitude` and `latitude` (`GEOADD`).
///
/// # Errors
/// Logs and returns a descriptive string if the connection or the command fails.
/// The message now names the geo operation (it previously said the hash set
/// failed).
pub fn geo_add(
    &mut self,
    key: &str,
    longitude: f64,
    latitude: f64,
    value: JsonValue,
) -> Result<bool, String> {
    let member = (
        longitude.to_string(),
        latitude.to_string(),
        value.to_string(),
    );
    let data: RedisResult<bool> = Commands::geo_add(&mut self.con()?, key, &[member]);
    data.map_err(|e| {
        error!("{e}");
        format!("设置地理位置缓存失败: {e}")
    })
}
/// Looks up the stored position of member `value` in the geo index at `key`
/// (`GEOPOS`).
///
/// Returns an object with `latitude`, `longitude` and `name` when the member is
/// found, and an empty object otherwise.
///
/// # Errors
/// Logs and returns a descriptive string if the connection or the command fails.
/// The message now names the geo lookup (it previously said the hash set failed).
pub fn geo_get(&mut self, key: &str, value: JsonValue) -> Result<JsonValue, String> {
    let data: RedisResult<Vec<Option<geo::Coord<f64>>>> =
        Commands::geo_pos(&mut self.con()?, key, &[value.to_string()]);
    match data {
        Ok(positions) => {
            let mut found = object! {};
            // Unknown members come back as `None` and are skipped.
            for coord in positions.into_iter().flatten() {
                found["latitude"] = coord.latitude.into();
                found["longitude"] = coord.longitude.into();
                found["name"] = value.clone();
            }
            Ok(found)
        }
        Err(e) => {
            error!("{e}");
            Err(format!("获取地理位置缓存失败: {e}"))
        }
    }
}
/// Returns the distance in meters between members `value1` and `value2` of the
/// geo index at `key` (`GEODIST`), or `JsonValue::Null` when the server reports
/// no distance.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn geo_dist(
    &mut self,
    key: &str,
    value1: JsonValue,
    value2: JsonValue,
) -> Result<JsonValue, String> {
    let distance: RedisResult<Option<f64>> = Commands::geo_dist(
        &mut self.con()?,
        key,
        value1.to_string(),
        value2.to_string(),
        redis::geo::Unit::Meters,
    );
    distance
        .map(|meters| meters.map_or(JsonValue::Null, JsonValue::from))
        .map_err(|err| err.to_string())
}
pub fn geo_radius(
&mut self,
key: &str,
value: JsonValue,
radius: &str,
) -> Result<JsonValue, String> {
let member = value.to_string();
let radius_val: f64 = radius.parse().map_err(|_| "无效的半径值")?;
let data: RedisResult<Vec<String>> = Commands::geo_radius_by_member(
&mut self.con()?,
key,
member,
radius_val,
redis::geo::Unit::Meters,
redis::geo::RadiusOptions::default(),
);
match data {
Ok(members) => {
let arr: Vec<JsonValue> = members.into_iter().map(|s| s.into()).collect();
Ok(arr.into())
}
Err(e) => Err(format!("{e}")),
}
}
/// Appends an entry with a single `field` → `value` pair (value rendered with
/// `to_string`) to the stream at `key` using the given `msg_id` (`XADD`), and
/// returns the id reported by the server.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails, or
/// an error if the server replies without an id (previously this panicked).
pub fn stream_add(
    &mut self,
    key: &str,
    msg_id: &str,
    field: &str,
    value: JsonValue,
) -> Result<String, String> {
    let data: RedisResult<Option<String>> =
        Commands::xadd(&mut self.con()?, key, msg_id, &[(field, value.to_string())]);
    match data {
        Ok(Some(id)) => Ok(id),
        Ok(None) => Err(format!("XADD on stream {key} returned no entry id")),
        Err(e) => Err(format!("{e}")),
    }
}
/// Returns the number of entries in the stream at `key` (`XLEN`).
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_count(&mut self, key: &str) -> Result<usize, String> {
    let length: RedisResult<usize> = Commands::xlen(&mut self.con()?, key);
    length.map_err(|err| err.to_string())
}
pub fn stream_get(&mut self, key: &str) -> Result<JsonValue, String> {
let data: RedisResult<StreamRangeReply> = Commands::xrange(&mut self.con()?, key, "-", "+");
match data {
Ok(e) => {
let mut list = array![];
for id in e.ids.iter() {
for (_, value) in id.map.clone() {
let t = String::from_redis_value(value).unwrap();
let ids = object! {
id:id.id.clone(),
value:json::parse(t.as_str()).unwrap_or(t.into())
};
let _ = list.push(ids);
}
}
Ok(list)
}
Err(e) => Err(format!("{e}")),
}
}
/// Deletes the entry `id` from the stream at `key` (`XDEL`) and returns the
/// reply as a boolean.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_del(&mut self, key: &str, id: &str) -> Result<bool, String> {
    let removed: RedisResult<bool> = Commands::xdel(&mut self.con()?, key, &[id]);
    removed.map_err(|err| err.to_string())
}
/// Creates consumer group `group` on the stream at `key`, starting at `$`
/// (`XGROUP CREATE`).
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_group_create(&mut self, key: &str, group: &str) -> Result<bool, String> {
    let created: RedisResult<bool> = Commands::xgroup_create(&mut self.con()?, key, group, "$");
    created.map_err(|err| err.to_string())
}
/// Registers consumer `user` in group `group` of the stream at `key` and returns
/// `true` when the command succeeds.
///
/// `XGROUP CREATECONSUMER` is used so that no stream entries are touched. The
/// previous implementation created the consumer as a side effect of
/// `XREADGROUP ... COUNT 0 ... >`; a count of 0 imposes no limit, so every
/// undelivered entry was handed to the new consumer and then thrown away.
///
/// NOTE(review): `XGROUP CREATECONSUMER` requires Redis 6.2 or newer — confirm
/// the deployed server version.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_group_add_user(
    &mut self,
    key: &str,
    group: &str,
    user: &str,
) -> Result<bool, String> {
    // The integer reply says whether the consumer was newly created; an already
    // existing consumer is still reported as success, as before.
    let data: RedisResult<i64> = redis::cmd("XGROUP")
        .arg("CREATECONSUMER")
        .arg(key)
        .arg(group)
        .arg(user)
        .query(&mut self.con()?);
    match data {
        Ok(_) => Ok(true),
        Err(e) => Err(format!("{e}")),
    }
}
/// Removes consumer `user` from group `group` of the stream at `key`
/// (`XGROUP DELCONSUMER`) and returns `true` when the command succeeds.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_group_del_user(
    &mut self,
    key: &str,
    group: &str,
    user: &str,
) -> Result<bool, String> {
    let reply: RedisResult<usize> = redis::cmd("XGROUP")
        .arg("DELCONSUMER")
        .arg(key)
        .arg(group)
        .arg(user)
        .query(&mut self.con()?);
    reply.map(|_| true).map_err(|err| err.to_string())
}
/// Destroys consumer group `group` of the stream at `key` (`XGROUP DESTROY`) and
/// returns the reply as a boolean.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_group_del(&mut self, key: &str, group: &str) -> Result<bool, String> {
    let destroyed: RedisResult<bool> = Commands::xgroup_destroy(&mut self.con()?, key, group);
    destroyed.map_err(|err| err.to_string())
}
pub fn stream_group_msg(
&mut self,
key: &str,
group: &str,
user: &str,
) -> Result<JsonValue, String> {
let data: RedisResult<StreamReadReply> = redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(group)
.arg(user)
.arg("COUNT")
.arg(2) .arg("STREAMS")
.arg(key)
.arg(">")
.query(&mut self.con()?);
match data {
Ok(e) => {
let mut list = array![];
for item in e.keys.iter().cloned() {
for id in item.ids.iter() {
for (_key, value) in id.map.clone() {
let t = String::from_redis_value(value).unwrap();
let ids = object! {
key:item.key.clone(),
id:id.id.clone(),
value:json::parse(t.as_str()).unwrap_or(t.into())
};
let _ = list.push(ids);
}
}
}
Ok(list)
}
Err(e) => Err(format!("{e}")),
}
}
/// Reports whether a consumer group named `group` exists on the stream at `key`,
/// based on the `XINFO GROUPS` reply.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_get_group(&mut self, key: &str, group: &str) -> Result<bool, String> {
    let reply: RedisResult<StreamInfoGroupsReply> =
        Commands::xinfo_groups(&mut self.con()?, key);
    reply
        .map(|info| info.groups.iter().any(|entry| entry.name == group))
        .map_err(|err| err.to_string())
}
/// Returns selected fields of the `XINFO STREAM` reply for the stream at `key`
/// as a JSON object: `length`, `groups`, `last_generated_id`, the ids of the
/// first and last entries, and `radix_tree_keys`.
///
/// # Errors
/// Returns the underlying error text if the connection or the command fails.
pub fn stream_get_stream(&mut self, key: &str) -> Result<JsonValue, String> {
    let reply: RedisResult<StreamInfoStreamReply> =
        Commands::xinfo_stream(&mut self.con()?, key);
    let e = reply.map_err(|err| err.to_string())?;
    Ok(object! {
        length:e.length,
        groups:e.groups,
        last_generated_id:e.last_generated_id.clone(),
        first_entry:e.first_entry.id.clone(),
        last_entry:e.last_entry.id.clone(),
        radix_tree_keys:e.radix_tree_keys,
    })
}
/// Subscribes to channel `key` on a dedicated connection and forwards every
/// non-empty message to `tx`, blocking the calling thread.
///
/// Payloads are parsed as JSON; payloads that are not valid JSON are forwarded
/// as an empty object. Read timeouts (30 seconds) are ignored and the loop keeps
/// waiting. The function returns `Ok(())` once the receiving side of `tx` has
/// been dropped.
///
/// # Errors
/// Returns a descriptive string if the connection, the timeout setup or the
/// subscription fails, or if reading a message fails for a reason other than a
/// timeout.
pub fn subscribe(&mut self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
    let read_timeout = std::time::Duration::from_secs(30);
    let mut con = self
        .pubsub_client
        .get_connection()
        .map_err(|e| format!("PubSub connect error: {e}"))?;
    con.set_read_timeout(Some(read_timeout))
        .map_err(|e| format!("Set read timeout error: {e}"))?;
    let mut pubsub = con.as_pubsub();
    pubsub
        .subscribe(&[key])
        .map_err(|e| format!("Subscribe error: {e}"))?;
    log::info!("Redis 订阅成功: {}", key);

    loop {
        let msg = match pubsub.get_message() {
            Ok(msg) => msg,
            // A timeout only means nothing arrived yet; keep listening.
            Err(e) if e.is_timeout() => continue,
            Err(e) => {
                log::error!("Redis 订阅消息获取失败: {} - {}", key, e);
                return Err(format!("Get message error: {e}"));
            }
        };
        let payload: String = msg.get_payload().unwrap_or_default();
        if payload.is_empty() {
            continue;
        }
        let json_value = json::parse(&payload).unwrap_or_else(|_| JsonValue::new_object());
        if tx.send(json_value).is_err() {
            log::warn!("Redis 订阅通道已关闭: {}", key);
            return Ok(());
        }
    }
}
/// Spawns a background thread that subscribes to channel `key` and forwards
/// every non-empty message to `tx`, reconnecting whenever the subscription
/// breaks. Returns `Ok(())` immediately after the thread has been spawned.
///
/// Connection, timeout-setup and subscribe failures are logged and retried after
/// 5 seconds; a broken subscription is retried after 2 seconds. Payloads that are
/// not valid JSON are forwarded as an empty object. The thread exits once the
/// receiving side of `tx` has been dropped and a message fails to send.
pub fn subscribe_with_reconnect(&self, key: &str, tx: Sender<JsonValue>) -> Result<(), String> {
    let pubsub_client = self.pubsub_client.clone();
    let key = key.to_string();
    std::thread::spawn(move || {
        let setup_retry = std::time::Duration::from_secs(5);
        let reconnect_delay = std::time::Duration::from_secs(2);
        let read_timeout = std::time::Duration::from_secs(30);
        loop {
            let mut con = match pubsub_client.get_connection() {
                Ok(c) => c,
                Err(e) => {
                    log::error!("Redis PubSub 连接失败: {} - {}", key, e);
                    std::thread::sleep(setup_retry);
                    continue;
                }
            };
            if let Err(e) = con.set_read_timeout(Some(read_timeout)) {
                log::error!("设置超时失败: {} - {}", key, e);
                std::thread::sleep(setup_retry);
                continue;
            }
            let mut pubsub = con.as_pubsub();
            if let Err(e) = pubsub.subscribe(&[&key]) {
                log::error!("Redis 订阅失败: {} - {}", key, e);
                std::thread::sleep(setup_retry);
                continue;
            }
            log::info!("Redis 订阅成功: {}", key);

            loop {
                let msg = match pubsub.get_message() {
                    Ok(msg) => msg,
                    // A timeout only means nothing arrived yet; keep listening.
                    Err(e) if e.is_timeout() => continue,
                    Err(e) => {
                        log::error!("Redis 订阅断开,准备重连: {} - {}", key, e);
                        break;
                    }
                };
                let payload: String = msg.get_payload().unwrap_or_default();
                if payload.is_empty() {
                    continue;
                }
                let json_value =
                    json::parse(&payload).unwrap_or_else(|_| JsonValue::new_object());
                if tx.send(json_value).is_err() {
                    log::warn!("Redis 订阅通道已关闭: {}", key);
                    return;
                }
            }
            std::thread::sleep(reconnect_delay);
        }
    });
    Ok(())
}
/// Publishes `value` (rendered with `to_string`) on channel `key` and returns
/// `true` when the command succeeds, regardless of how many subscribers
/// received it.
///
/// # Errors
/// Returns a descriptive string if the connection or the command fails. The
/// message now names the operation (the previous prefix was the placeholder
/// `???`).
pub fn publish(&mut self, key: &str, value: JsonValue) -> Result<bool, String> {
    let data: RedisResult<usize> = Commands::publish(&mut self.con()?, key, value.to_string());
    data.map(|_| true).map_err(|e| format!("发布消息失败: {e}"))
}
}