1use super::{Error, new_redis_config};
16use async_trait::async_trait;
17use deadpool_redis::{PoolConfig, Status, Timeouts};
18use redis::aio::ConnectionLike;
19use redis::{Arg, Cmd, Pipeline, RedisFuture, Value};
20use std::time::Duration;
21use tibba_config::Config;
22use tracing::info;
23
24type Result<T> = std::result::Result<T, Error>;
25
26#[derive(Debug, Default)]
27pub struct RedisCmdStat {
28 pub cmd: String,
29 pub elapsed: Duration,
30 pub error: Option<String>,
31}
32
33pub type RedisCmdStatCallback = dyn Fn(RedisCmdStat) + Send + Sync;
34
35enum RedisPool {
37 Single(deadpool_redis::Pool),
39 Cluster(deadpool_redis::cluster::Pool),
41}
42
43pub struct RedisClient {
44 pool: RedisPool,
45 stat_callback: Option<&'static RedisCmdStatCallback>,
46}
47pub struct RedisClientConn {
48 conn: Box<dyn ConnectionLike + Send + Sync>,
49 stat_callback: Option<&'static RedisCmdStatCallback>,
50}
51
52impl RedisClient {
53 #[inline]
58 pub async fn conn(&self) -> Result<RedisClientConn> {
59 let conn: Box<dyn ConnectionLike + Send + Sync> = match &self.pool {
60 RedisPool::Single(p) => Box::new(p.get().await.map_err(|e| Error::Common {
61 category: "connection".to_string(),
62 message: e.to_string(),
63 })?),
64 RedisPool::Cluster(p) => Box::new(p.get().await.map_err(|e| Error::Common {
65 category: "connection".to_string(),
66 message: e.to_string(),
67 })?),
68 };
69
70 Ok(RedisClientConn {
71 conn,
72 stat_callback: self.stat_callback,
73 })
74 }
75 pub fn with_stat_callback(&mut self, callback: &'static RedisCmdStatCallback) {
76 self.stat_callback = Some(callback);
77 }
78 pub fn status(&self) -> Status {
82 match &self.pool {
83 RedisPool::Single(p) => p.status(),
84 RedisPool::Cluster(p) => p.status(),
85 }
86 }
87 pub fn close(&self) {
91 match &self.pool {
92 RedisPool::Single(p) => p.close(),
93 RedisPool::Cluster(p) => p.close(),
94 }
95 }
96}
97
98#[inline]
99fn get_command_name(cmd: &Cmd) -> String {
100 if let Some(Arg::Simple(val)) = cmd.args_iter().next()
101 && let Ok(s) = std::str::from_utf8(val)
102 {
103 return s.to_string();
104 }
105 "unknown".to_string()
106}
107
108#[inline]
109fn wrap_with_stat<'a, 'cb, T>(
110 name: String,
111 fut: RedisFuture<'a, T>,
112 callback: &'cb RedisCmdStatCallback,
113) -> RedisFuture<'a, T>
114where
115 T: Send + 'a,
116 'cb: 'a,
117{
118 Box::pin(async move {
119 let start = std::time::Instant::now();
120 let res = fut.await;
121 let elapsed = start.elapsed();
122 let mut stat = RedisCmdStat {
123 cmd: name,
124 elapsed,
125 ..Default::default()
126 };
127 if let Err(e) = &res {
128 stat.error = Some(e.to_string());
129 }
130 callback(stat);
131 res
132 })
133}
134
135#[async_trait]
136impl ConnectionLike for RedisClientConn {
137 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
143 if let Some(cb) = self.stat_callback {
144 let name = get_command_name(cmd);
145 let fut = self.conn.req_packed_command(cmd);
146 wrap_with_stat(name, fut, cb)
147 } else {
148 self.conn.req_packed_command(cmd)
149 }
150 }
151
152 fn req_packed_commands<'a>(
160 &'a mut self,
161 cmd: &'a Pipeline,
162 offset: usize,
163 count: usize,
164 ) -> RedisFuture<'a, Vec<Value>> {
165 if let Some(cb) = self.stat_callback {
166 let fut = self.conn.req_packed_commands(cmd, offset, count);
167 wrap_with_stat("pipeline".to_string(), fut, cb)
168 } else {
169 self.conn.req_packed_commands(cmd, offset, count)
170 }
171 }
172
173 fn get_db(&self) -> i64 {
179 0
180 }
181}
182
183fn make_pool_config(redis_config: &super::RedisConfig) -> PoolConfig {
184 PoolConfig {
185 max_size: redis_config.pool_size as usize,
186 timeouts: Timeouts {
187 wait: Some(redis_config.wait_timeout),
188 create: Some(redis_config.connection_timeout),
189 recycle: Some(redis_config.recycle_timeout),
190 },
191 ..Default::default()
192 }
193}
194
195pub fn new_redis_client(config: &Config) -> Result<RedisClient> {
206 let redis_config = new_redis_config(config)?;
207 let pool_config = make_pool_config(&redis_config);
208
209 let password = redis_config.password.clone().unwrap_or_default();
210 let nodes: Vec<_> = redis_config
211 .nodes
212 .clone()
213 .iter()
214 .map(|v| {
215 if password.is_empty() {
216 return v.to_string();
217 }
218 v.replace(&password, "***")
219 })
220 .collect();
221
222 let pool = if redis_config.nodes.len() <= 1 {
223 let mgr = deadpool_redis::Manager::new(redis_config.nodes[0].as_str()).map_err(|e| {
225 Error::Redis {
226 category: "new_pool".to_string(),
227 source: e,
228 }
229 })?;
230 let pool = deadpool_redis::Pool::builder(mgr)
231 .config(pool_config)
232 .runtime(deadpool_redis::Runtime::Tokio1)
233 .build()
234 .map_err(|e| Error::SingleBuild { source: e })?;
235 RedisPool::Single(pool)
236 } else {
237 let mut cfg = deadpool_redis::cluster::Config::from_urls(redis_config.nodes.clone());
239 cfg.pool = Some(pool_config);
240 let pool = cfg
241 .create_pool(Some(deadpool_redis::cluster::Runtime::Tokio1))
242 .map_err(|e| Error::ClusterBuild { source: e })?;
243 RedisPool::Cluster(pool)
244 };
245 info!(
246 category = "redis",
247 nodes = nodes.join(","),
248 "connect to redis"
249 );
250 Ok(RedisClient {
251 pool,
252 stat_callback: None,
253 })
254}