multi_tier_cache/backends/
redis_cache.rs1use anyhow::{Context, Result};
6use redis::aio::ConnectionManager;
7use redis::{AsyncCommands, Client};
8use serde_json;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tracing::{debug, info};
13
14pub struct RedisCache {
23 conn_manager: ConnectionManager,
25 hits: Arc<AtomicU64>,
27 misses: Arc<AtomicU64>,
29 sets: Arc<AtomicU64>,
31}
32
33impl RedisCache {
34 pub async fn new() -> Result<Self> {
39 let redis_url =
40 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
41 Self::with_url(&redis_url).await
42 }
43
44 pub async fn with_url(redis_url: &str) -> Result<Self> {
53 info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
54
55 let client = Client::open(redis_url)
56 .with_context(|| format!("Failed to create Redis client with URL: {redis_url}"))?;
57
58 let conn_manager = ConnectionManager::new(client)
60 .await
61 .context("Failed to establish Redis connection manager")?;
62
63 let mut conn = conn_manager.clone();
65 let _: String = redis::cmd("PING")
66 .query_async(&mut conn)
67 .await
68 .context("Redis PING health check failed")?;
69
70 info!(redis_url = %redis_url, "Redis Cache connected successfully (ConnectionManager enabled)");
71
72 Ok(Self {
73 conn_manager,
74 hits: Arc::new(AtomicU64::new(0)),
75 misses: Arc::new(AtomicU64::new(0)),
76 sets: Arc::new(AtomicU64::new(0)),
77 })
78 }
79
80 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
108 let mut conn = self.conn_manager.clone();
109 let mut keys = Vec::new();
110 let mut cursor: u64 = 0;
111
112 loop {
113 let result: (u64, Vec<String>) = redis::cmd("SCAN")
115 .arg(cursor)
116 .arg("MATCH")
117 .arg(pattern)
118 .arg("COUNT")
119 .arg(100) .query_async(&mut conn)
121 .await?;
122
123 cursor = result.0;
124 keys.extend(result.1);
125
126 if cursor == 0 {
128 break;
129 }
130 }
131
132 debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
133 Ok(keys)
134 }
135
136 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
143 if keys.is_empty() {
144 return Ok(0);
145 }
146
147 let mut conn = self.conn_manager.clone();
148 let count: usize = conn.del(keys).await?;
149 debug!(count = count, "[Redis] Removed keys in bulk");
150 Ok(count)
151 }
152}
153
154use crate::traits::{CacheBackend, L2CacheBackend};
157use async_trait::async_trait;
158
159#[async_trait]
163impl CacheBackend for RedisCache {
164 async fn get(&self, key: &str) -> Option<serde_json::Value> {
165 let mut conn = self.conn_manager.clone();
166
167 if let Ok(json_str) = conn.get::<_, String>(key).await {
168 if let Ok(value) = serde_json::from_str(&json_str) {
169 self.hits.fetch_add(1, Ordering::Relaxed);
170 Some(value)
171 } else {
172 self.misses.fetch_add(1, Ordering::Relaxed);
173 None
174 }
175 } else {
176 self.misses.fetch_add(1, Ordering::Relaxed);
177 None
178 }
179 }
180
181 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
182 let json_str = serde_json::to_string(&value)?;
183 let mut conn = self.conn_manager.clone();
184
185 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
186 self.sets.fetch_add(1, Ordering::Relaxed);
187 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Redis] Cached key with TTL");
188 Ok(())
189 }
190
191 async fn remove(&self, key: &str) -> Result<()> {
192 let mut conn = self.conn_manager.clone();
193 let _: () = conn.del(key).await?;
194 Ok(())
195 }
196
197 async fn health_check(&self) -> bool {
198 let test_key = "health_check_redis";
199 let timestamp = std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)
201 .unwrap_or(Duration::from_secs(0))
202 .as_secs();
203 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
204
205 match self
206 .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10))
207 .await
208 {
209 Ok(()) => match self.get(test_key).await {
210 Some(retrieved) => {
211 let _ = self.remove(test_key).await;
212 retrieved
213 .get("test")
214 .and_then(serde_json::Value::as_bool)
215 .unwrap_or(false)
216 }
217 None => false,
218 },
219 Err(_) => false,
220 }
221 }
222
223 fn name(&self) -> &'static str {
224 "Redis"
225 }
226}
227
228#[async_trait]
232impl L2CacheBackend for RedisCache {
233 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
234 let mut conn = self.conn_manager.clone();
235
236 let json_str: String = if let Ok(s) = conn.get(key).await {
238 s
239 } else {
240 self.misses.fetch_add(1, Ordering::Relaxed);
241 return None;
242 };
243
244 let value: serde_json::Value = if let Ok(v) = serde_json::from_str(&json_str) {
246 v
247 } else {
248 self.misses.fetch_add(1, Ordering::Relaxed);
249 return None;
250 };
251
252 let ttl_secs: i64 = redis::cmd("TTL")
254 .arg(key)
255 .query_async(&mut conn)
256 .await
257 .unwrap_or(-1);
258
259 self.hits.fetch_add(1, Ordering::Relaxed);
260
261 let ttl = if ttl_secs > 0 {
262 Some(Duration::from_secs(ttl_secs.unsigned_abs()))
263 } else {
264 None };
266
267 Some((value, ttl))
268 }
269}