br_cache/
redis_pool.rs

1//! 连接池 + 发布/订阅(v2)
2//! 不影响现有 redis.rs 的逻辑
3
4use log::{error, info};
5use once_cell::sync::OnceCell;
6use r2d2::{Error, Pool};
7use r2d2_redis::{r2d2, RedisConnectionManager, redis::Commands};
8use r2d2_redis::redis::RedisError;
9use redis::{Client, PubSubCommands};
10use crate::{Connection as CacheConnection};
11
12/// 全局 Redis 连接池
13pub static REDIS_POOL: OnceCell<r2d2::Pool<RedisConnectionManager>> = OnceCell::new();
14
15/// 初始化 Redis 连接池
16/// let redis_url = "redis://127.0.0.1/";
17pub fn init_redis_pool(connection: CacheConnection, max_size: u32, min_idle: u32) -> Result<bool, Box<dyn std::error::Error>> {
18    let redis_url = if connection.userpass.is_empty() {
19        format!("redis://{}:{}/", connection.hostname, connection.hostport)
20    } else {
21        format!("redis://:{}@{}:{}/", connection.userpass, connection.hostname, connection.hostport)
22    };
23
24    let manager = match RedisConnectionManager::new(redis_url) {
25        Ok(v) => v,
26        Err(err) => {
27            error!("Redis 连接失败: {}", err);
28            return Err(err.into())
29        }
30
31    };
32    let pool = match r2d2::Pool::builder()
33        .max_size(max_size)       // 最大连接数
34        .min_idle(Some(min_idle))  // 预热连接
35        .build(manager) {
36        Ok(v) => v,
37        Err(err) => {
38            error!("Redis 连接池创建失败: {}", err);
39            return Err(err.into())
40        }
41
42    };
43
44    REDIS_POOL.set(pool).map_err(|_returned_pool| {
45        error!("Redis 连接池设置失败(可能已经初始化过)");
46        Box::new(std::io::Error::new(
47            std::io::ErrorKind::Other,
48            "Redis连接池设置失败"
49        )) as Box<dyn std::error::Error>
50    })?;
51
52    info!("----- Redis 连接池已初始化 -----");
53    Ok(true)
54}
55
56/// 用连接池发布消息
57///
58pub fn publish_notify(channel: &str, payload: &str, ) -> Result<usize, Box<dyn std::error::Error>> {
59    // 先拿到全局 Pool
60    let pool = REDIS_POOL.get().ok_or("Redis 连接池获取失败")?;
61    // 再从池里获取连接
62    let mut conn = pool.get()?;
63    // 调用 publish
64    let receivers: usize = conn.publish(channel, payload)?;
65    Ok(receivers)
66}
67
68/// 使用独立连接订阅消息 - 不走连接池(没必要,在调用订阅处用一个长连接保持就行)
69///
70pub fn subscribe_notify<F>(connection: CacheConnection, channel: &str, mut handler: F) -> Result<(), Box<dyn std::error::Error>>
71where
72    F: FnMut(String) + Send + 'static,
73{
74    let redis_url = if connection.userpass.is_empty() {
75        format!("redis://{}:{}/", connection.hostname, connection.hostport)
76    } else {
77        format!("redis://:{}@{}:{}/", connection.userpass, connection.hostname, connection.hostport)
78    };
79    
80    let client = Client::open(redis_url)?;
81
82    // 在线程内部建立连接,确保连接的生命周期与线程一致
83    let channel = channel.to_string();
84
85    std::thread::spawn(move || {
86        // 在线程内部创建连接
87        let mut conn = match client.get_connection() {
88            Ok(conn) => conn,
89            Err(e) => {
90                eprintln!("连接失败: {}", e);
91                return;
92            }
93        };
94
95        let mut pub_sub = conn.as_pubsub();
96        if let Err(e) = pub_sub.subscribe(&channel) {
97            eprintln!("订阅失败: {}", e);
98            return;
99        }
100
101        loop {
102            match pub_sub.get_message() {
103                Ok(msg) => {
104                    if let Ok(payload) = msg.get_payload::<String>() {
105                        handler(payload);
106                    }
107                }
108                Err(e) => {
109                    eprintln!("订阅错误: {}", e);
110                    break;
111                }
112            }
113        }
114    });
115
116    Ok(())
117}