multi_tier_cache/backends/
redis_cache.rs1use crate::error::CacheResult;
2use crate::traits::{CacheBackend, L2CacheBackend};
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use redis::aio::ConnectionManager;
6use redis::{AsyncCommands, Client};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::Duration;
10use tracing::{debug, info};
11
12pub struct RedisCache {
14 conn_manager: ConnectionManager,
16 hits: Arc<AtomicU64>,
18 misses: Arc<AtomicU64>,
20 sets: Arc<AtomicU64>,
22}
23
24impl RedisCache {
25 pub async fn new() -> CacheResult<Self> {
31 let redis_url =
32 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33 Self::with_url(&redis_url).await
34 }
35
36 pub async fn with_url(redis_url: &str) -> CacheResult<Self> {
42 info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
43
44 let client = Client::open(redis_url).map_err(|e| {
45 crate::error::CacheError::ConfigError(format!("Failed to create Redis client: {e}"))
46 })?;
47
48 let conn_manager = ConnectionManager::new(client).await.map_err(|e| {
49 crate::error::CacheError::BackendError(format!(
50 "Failed to establish Redis connection manager: {e}"
51 ))
52 })?;
53
54 let mut conn = conn_manager.clone();
55 let _: String = redis::cmd("PING")
56 .query_async(&mut conn)
57 .await
58 .map_err(|e| {
59 crate::error::CacheError::BackendError(format!(
60 "Redis PING health check failed: {e}"
61 ))
62 })?;
63
64 info!(redis_url = %redis_url, "Redis Cache connected successfully");
65
66 Ok(Self {
67 conn_manager,
68 hits: Arc::new(AtomicU64::new(0)),
69 misses: Arc::new(AtomicU64::new(0)),
70 sets: Arc::new(AtomicU64::new(0)),
71 })
72 }
73
74 pub async fn scan_keys(&self, pattern: &str) -> CacheResult<Vec<String>> {
80 let mut conn = self.conn_manager.clone();
81 let mut keys = Vec::new();
82 let mut cursor: u64 = 0;
83
84 loop {
85 let result: (u64, Vec<String>) = redis::cmd("SCAN")
86 .arg(cursor)
87 .arg("MATCH")
88 .arg(pattern)
89 .arg("COUNT")
90 .arg(100)
91 .query_async(&mut conn)
92 .await?;
93
94 cursor = result.0;
95 keys.extend(result.1);
96
97 if cursor == 0 {
98 break;
99 }
100 }
101
102 debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
103 Ok(keys)
104 }
105
106 pub async fn remove_bulk(&self, keys: &[String]) -> CacheResult<usize> {
112 if keys.is_empty() {
113 return Ok(0);
114 }
115
116 let mut conn = self.conn_manager.clone();
117 let count: usize = conn.del(keys).await?;
118 debug!(count = count, "[Redis] Removed keys in bulk");
119 Ok(count)
120 }
121}
122
123impl CacheBackend for RedisCache {
127 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
128 Box::pin(async move {
129 let mut conn = self.conn_manager.clone();
130 let result: redis::RedisResult<Vec<u8>> = conn.get(key).await;
131 match result {
132 Ok(bytes) if !bytes.is_empty() => {
133 self.hits.fetch_add(1, Ordering::Relaxed);
134 Some(Bytes::from(bytes))
135 }
136 _ => {
137 self.misses.fetch_add(1, Ordering::Relaxed);
138 None
139 }
140 }
141 })
142 }
143
144 fn set_with_ttl<'a>(
145 &'a self,
146 key: &'a str,
147 value: Bytes,
148 ttl: Duration,
149 ) -> BoxFuture<'a, CacheResult<()>> {
150 Box::pin(async move {
151 let mut conn = self.conn_manager.clone();
152 let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
153 let result = conn.pset_ex(key, value.to_vec(), ttl_ms).await;
154 if result.is_ok() {
155 self.sets.fetch_add(1, Ordering::Relaxed);
156 debug!(key = %key, ttl_ms = %ttl.as_millis(), "[Redis] Cached key bytes with TTL");
157 }
158 result.map_err(|e| {
159 crate::error::CacheError::BackendError(format!("Redis set failed: {e}"))
160 })
161 })
162 }
163
164 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
165 Box::pin(async move {
166 let mut conn = self.conn_manager.clone();
167 let _: usize = conn.del(key).await?;
168 Ok(())
169 })
170 }
171
172 fn health_check(&self) -> BoxFuture<'_, bool> {
173 Box::pin(async move {
174 let mut conn = self.conn_manager.clone();
175 let result: redis::RedisResult<String> =
176 redis::cmd("PING").query_async(&mut conn).await;
177 result.is_ok()
178 })
179 }
180
181 fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
182 Box::pin(async move {
183 let keys = self.scan_keys(pattern).await?;
184 if !keys.is_empty() {
185 self.remove_bulk(&keys).await?;
186 }
187 Ok(())
188 })
189 }
190
191 fn name(&self) -> &'static str {
192 "Redis"
193 }
194}
195
196impl L2CacheBackend for RedisCache {
197 fn get_with_ttl<'a>(
198 &'a self,
199 key: &'a str,
200 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
201 Box::pin(async move {
202 let mut conn = self.conn_manager.clone();
203 let (bytes, ttl_secs): (Option<Vec<u8>>, i64) =
205 match redis::pipe().get(key).ttl(key).query_async(&mut conn).await {
206 Ok(res) => res,
207 Err(_) => return None,
208 };
209
210 match bytes {
211 Some(b) if !b.is_empty() => {
212 self.hits.fetch_add(1, Ordering::Relaxed);
213 let ttl = if ttl_secs > 0 {
214 Some(Duration::from_secs(ttl_secs.unsigned_abs()))
215 } else {
216 None
217 };
218 Some((Bytes::from(b), ttl))
219 }
220 _ => {
221 self.misses.fetch_add(1, Ordering::Relaxed);
222 None
223 }
224 }
225 })
226 }
227}