1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13pub struct L2Cache {
15 conn_manager: ConnectionManager,
17 hits: Arc<AtomicU64>,
19 misses: Arc<AtomicU64>,
21 sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26 pub async fn new() -> Result<Self> {
28 println!(" 🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30 let redis_url = std::env::var("REDIS_URL")
32 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34 let client = Client::open(redis_url.as_str())?;
35
36 let conn_manager = ConnectionManager::new(client).await?;
38
39 let mut conn = conn_manager.clone();
41 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43 println!(" ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45 Ok(Self {
46 conn_manager,
47 hits: Arc::new(AtomicU64::new(0)),
48 misses: Arc::new(AtomicU64::new(0)),
49 sets: Arc::new(AtomicU64::new(0)),
50 })
51 }
52
53 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55 let mut conn = self.conn_manager.clone();
56
57 match conn.get::<_, String>(key).await {
58 Ok(json_str) => {
59 match serde_json::from_str(&json_str) {
60 Ok(value) => {
61 self.hits.fetch_add(1, Ordering::Relaxed);
62 Some(value)
63 }
64 Err(_) => {
65 self.misses.fetch_add(1, Ordering::Relaxed);
66 None
67 }
68 }
69 }
70 Err(_) => {
71 self.misses.fetch_add(1, Ordering::Relaxed);
72 None
73 }
74 }
75 }
76
77 pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82 let mut conn = self.conn_manager.clone();
83
84 let json_str: String = match conn.get(key).await {
86 Ok(s) => s,
87 Err(_) => {
88 self.misses.fetch_add(1, Ordering::Relaxed);
89 return None;
90 }
91 };
92
93 let value: serde_json::Value = match serde_json::from_str(&json_str) {
95 Ok(v) => v,
96 Err(_) => {
97 self.misses.fetch_add(1, Ordering::Relaxed);
98 return None;
99 }
100 };
101
102 let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104 Ok(ttl) => ttl,
105 Err(_) => -1, };
107
108 self.hits.fetch_add(1, Ordering::Relaxed);
109
110 let ttl = if ttl_secs > 0 {
111 Some(Duration::from_secs(ttl_secs as u64))
112 } else {
113 None };
115
116 Some((value, ttl))
117 }
118
119 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121 let json_str = serde_json::to_string(&value)?;
122 let mut conn = self.conn_manager.clone();
123
124 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125 self.sets.fetch_add(1, Ordering::Relaxed);
126 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127 Ok(())
128 }
129
130 pub async fn remove(&self, key: &str) -> Result<()> {
132 let mut conn = self.conn_manager.clone();
133 let _: () = conn.del(key).await?;
134 Ok(())
135 }
136
137 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
157 let mut conn = self.conn_manager.clone();
158 let mut keys = Vec::new();
159 let mut cursor: u64 = 0;
160
161 loop {
162 let result: (u64, Vec<String>) = redis::cmd("SCAN")
164 .arg(cursor)
165 .arg("MATCH")
166 .arg(pattern)
167 .arg("COUNT")
168 .arg(100) .query_async(&mut conn)
170 .await?;
171
172 cursor = result.0;
173 keys.extend(result.1);
174
175 if cursor == 0 {
177 break;
178 }
179 }
180
181 println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
182 Ok(keys)
183 }
184
185 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
189 if keys.is_empty() {
190 return Ok(0);
191 }
192
193 let mut conn = self.conn_manager.clone();
194 let count: usize = conn.del(keys).await?;
195 println!("🗑️ [L2] Removed {} keys", count);
196 Ok(count)
197 }
198
199 pub async fn health_check(&self) -> bool {
201 let test_key = "health_check_l2";
202 let timestamp = std::time::SystemTime::now()
203 .duration_since(std::time::UNIX_EPOCH)
204 .unwrap_or(Duration::from_secs(0))
205 .as_secs();
206 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
207
208 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
209 Ok(_) => {
210 match self.get(test_key).await {
211 Some(retrieved) => {
212 let _ = self.remove(test_key).await;
213 retrieved["test"].as_bool().unwrap_or(false)
214 }
215 None => false
216 }
217 }
218 Err(_) => false
219 }
220 }
221
222 pub async fn stream_add(
232 &self,
233 stream_key: &str,
234 fields: Vec<(String, String)>,
235 maxlen: Option<usize>
236 ) -> Result<String> {
237 let mut conn = self.conn_manager.clone();
238
239 let mut cmd = redis::cmd("XADD");
241 cmd.arg(stream_key);
242
243 if let Some(max) = maxlen {
244 cmd.arg("MAXLEN").arg("~").arg(max);
246 }
247
248 cmd.arg("*"); for (field, value) in &fields {
252 cmd.arg(field).arg(value);
253 }
254
255 let entry_id: String = cmd.query_async(&mut conn).await?;
256
257 println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
258 Ok(entry_id)
259 }
260
261 pub async fn stream_read_latest(
270 &self,
271 stream_key: &str,
272 count: usize
273 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
274 let mut conn = self.conn_manager.clone();
275
276 let raw_result: redis::Value = redis::cmd("XREVRANGE")
278 .arg(stream_key)
279 .arg("+") .arg("-") .arg("COUNT")
282 .arg(count)
283 .query_async(&mut conn)
284 .await?;
285
286 let entries = Self::parse_stream_response(raw_result)?;
287
288 println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
289 Ok(entries)
290 }
291
292 pub async fn stream_read(
303 &self,
304 stream_key: &str,
305 last_id: &str,
306 count: usize,
307 block_ms: Option<usize>
308 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
309 let mut conn = self.conn_manager.clone();
310
311 let mut cmd = redis::cmd("XREAD");
312
313 if let Some(block) = block_ms {
315 cmd.arg("BLOCK").arg(block);
316 }
317
318 cmd.arg("COUNT").arg(count);
319 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
320
321 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
322
323 let entries = Self::parse_xread_response(raw_result)?;
325
326 println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
327 Ok(entries)
328 }
329
330 fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
333 match value {
334 redis::Value::Array(entries) => {
335 let mut result = Vec::new();
336
337 for entry in entries {
338 if let redis::Value::Array(entry_parts) = entry {
339 if entry_parts.len() >= 2 {
340 let id = Self::value_to_string(&entry_parts[0]);
342
343 if let redis::Value::Array(field_values) = &entry_parts[1] {
345 let mut fields = Vec::new();
346
347 for chunk in field_values.chunks(2) {
349 if chunk.len() == 2 {
350 let field = Self::value_to_string(&chunk[0]);
351 let value = Self::value_to_string(&chunk[1]);
352 fields.push((field, value));
353 }
354 }
355
356 result.push((id, fields));
357 }
358 }
359 }
360 }
361
362 Ok(result)
363 }
364 redis::Value::Nil => Ok(Vec::new()),
365 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
366 }
367 }
368
369 fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
372 match value {
373 redis::Value::Array(streams) => {
374 let mut all_entries = Vec::new();
375
376 for stream in streams {
377 if let redis::Value::Array(stream_parts) = stream {
378 if stream_parts.len() >= 2 {
379 if let redis::Value::Array(entries) = &stream_parts[1] {
381 for entry in entries {
382 if let redis::Value::Array(entry_parts) = entry {
383 if entry_parts.len() >= 2 {
384 let id = Self::value_to_string(&entry_parts[0]);
385
386 if let redis::Value::Array(field_values) = &entry_parts[1] {
387 let mut fields = Vec::new();
388
389 for chunk in field_values.chunks(2) {
390 if chunk.len() == 2 {
391 let field = Self::value_to_string(&chunk[0]);
392 let value = Self::value_to_string(&chunk[1]);
393 fields.push((field, value));
394 }
395 }
396
397 all_entries.push((id, fields));
398 }
399 }
400 }
401 }
402 }
403 }
404 }
405 }
406
407 Ok(all_entries)
408 }
409 redis::Value::Nil => Ok(Vec::new()),
410 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
411 }
412 }
413
414 fn value_to_string(value: &redis::Value) -> String {
416 match value {
417 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
418 redis::Value::SimpleString(s) => s.clone(),
419 redis::Value::Int(i) => i.to_string(),
420 redis::Value::Okay => "OK".to_string(),
421 redis::Value::Nil => String::new(),
422 _ => String::new(),
423 }
424 }
425}
426
427use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
430use async_trait::async_trait;
431
432#[async_trait]
436impl CacheBackend for L2Cache {
437 async fn get(&self, key: &str) -> Option<serde_json::Value> {
438 L2Cache::get(self, key).await
439 }
440
441 async fn set_with_ttl(
442 &self,
443 key: &str,
444 value: serde_json::Value,
445 ttl: Duration,
446 ) -> Result<()> {
447 L2Cache::set_with_ttl(self, key, value, ttl).await
448 }
449
450 async fn remove(&self, key: &str) -> Result<()> {
451 L2Cache::remove(self, key).await
452 }
453
454 async fn health_check(&self) -> bool {
455 L2Cache::health_check(self).await
456 }
457
458 fn name(&self) -> &str {
459 "Redis (L2)"
460 }
461}
462
463#[async_trait]
467impl L2CacheBackend for L2Cache {
468 async fn get_with_ttl(
469 &self,
470 key: &str,
471 ) -> Option<(serde_json::Value, Option<Duration>)> {
472 L2Cache::get_with_ttl(self, key).await
473 }
474}
475
476#[async_trait]
480impl StreamingBackend for L2Cache {
481 async fn stream_add(
482 &self,
483 stream_key: &str,
484 fields: Vec<(String, String)>,
485 maxlen: Option<usize>,
486 ) -> Result<String> {
487 L2Cache::stream_add(self, stream_key, fields, maxlen).await
488 }
489
490 async fn stream_read_latest(
491 &self,
492 stream_key: &str,
493 count: usize,
494 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
495 L2Cache::stream_read_latest(self, stream_key, count).await
496 }
497
498 async fn stream_read(
499 &self,
500 stream_key: &str,
501 last_id: &str,
502 count: usize,
503 block_ms: Option<usize>,
504 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
505 L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
506 }
507}