multi_tier_cache/
l2_cache.rs1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13pub struct L2Cache {
15 conn_manager: ConnectionManager,
17 hits: Arc<AtomicU64>,
19 misses: Arc<AtomicU64>,
21 sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26 pub async fn new() -> Result<Self> {
28 println!(" 🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30 let redis_url = std::env::var("REDIS_URL")
32 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34 let client = Client::open(redis_url.as_str())?;
35
36 let conn_manager = ConnectionManager::new(client).await?;
38
39 let mut conn = conn_manager.clone();
41 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43 println!(" ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45 Ok(Self {
46 conn_manager,
47 hits: Arc::new(AtomicU64::new(0)),
48 misses: Arc::new(AtomicU64::new(0)),
49 sets: Arc::new(AtomicU64::new(0)),
50 })
51 }
52
53 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55 let mut conn = self.conn_manager.clone();
56
57 match conn.get::<_, String>(key).await {
58 Ok(json_str) => {
59 match serde_json::from_str(&json_str) {
60 Ok(value) => {
61 self.hits.fetch_add(1, Ordering::Relaxed);
62 Some(value)
63 }
64 Err(_) => {
65 self.misses.fetch_add(1, Ordering::Relaxed);
66 None
67 }
68 }
69 }
70 Err(_) => {
71 self.misses.fetch_add(1, Ordering::Relaxed);
72 None
73 }
74 }
75 }
76
77 pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82 let mut conn = self.conn_manager.clone();
83
84 let json_str: String = match conn.get(key).await {
86 Ok(s) => s,
87 Err(_) => {
88 self.misses.fetch_add(1, Ordering::Relaxed);
89 return None;
90 }
91 };
92
93 let value: serde_json::Value = match serde_json::from_str(&json_str) {
95 Ok(v) => v,
96 Err(_) => {
97 self.misses.fetch_add(1, Ordering::Relaxed);
98 return None;
99 }
100 };
101
102 let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104 Ok(ttl) => ttl,
105 Err(_) => -1, };
107
108 self.hits.fetch_add(1, Ordering::Relaxed);
109
110 let ttl = if ttl_secs > 0 {
111 Some(Duration::from_secs(ttl_secs as u64))
112 } else {
113 None };
115
116 Some((value, ttl))
117 }
118
119 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121 let json_str = serde_json::to_string(&value)?;
122 let mut conn = self.conn_manager.clone();
123
124 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125 self.sets.fetch_add(1, Ordering::Relaxed);
126 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127 Ok(())
128 }
129
130 pub async fn remove(&self, key: &str) -> Result<()> {
132 let mut conn = self.conn_manager.clone();
133 let _: () = conn.del(key).await?;
134 Ok(())
135 }
136
137 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
162 let mut conn = self.conn_manager.clone();
163 let mut keys = Vec::new();
164 let mut cursor: u64 = 0;
165
166 loop {
167 let result: (u64, Vec<String>) = redis::cmd("SCAN")
169 .arg(cursor)
170 .arg("MATCH")
171 .arg(pattern)
172 .arg("COUNT")
173 .arg(100) .query_async(&mut conn)
175 .await?;
176
177 cursor = result.0;
178 keys.extend(result.1);
179
180 if cursor == 0 {
182 break;
183 }
184 }
185
186 println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
187 Ok(keys)
188 }
189
190 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
194 if keys.is_empty() {
195 return Ok(0);
196 }
197
198 let mut conn = self.conn_manager.clone();
199 let count: usize = conn.del(keys).await?;
200 println!("🗑️ [L2] Removed {} keys", count);
201 Ok(count)
202 }
203
204 pub async fn health_check(&self) -> bool {
206 let test_key = "health_check_l2";
207 let timestamp = std::time::SystemTime::now()
208 .duration_since(std::time::UNIX_EPOCH)
209 .unwrap_or(Duration::from_secs(0))
210 .as_secs();
211 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
212
213 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
214 Ok(_) => {
215 match self.get(test_key).await {
216 Some(retrieved) => {
217 let _ = self.remove(test_key).await;
218 retrieved["test"].as_bool().unwrap_or(false)
219 }
220 None => false
221 }
222 }
223 Err(_) => false
224 }
225 }
226
227}
228
229use crate::traits::{CacheBackend, L2CacheBackend};
232use async_trait::async_trait;
233
234#[async_trait]
238impl CacheBackend for L2Cache {
239 async fn get(&self, key: &str) -> Option<serde_json::Value> {
240 L2Cache::get(self, key).await
241 }
242
243 async fn set_with_ttl(
244 &self,
245 key: &str,
246 value: serde_json::Value,
247 ttl: Duration,
248 ) -> Result<()> {
249 L2Cache::set_with_ttl(self, key, value, ttl).await
250 }
251
252 async fn remove(&self, key: &str) -> Result<()> {
253 L2Cache::remove(self, key).await
254 }
255
256 async fn health_check(&self) -> bool {
257 L2Cache::health_check(self).await
258 }
259
260 fn name(&self) -> &str {
261 "Redis (L2)"
262 }
263}
264
265#[async_trait]
269impl L2CacheBackend for L2Cache {
270 async fn get_with_ttl(
271 &self,
272 key: &str,
273 ) -> Option<(serde_json::Value, Option<Duration>)> {
274 L2Cache::get_with_ttl(self, key).await
275 }
276}