1use log::{error, info};
5use once_cell::sync::OnceCell;
6use r2d2::{Error, Pool};
7use r2d2_redis::{r2d2, RedisConnectionManager, redis::Commands};
8use r2d2_redis::redis::RedisError;
9use redis::{Client, PubSubCommands};
10use crate::{Connection as CacheConnection};
11
12pub static REDIS_POOL: OnceCell<r2d2::Pool<RedisConnectionManager>> = OnceCell::new();
14
15pub fn init_redis_pool(connection: CacheConnection, max_size: u32, min_idle: u32) -> Result<bool, Box<dyn std::error::Error>> {
18 let redis_url = if connection.userpass.is_empty() {
19 format!("redis://{}:{}/", connection.hostname, connection.hostport)
20 } else {
21 format!("redis://:{}@{}:{}/", connection.userpass, connection.hostname, connection.hostport)
22 };
23
24 let manager = match RedisConnectionManager::new(redis_url) {
25 Ok(v) => v,
26 Err(err) => {
27 error!("Redis 连接失败: {}", err);
28 return Err(err.into())
29 }
30
31 };
32 let pool = match r2d2::Pool::builder()
33 .max_size(max_size) .min_idle(Some(min_idle)) .build(manager) {
36 Ok(v) => v,
37 Err(err) => {
38 error!("Redis 连接池创建失败: {}", err);
39 return Err(err.into())
40 }
41
42 };
43
44 REDIS_POOL.set(pool).map_err(|_returned_pool| {
45 error!("Redis 连接池设置失败(可能已经初始化过)");
46 Box::new(std::io::Error::new(
47 std::io::ErrorKind::Other,
48 "Redis连接池设置失败"
49 )) as Box<dyn std::error::Error>
50 })?;
51
52 info!("----- Redis 连接池已初始化 -----");
53 Ok(true)
54}
55
56pub fn publish_notify(channel: &str, payload: &str, ) -> Result<usize, Box<dyn std::error::Error>> {
59 let pool = REDIS_POOL.get().ok_or("Redis 连接池获取失败")?;
61 let mut conn = pool.get()?;
63 let receivers: usize = conn.publish(channel, payload)?;
65 Ok(receivers)
66}
67
68pub fn subscribe_notify<F>(connection: CacheConnection, channel: &str, mut handler: F) -> Result<(), Box<dyn std::error::Error>>
71where
72 F: FnMut(String) + Send + 'static,
73{
74 let redis_url = if connection.userpass.is_empty() {
75 format!("redis://{}:{}/", connection.hostname, connection.hostport)
76 } else {
77 format!("redis://:{}@{}:{}/", connection.userpass, connection.hostname, connection.hostport)
78 };
79
80 let client = Client::open(redis_url)?;
81
82 let channel = channel.to_string();
84
85 std::thread::spawn(move || {
86 let mut conn = match client.get_connection() {
88 Ok(conn) => conn,
89 Err(e) => {
90 eprintln!("连接失败: {}", e);
91 return;
92 }
93 };
94
95 let mut pub_sub = conn.as_pubsub();
96 if let Err(e) = pub_sub.subscribe(&channel) {
97 eprintln!("订阅失败: {}", e);
98 return;
99 }
100
101 loop {
102 match pub_sub.get_message() {
103 Ok(msg) => {
104 if let Ok(payload) = msg.get_payload::<String>() {
105 handler(payload);
106 }
107 }
108 Err(e) => {
109 eprintln!("订阅错误: {}", e);
110 break;
111 }
112 }
113 }
114 });
115
116 Ok(())
117}