multi_tier_cache/backends/
redis_cache.rs1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::{Context, Result};
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tracing::{info, debug};
13
14pub struct RedisCache {
23 conn_manager: ConnectionManager,
25 hits: Arc<AtomicU64>,
27 misses: Arc<AtomicU64>,
29 sets: Arc<AtomicU64>,
31}
32
33impl RedisCache {
34 pub async fn new() -> Result<Self> {
36 let redis_url = std::env::var("REDIS_URL")
37 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
38 Self::with_url(&redis_url).await
39 }
40
41 pub async fn with_url(redis_url: &str) -> Result<Self> {
47 info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
48
49 let client = Client::open(redis_url)
50 .with_context(|| format!("Failed to create Redis client with URL: {}", redis_url))?;
51
52 let conn_manager = ConnectionManager::new(client).await
54 .context("Failed to establish Redis connection manager")?;
55
56 let mut conn = conn_manager.clone();
58 let _: String = redis::cmd("PING")
59 .query_async(&mut conn)
60 .await
61 .context("Redis PING health check failed")?;
62
63 info!(redis_url = %redis_url, "Redis Cache connected successfully (ConnectionManager enabled)");
64
65 Ok(Self {
66 conn_manager,
67 hits: Arc::new(AtomicU64::new(0)),
68 misses: Arc::new(AtomicU64::new(0)),
69 sets: Arc::new(AtomicU64::new(0)),
70 })
71 }
72
73
74 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
99 let mut conn = self.conn_manager.clone();
100 let mut keys = Vec::new();
101 let mut cursor: u64 = 0;
102
103 loop {
104 let result: (u64, Vec<String>) = redis::cmd("SCAN")
106 .arg(cursor)
107 .arg("MATCH")
108 .arg(pattern)
109 .arg("COUNT")
110 .arg(100) .query_async(&mut conn)
112 .await?;
113
114 cursor = result.0;
115 keys.extend(result.1);
116
117 if cursor == 0 {
119 break;
120 }
121 }
122
123 debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
124 Ok(keys)
125 }
126
127 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
131 if keys.is_empty() {
132 return Ok(0);
133 }
134
135 let mut conn = self.conn_manager.clone();
136 let count: usize = conn.del(keys).await?;
137 debug!(count = count, "[Redis] Removed keys in bulk");
138 Ok(count)
139 }
140
141}
142
143use crate::traits::{CacheBackend, L2CacheBackend};
146use async_trait::async_trait;
147
148#[async_trait]
152impl CacheBackend for RedisCache {
153 async fn get(&self, key: &str) -> Option<serde_json::Value> {
154 let mut conn = self.conn_manager.clone();
155
156 match conn.get::<_, String>(key).await {
157 Ok(json_str) => {
158 match serde_json::from_str(&json_str) {
159 Ok(value) => {
160 self.hits.fetch_add(1, Ordering::Relaxed);
161 Some(value)
162 }
163 Err(_) => {
164 self.misses.fetch_add(1, Ordering::Relaxed);
165 None
166 }
167 }
168 }
169 Err(_) => {
170 self.misses.fetch_add(1, Ordering::Relaxed);
171 None
172 }
173 }
174 }
175
176 async fn set_with_ttl(
177 &self,
178 key: &str,
179 value: serde_json::Value,
180 ttl: Duration,
181 ) -> Result<()> {
182 let json_str = serde_json::to_string(&value)?;
183 let mut conn = self.conn_manager.clone();
184
185 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
186 self.sets.fetch_add(1, Ordering::Relaxed);
187 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Redis] Cached key with TTL");
188 Ok(())
189 }
190
191 async fn remove(&self, key: &str) -> Result<()> {
192 let mut conn = self.conn_manager.clone();
193 let _: () = conn.del(key).await?;
194 Ok(())
195 }
196
197 async fn health_check(&self) -> bool {
198 let test_key = "health_check_redis";
199 let timestamp = std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)
201 .unwrap_or(Duration::from_secs(0))
202 .as_secs();
203 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
204
205 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
206 Ok(_) => {
207 match self.get(test_key).await {
208 Some(retrieved) => {
209 let _ = self.remove(test_key).await;
210 retrieved["test"].as_bool().unwrap_or(false)
211 }
212 None => false
213 }
214 }
215 Err(_) => false
216 }
217 }
218
219 fn name(&self) -> &str {
220 "Redis"
221 }
222}
223
224#[async_trait]
228impl L2CacheBackend for RedisCache {
229 async fn get_with_ttl(
230 &self,
231 key: &str,
232 ) -> Option<(serde_json::Value, Option<Duration>)> {
233 let mut conn = self.conn_manager.clone();
234
235 let json_str: String = match conn.get(key).await {
237 Ok(s) => s,
238 Err(_) => {
239 self.misses.fetch_add(1, Ordering::Relaxed);
240 return None;
241 }
242 };
243
244 let value: serde_json::Value = match serde_json::from_str(&json_str) {
246 Ok(v) => v,
247 Err(_) => {
248 self.misses.fetch_add(1, Ordering::Relaxed);
249 return None;
250 }
251 };
252
253 let ttl_secs: i64 = redis::cmd("TTL")
255 .arg(key)
256 .query_async(&mut conn)
257 .await
258 .unwrap_or(-1);
259
260 self.hits.fetch_add(1, Ordering::Relaxed);
261
262 let ttl = if ttl_secs > 0 {
263 Some(Duration::from_secs(ttl_secs as u64))
264 } else {
265 None };
267
268 Some((value, ttl))
269 }
270}