1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13pub struct L2Cache {
15 conn_manager: ConnectionManager,
17 hits: Arc<AtomicU64>,
19 misses: Arc<AtomicU64>,
21 sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26 pub async fn new() -> Result<Self> {
28 println!(" 🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30 let redis_url = std::env::var("REDIS_URL")
32 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34 let client = Client::open(redis_url.as_str())?;
35
36 let conn_manager = ConnectionManager::new(client).await?;
38
39 let mut conn = conn_manager.clone();
41 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43 println!(" ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45 Ok(Self {
46 conn_manager,
47 hits: Arc::new(AtomicU64::new(0)),
48 misses: Arc::new(AtomicU64::new(0)),
49 sets: Arc::new(AtomicU64::new(0)),
50 })
51 }
52
53 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55 let mut conn = self.conn_manager.clone();
56
57 match conn.get::<_, String>(key).await {
58 Ok(json_str) => {
59 match serde_json::from_str(&json_str) {
60 Ok(value) => {
61 self.hits.fetch_add(1, Ordering::Relaxed);
62 Some(value)
63 }
64 Err(_) => {
65 self.misses.fetch_add(1, Ordering::Relaxed);
66 None
67 }
68 }
69 }
70 Err(_) => {
71 self.misses.fetch_add(1, Ordering::Relaxed);
72 None
73 }
74 }
75 }
76
77 pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82 let mut conn = self.conn_manager.clone();
83
84 let json_str: String = match conn.get(key).await {
86 Ok(s) => s,
87 Err(_) => {
88 self.misses.fetch_add(1, Ordering::Relaxed);
89 return None;
90 }
91 };
92
93 let value: serde_json::Value = match serde_json::from_str(&json_str) {
95 Ok(v) => v,
96 Err(_) => {
97 self.misses.fetch_add(1, Ordering::Relaxed);
98 return None;
99 }
100 };
101
102 let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104 Ok(ttl) => ttl,
105 Err(_) => -1, };
107
108 self.hits.fetch_add(1, Ordering::Relaxed);
109
110 let ttl = if ttl_secs > 0 {
111 Some(Duration::from_secs(ttl_secs as u64))
112 } else {
113 None };
115
116 Some((value, ttl))
117 }
118
119 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121 let json_str = serde_json::to_string(&value)?;
122 let mut conn = self.conn_manager.clone();
123
124 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125 self.sets.fetch_add(1, Ordering::Relaxed);
126 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127 Ok(())
128 }
129
130 pub async fn remove(&self, key: &str) -> Result<()> {
132 let mut conn = self.conn_manager.clone();
133 let _: () = conn.del(key).await?;
134 Ok(())
135 }
136
137 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
162 let mut conn = self.conn_manager.clone();
163 let mut keys = Vec::new();
164 let mut cursor: u64 = 0;
165
166 loop {
167 let result: (u64, Vec<String>) = redis::cmd("SCAN")
169 .arg(cursor)
170 .arg("MATCH")
171 .arg(pattern)
172 .arg("COUNT")
173 .arg(100) .query_async(&mut conn)
175 .await?;
176
177 cursor = result.0;
178 keys.extend(result.1);
179
180 if cursor == 0 {
182 break;
183 }
184 }
185
186 println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
187 Ok(keys)
188 }
189
190 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
194 if keys.is_empty() {
195 return Ok(0);
196 }
197
198 let mut conn = self.conn_manager.clone();
199 let count: usize = conn.del(keys).await?;
200 println!("🗑️ [L2] Removed {} keys", count);
201 Ok(count)
202 }
203
204 pub async fn health_check(&self) -> bool {
206 let test_key = "health_check_l2";
207 let timestamp = std::time::SystemTime::now()
208 .duration_since(std::time::UNIX_EPOCH)
209 .unwrap_or(Duration::from_secs(0))
210 .as_secs();
211 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
212
213 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
214 Ok(_) => {
215 match self.get(test_key).await {
216 Some(retrieved) => {
217 let _ = self.remove(test_key).await;
218 retrieved["test"].as_bool().unwrap_or(false)
219 }
220 None => false
221 }
222 }
223 Err(_) => false
224 }
225 }
226
227 pub async fn stream_add(
237 &self,
238 stream_key: &str,
239 fields: Vec<(String, String)>,
240 maxlen: Option<usize>
241 ) -> Result<String> {
242 let mut conn = self.conn_manager.clone();
243
244 let mut cmd = redis::cmd("XADD");
246 cmd.arg(stream_key);
247
248 if let Some(max) = maxlen {
249 cmd.arg("MAXLEN").arg("~").arg(max);
251 }
252
253 cmd.arg("*"); for (field, value) in &fields {
257 cmd.arg(field).arg(value);
258 }
259
260 let entry_id: String = cmd.query_async(&mut conn).await?;
261
262 println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
263 Ok(entry_id)
264 }
265
266 pub async fn stream_read_latest(
275 &self,
276 stream_key: &str,
277 count: usize
278 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
279 let mut conn = self.conn_manager.clone();
280
281 let raw_result: redis::Value = redis::cmd("XREVRANGE")
283 .arg(stream_key)
284 .arg("+") .arg("-") .arg("COUNT")
287 .arg(count)
288 .query_async(&mut conn)
289 .await?;
290
291 let entries = Self::parse_stream_response(raw_result)?;
292
293 println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
294 Ok(entries)
295 }
296
297 pub async fn stream_read(
308 &self,
309 stream_key: &str,
310 last_id: &str,
311 count: usize,
312 block_ms: Option<usize>
313 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
314 let mut conn = self.conn_manager.clone();
315
316 let mut cmd = redis::cmd("XREAD");
317
318 if let Some(block) = block_ms {
320 cmd.arg("BLOCK").arg(block);
321 }
322
323 cmd.arg("COUNT").arg(count);
324 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
325
326 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
327
328 let entries = Self::parse_xread_response(raw_result)?;
330
331 println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
332 Ok(entries)
333 }
334
335 fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
338 match value {
339 redis::Value::Array(entries) => {
340 let mut result = Vec::new();
341
342 for entry in entries {
343 if let redis::Value::Array(entry_parts) = entry {
344 if entry_parts.len() >= 2 {
345 let id = Self::value_to_string(&entry_parts[0]);
347
348 if let redis::Value::Array(field_values) = &entry_parts[1] {
350 let mut fields = Vec::new();
351
352 for chunk in field_values.chunks(2) {
354 if chunk.len() == 2 {
355 let field = Self::value_to_string(&chunk[0]);
356 let value = Self::value_to_string(&chunk[1]);
357 fields.push((field, value));
358 }
359 }
360
361 result.push((id, fields));
362 }
363 }
364 }
365 }
366
367 Ok(result)
368 }
369 redis::Value::Nil => Ok(Vec::new()),
370 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
371 }
372 }
373
374 fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
377 match value {
378 redis::Value::Array(streams) => {
379 let mut all_entries = Vec::new();
380
381 for stream in streams {
382 if let redis::Value::Array(stream_parts) = stream {
383 if stream_parts.len() >= 2 {
384 if let redis::Value::Array(entries) = &stream_parts[1] {
386 for entry in entries {
387 if let redis::Value::Array(entry_parts) = entry {
388 if entry_parts.len() >= 2 {
389 let id = Self::value_to_string(&entry_parts[0]);
390
391 if let redis::Value::Array(field_values) = &entry_parts[1] {
392 let mut fields = Vec::new();
393
394 for chunk in field_values.chunks(2) {
395 if chunk.len() == 2 {
396 let field = Self::value_to_string(&chunk[0]);
397 let value = Self::value_to_string(&chunk[1]);
398 fields.push((field, value));
399 }
400 }
401
402 all_entries.push((id, fields));
403 }
404 }
405 }
406 }
407 }
408 }
409 }
410 }
411
412 Ok(all_entries)
413 }
414 redis::Value::Nil => Ok(Vec::new()),
415 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
416 }
417 }
418
419 fn value_to_string(value: &redis::Value) -> String {
421 match value {
422 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
423 redis::Value::SimpleString(s) => s.clone(),
424 redis::Value::Int(i) => i.to_string(),
425 redis::Value::Okay => "OK".to_string(),
426 redis::Value::Nil => String::new(),
427 _ => String::new(),
428 }
429 }
430}
431
432use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
435use async_trait::async_trait;
436
437#[async_trait]
441impl CacheBackend for L2Cache {
442 async fn get(&self, key: &str) -> Option<serde_json::Value> {
443 L2Cache::get(self, key).await
444 }
445
446 async fn set_with_ttl(
447 &self,
448 key: &str,
449 value: serde_json::Value,
450 ttl: Duration,
451 ) -> Result<()> {
452 L2Cache::set_with_ttl(self, key, value, ttl).await
453 }
454
455 async fn remove(&self, key: &str) -> Result<()> {
456 L2Cache::remove(self, key).await
457 }
458
459 async fn health_check(&self) -> bool {
460 L2Cache::health_check(self).await
461 }
462
463 fn name(&self) -> &str {
464 "Redis (L2)"
465 }
466}
467
468#[async_trait]
472impl L2CacheBackend for L2Cache {
473 async fn get_with_ttl(
474 &self,
475 key: &str,
476 ) -> Option<(serde_json::Value, Option<Duration>)> {
477 L2Cache::get_with_ttl(self, key).await
478 }
479}
480
481#[async_trait]
485impl StreamingBackend for L2Cache {
486 async fn stream_add(
487 &self,
488 stream_key: &str,
489 fields: Vec<(String, String)>,
490 maxlen: Option<usize>,
491 ) -> Result<String> {
492 L2Cache::stream_add(self, stream_key, fields, maxlen).await
493 }
494
495 async fn stream_read_latest(
496 &self,
497 stream_key: &str,
498 count: usize,
499 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
500 L2Cache::stream_read_latest(self, stream_key, count).await
501 }
502
503 async fn stream_read(
504 &self,
505 stream_key: &str,
506 last_id: &str,
507 count: usize,
508 block_ms: Option<usize>,
509 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
510 L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
511 }
512}