tibba_cache/
pool.rs

1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, new_redis_config};
16use async_trait::async_trait;
17use deadpool_redis::{PoolConfig, Status, Timeouts};
18use redis::aio::ConnectionLike;
19use redis::{Arg, Cmd, Pipeline, RedisFuture, Value};
20use std::time::Duration;
21use tibba_config::Config;
22use tracing::info;
23
24type Result<T> = std::result::Result<T, Error>;
25
26#[derive(Debug, Default)]
27pub struct RedisCmdStat {
28    pub cmd: String,
29    pub elapsed: Duration,
30    pub error: Option<String>,
31}
32
33pub type RedisCmdStatCallback = dyn Fn(RedisCmdStat) + Send + Sync;
34
35/// Redis connection pool enum that supports both single node and cluster configurations
36enum RedisPool {
37    /// Single Redis node connection pool
38    Single(deadpool_redis::Pool),
39    /// Redis cluster connection pool
40    Cluster(deadpool_redis::cluster::Pool),
41}
42
43pub struct RedisClient {
44    pool: RedisPool,
45    stat_callback: Option<&'static RedisCmdStatCallback>,
46}
47pub struct RedisClientConn {
48    conn: Box<dyn ConnectionLike + Send + Sync>,
49    stat_callback: Option<&'static RedisCmdStatCallback>,
50}
51
52impl RedisClient {
53    /// Gets a connection from the pool
54    /// # Returns
55    /// * `Ok(RedisConnection)` - A connection wrapper that works with both single and cluster modes
56    /// * `Err(Error)` - Failed to get connection from pool
57    #[inline]
58    pub async fn conn(&self) -> Result<RedisClientConn> {
59        let conn: Box<dyn ConnectionLike + Send + Sync> = match &self.pool {
60            RedisPool::Single(p) => Box::new(p.get().await.map_err(|e| Error::Common {
61                category: "connection".to_string(),
62                message: e.to_string(),
63            })?),
64            RedisPool::Cluster(p) => Box::new(p.get().await.map_err(|e| Error::Common {
65                category: "connection".to_string(),
66                message: e.to_string(),
67            })?),
68        };
69
70        Ok(RedisClientConn {
71            conn,
72            stat_callback: self.stat_callback,
73        })
74    }
75    pub fn with_stat_callback(&mut self, callback: &'static RedisCmdStatCallback) {
76        self.stat_callback = Some(callback);
77    }
78    /// Gets the status of the pool
79    /// # Returns
80    /// * `Status` - The status of the pool
81    pub fn status(&self) -> Status {
82        match &self.pool {
83            RedisPool::Single(p) => p.status(),
84            RedisPool::Cluster(p) => p.status(),
85        }
86    }
87    /// Closes the pool
88    /// # Notes
89    /// * This operation resizes the pool to 0
90    pub fn close(&self) {
91        match &self.pool {
92            RedisPool::Single(p) => p.close(),
93            RedisPool::Cluster(p) => p.close(),
94        }
95    }
96}
97
98#[inline]
99fn get_command_name(cmd: &Cmd) -> String {
100    if let Some(Arg::Simple(val)) = cmd.args_iter().next()
101        && let Ok(s) = std::str::from_utf8(val)
102    {
103        return s.to_string();
104    }
105    "unknown".to_string()
106}
107
108#[inline]
109fn wrap_with_stat<'a, 'cb, T>(
110    name: String,
111    fut: RedisFuture<'a, T>,
112    callback: &'cb RedisCmdStatCallback,
113) -> RedisFuture<'a, T>
114where
115    T: Send + 'a,
116    'cb: 'a,
117{
118    Box::pin(async move {
119        let start = std::time::Instant::now();
120        let res = fut.await;
121        let elapsed = start.elapsed();
122        let mut stat = RedisCmdStat {
123            cmd: name,
124            elapsed,
125            ..Default::default()
126        };
127        if let Err(e) = &res {
128            stat.error = Some(e.to_string());
129        }
130        callback(stat);
131        res
132    })
133}
134
135#[async_trait]
136impl ConnectionLike for RedisClientConn {
137    /// Executes a packed Redis command
138    /// # Arguments
139    /// * `cmd` - The Redis command to execute
140    /// # Returns
141    /// * `RedisFuture<Value>` - Future that resolves to the command result
142    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
143        if let Some(cb) = self.stat_callback {
144            let name = get_command_name(cmd);
145            let fut = self.conn.req_packed_command(cmd);
146            wrap_with_stat(name, fut, cb)
147        } else {
148            self.conn.req_packed_command(cmd)
149        }
150    }
151
152    /// Executes multiple packed Redis commands in a pipeline
153    /// # Arguments
154    /// * `cmd` - The pipeline of Redis commands
155    /// * `offset` - Starting offset in the pipeline
156    /// * `count` - Number of commands to execute
157    /// # Returns
158    /// * `RedisFuture<Vec<Value>>` - Future that resolves to multiple command results
159    fn req_packed_commands<'a>(
160        &'a mut self,
161        cmd: &'a Pipeline,
162        offset: usize,
163        count: usize,
164    ) -> RedisFuture<'a, Vec<Value>> {
165        if let Some(cb) = self.stat_callback {
166            let fut = self.conn.req_packed_commands(cmd, offset, count);
167            wrap_with_stat("pipeline".to_string(), fut, cb)
168        } else {
169            self.conn.req_packed_commands(cmd, offset, count)
170        }
171    }
172
173    /// Gets the current Redis database number
174    /// # Notes
175    /// * Always returns 0 as database selection is not supported in cluster mode
176    /// # Returns
177    /// * `i64` - The database number (always 0)
178    fn get_db(&self) -> i64 {
179        0
180    }
181}
182
183fn make_pool_config(redis_config: &super::RedisConfig) -> PoolConfig {
184    PoolConfig {
185        max_size: redis_config.pool_size as usize,
186        timeouts: Timeouts {
187            wait: Some(redis_config.wait_timeout),
188            create: Some(redis_config.connection_timeout),
189            recycle: Some(redis_config.recycle_timeout),
190        },
191        ..Default::default()
192    }
193}
194
195/// Creates a new Redis connection pool based on configuration
196/// # Arguments
197/// * `config` - Redis configuration including connection details and pool settings
198/// # Returns
199/// * `Ok(RedisClient)` - Successfully created pool (single or cluster)
200/// * `Err(Error)` - Failed to create pool
201/// # Notes
202/// * Creates a single node pool if only one node is configured
203/// * Creates a cluster pool if multiple nodes are configured
204/// * Configures pool size and various timeouts from the provided config
205pub fn new_redis_client(config: &Config) -> Result<RedisClient> {
206    let redis_config = new_redis_config(config)?;
207    let pool_config = make_pool_config(&redis_config);
208
209    let password = redis_config.password.clone().unwrap_or_default();
210    let nodes: Vec<_> = redis_config
211        .nodes
212        .clone()
213        .iter()
214        .map(|v| {
215            if password.is_empty() {
216                return v.to_string();
217            }
218            v.replace(&password, "***")
219        })
220        .collect();
221
222    let pool = if redis_config.nodes.len() <= 1 {
223        // Single node configuration
224        let mgr = deadpool_redis::Manager::new(redis_config.nodes[0].as_str()).map_err(|e| {
225            Error::Redis {
226                category: "new_pool".to_string(),
227                source: e,
228            }
229        })?;
230        let pool = deadpool_redis::Pool::builder(mgr)
231            .config(pool_config)
232            .runtime(deadpool_redis::Runtime::Tokio1)
233            .build()
234            .map_err(|e| Error::SingleBuild { source: e })?;
235        RedisPool::Single(pool)
236    } else {
237        // Cluster configuration
238        let mut cfg = deadpool_redis::cluster::Config::from_urls(redis_config.nodes.clone());
239        cfg.pool = Some(pool_config);
240        let pool = cfg
241            .create_pool(Some(deadpool_redis::cluster::Runtime::Tokio1))
242            .map_err(|e| Error::ClusterBuild { source: e })?;
243        RedisPool::Cluster(pool)
244    };
245    info!(
246        category = "redis",
247        nodes = nodes.join(","),
248        "connect to redis"
249    );
250    Ok(RedisClient {
251        pool,
252        stat_callback: None,
253    })
254}