multi_tier_cache/backends/
redis_cache.rs1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13pub struct RedisCache {
22 conn_manager: ConnectionManager,
24 hits: Arc<AtomicU64>,
26 misses: Arc<AtomicU64>,
28 sets: Arc<AtomicU64>,
30}
31
32impl RedisCache {
33 pub async fn new() -> Result<Self> {
35 println!(" 🔴 Initializing Redis Cache (with ConnectionManager)...");
36
37 let redis_url = std::env::var("REDIS_URL")
39 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
40
41 let client = Client::open(redis_url.as_str())?;
42
43 let conn_manager = ConnectionManager::new(client).await?;
45
46 let mut conn = conn_manager.clone();
48 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
49
50 println!(" ✅ Redis Cache connected at {} (ConnectionManager enabled)", redis_url);
51
52 Ok(Self {
53 conn_manager,
54 hits: Arc::new(AtomicU64::new(0)),
55 misses: Arc::new(AtomicU64::new(0)),
56 sets: Arc::new(AtomicU64::new(0)),
57 })
58 }
59
60 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
62 let mut conn = self.conn_manager.clone();
63
64 match conn.get::<_, String>(key).await {
65 Ok(json_str) => {
66 match serde_json::from_str(&json_str) {
67 Ok(value) => {
68 self.hits.fetch_add(1, Ordering::Relaxed);
69 Some(value)
70 }
71 Err(_) => {
72 self.misses.fetch_add(1, Ordering::Relaxed);
73 None
74 }
75 }
76 }
77 Err(_) => {
78 self.misses.fetch_add(1, Ordering::Relaxed);
79 None
80 }
81 }
82 }
83
84 pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
89 let mut conn = self.conn_manager.clone();
90
91 let json_str: String = match conn.get(key).await {
93 Ok(s) => s,
94 Err(_) => {
95 self.misses.fetch_add(1, Ordering::Relaxed);
96 return None;
97 }
98 };
99
100 let value: serde_json::Value = match serde_json::from_str(&json_str) {
102 Ok(v) => v,
103 Err(_) => {
104 self.misses.fetch_add(1, Ordering::Relaxed);
105 return None;
106 }
107 };
108
109 let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
111 Ok(ttl) => ttl,
112 Err(_) => -1, };
114
115 self.hits.fetch_add(1, Ordering::Relaxed);
116
117 let ttl = if ttl_secs > 0 {
118 Some(Duration::from_secs(ttl_secs as u64))
119 } else {
120 None };
122
123 Some((value, ttl))
124 }
125
126 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
128 let json_str = serde_json::to_string(&value)?;
129 let mut conn = self.conn_manager.clone();
130
131 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
132 self.sets.fetch_add(1, Ordering::Relaxed);
133 println!("💾 [Redis] Cached '{}' with TTL {:?}", key, ttl);
134 Ok(())
135 }
136
137 pub async fn remove(&self, key: &str) -> Result<()> {
139 let mut conn = self.conn_manager.clone();
140 let _: () = conn.del(key).await?;
141 Ok(())
142 }
143
144 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
169 let mut conn = self.conn_manager.clone();
170 let mut keys = Vec::new();
171 let mut cursor: u64 = 0;
172
173 loop {
174 let result: (u64, Vec<String>) = redis::cmd("SCAN")
176 .arg(cursor)
177 .arg("MATCH")
178 .arg(pattern)
179 .arg("COUNT")
180 .arg(100) .query_async(&mut conn)
182 .await?;
183
184 cursor = result.0;
185 keys.extend(result.1);
186
187 if cursor == 0 {
189 break;
190 }
191 }
192
193 println!("🔍 [Redis] Scanned keys matching '{}': {} found", pattern, keys.len());
194 Ok(keys)
195 }
196
197 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
201 if keys.is_empty() {
202 return Ok(0);
203 }
204
205 let mut conn = self.conn_manager.clone();
206 let count: usize = conn.del(keys).await?;
207 println!("🗑️ [Redis] Removed {} keys", count);
208 Ok(count)
209 }
210
211 pub async fn health_check(&self) -> bool {
213 let test_key = "health_check_redis";
214 let timestamp = std::time::SystemTime::now()
215 .duration_since(std::time::UNIX_EPOCH)
216 .unwrap_or(Duration::from_secs(0))
217 .as_secs();
218 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
219
220 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
221 Ok(_) => {
222 match self.get(test_key).await {
223 Some(retrieved) => {
224 let _ = self.remove(test_key).await;
225 retrieved["test"].as_bool().unwrap_or(false)
226 }
227 None => false
228 }
229 }
230 Err(_) => false
231 }
232 }
233}
234
235use crate::traits::{CacheBackend, L2CacheBackend};
238use async_trait::async_trait;
239
240#[async_trait]
244impl CacheBackend for RedisCache {
245 async fn get(&self, key: &str) -> Option<serde_json::Value> {
246 RedisCache::get(self, key).await
247 }
248
249 async fn set_with_ttl(
250 &self,
251 key: &str,
252 value: serde_json::Value,
253 ttl: Duration,
254 ) -> Result<()> {
255 RedisCache::set_with_ttl(self, key, value, ttl).await
256 }
257
258 async fn remove(&self, key: &str) -> Result<()> {
259 RedisCache::remove(self, key).await
260 }
261
262 async fn health_check(&self) -> bool {
263 RedisCache::health_check(self).await
264 }
265
266 fn name(&self) -> &str {
267 "Redis"
268 }
269}
270
271#[async_trait]
275impl L2CacheBackend for RedisCache {
276 async fn get_with_ttl(
277 &self,
278 key: &str,
279 ) -> Option<(serde_json::Value, Option<Duration>)> {
280 RedisCache::get_with_ttl(self, key).await
281 }
282}