1use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13pub struct L2Cache {
15 conn_manager: ConnectionManager,
17 hits: Arc<AtomicU64>,
19 misses: Arc<AtomicU64>,
21 sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26 pub async fn new() -> Result<Self> {
28 println!(" 🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30 let redis_url = std::env::var("REDIS_URL")
32 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34 let client = Client::open(redis_url.as_str())?;
35
36 let conn_manager = ConnectionManager::new(client).await?;
38
39 let mut conn = conn_manager.clone();
41 let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43 println!(" ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45 Ok(Self {
46 conn_manager,
47 hits: Arc::new(AtomicU64::new(0)),
48 misses: Arc::new(AtomicU64::new(0)),
49 sets: Arc::new(AtomicU64::new(0)),
50 })
51 }
52
53 pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55 let mut conn = self.conn_manager.clone();
56
57 match conn.get::<_, String>(key).await {
58 Ok(json_str) => {
59 match serde_json::from_str(&json_str) {
60 Ok(value) => {
61 self.hits.fetch_add(1, Ordering::Relaxed);
62 Some(value)
63 }
64 Err(_) => {
65 self.misses.fetch_add(1, Ordering::Relaxed);
66 None
67 }
68 }
69 }
70 Err(_) => {
71 self.misses.fetch_add(1, Ordering::Relaxed);
72 None
73 }
74 }
75 }
76
77 pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82 let mut conn = self.conn_manager.clone();
83
84 let json_str: String = match conn.get(key).await {
86 Ok(s) => s,
87 Err(_) => {
88 self.misses.fetch_add(1, Ordering::Relaxed);
89 return None;
90 }
91 };
92
93 let value: serde_json::Value = match serde_json::from_str(&json_str) {
95 Ok(v) => v,
96 Err(_) => {
97 self.misses.fetch_add(1, Ordering::Relaxed);
98 return None;
99 }
100 };
101
102 let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104 Ok(ttl) => ttl,
105 Err(_) => -1, };
107
108 self.hits.fetch_add(1, Ordering::Relaxed);
109
110 let ttl = if ttl_secs > 0 {
111 Some(Duration::from_secs(ttl_secs as u64))
112 } else {
113 None };
115
116 Some((value, ttl))
117 }
118
119 pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121 let json_str = serde_json::to_string(&value)?;
122 let mut conn = self.conn_manager.clone();
123
124 let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125 self.sets.fetch_add(1, Ordering::Relaxed);
126 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127 Ok(())
128 }
129
130 pub async fn remove(&self, key: &str) -> Result<()> {
132 let mut conn = self.conn_manager.clone();
133 let _: () = conn.del(key).await?;
134 Ok(())
135 }
136
137 pub async fn health_check(&self) -> bool {
139 let test_key = "health_check_l2";
140 let timestamp = std::time::SystemTime::now()
141 .duration_since(std::time::UNIX_EPOCH)
142 .unwrap_or(Duration::from_secs(0))
143 .as_secs();
144 let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
145
146 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
147 Ok(_) => {
148 match self.get(test_key).await {
149 Some(retrieved) => {
150 let _ = self.remove(test_key).await;
151 retrieved["test"].as_bool().unwrap_or(false)
152 }
153 None => false
154 }
155 }
156 Err(_) => false
157 }
158 }
159
160 pub async fn stream_add(
170 &self,
171 stream_key: &str,
172 fields: Vec<(String, String)>,
173 maxlen: Option<usize>
174 ) -> Result<String> {
175 let mut conn = self.conn_manager.clone();
176
177 let mut cmd = redis::cmd("XADD");
179 cmd.arg(stream_key);
180
181 if let Some(max) = maxlen {
182 cmd.arg("MAXLEN").arg("~").arg(max);
184 }
185
186 cmd.arg("*"); for (field, value) in &fields {
190 cmd.arg(field).arg(value);
191 }
192
193 let entry_id: String = cmd.query_async(&mut conn).await?;
194
195 println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
196 Ok(entry_id)
197 }
198
199 pub async fn stream_read_latest(
208 &self,
209 stream_key: &str,
210 count: usize
211 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
212 let mut conn = self.conn_manager.clone();
213
214 let raw_result: redis::Value = redis::cmd("XREVRANGE")
216 .arg(stream_key)
217 .arg("+") .arg("-") .arg("COUNT")
220 .arg(count)
221 .query_async(&mut conn)
222 .await?;
223
224 let entries = Self::parse_stream_response(raw_result)?;
225
226 println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
227 Ok(entries)
228 }
229
230 pub async fn stream_read(
241 &self,
242 stream_key: &str,
243 last_id: &str,
244 count: usize,
245 block_ms: Option<usize>
246 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
247 let mut conn = self.conn_manager.clone();
248
249 let mut cmd = redis::cmd("XREAD");
250
251 if let Some(block) = block_ms {
253 cmd.arg("BLOCK").arg(block);
254 }
255
256 cmd.arg("COUNT").arg(count);
257 cmd.arg("STREAMS").arg(stream_key).arg(last_id);
258
259 let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
260
261 let entries = Self::parse_xread_response(raw_result)?;
263
264 println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
265 Ok(entries)
266 }
267
268 fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
271 match value {
272 redis::Value::Array(entries) => {
273 let mut result = Vec::new();
274
275 for entry in entries {
276 if let redis::Value::Array(entry_parts) = entry {
277 if entry_parts.len() >= 2 {
278 let id = Self::value_to_string(&entry_parts[0]);
280
281 if let redis::Value::Array(field_values) = &entry_parts[1] {
283 let mut fields = Vec::new();
284
285 for chunk in field_values.chunks(2) {
287 if chunk.len() == 2 {
288 let field = Self::value_to_string(&chunk[0]);
289 let value = Self::value_to_string(&chunk[1]);
290 fields.push((field, value));
291 }
292 }
293
294 result.push((id, fields));
295 }
296 }
297 }
298 }
299
300 Ok(result)
301 }
302 redis::Value::Nil => Ok(Vec::new()),
303 _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
304 }
305 }
306
307 fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
310 match value {
311 redis::Value::Array(streams) => {
312 let mut all_entries = Vec::new();
313
314 for stream in streams {
315 if let redis::Value::Array(stream_parts) = stream {
316 if stream_parts.len() >= 2 {
317 if let redis::Value::Array(entries) = &stream_parts[1] {
319 for entry in entries {
320 if let redis::Value::Array(entry_parts) = entry {
321 if entry_parts.len() >= 2 {
322 let id = Self::value_to_string(&entry_parts[0]);
323
324 if let redis::Value::Array(field_values) = &entry_parts[1] {
325 let mut fields = Vec::new();
326
327 for chunk in field_values.chunks(2) {
328 if chunk.len() == 2 {
329 let field = Self::value_to_string(&chunk[0]);
330 let value = Self::value_to_string(&chunk[1]);
331 fields.push((field, value));
332 }
333 }
334
335 all_entries.push((id, fields));
336 }
337 }
338 }
339 }
340 }
341 }
342 }
343 }
344
345 Ok(all_entries)
346 }
347 redis::Value::Nil => Ok(Vec::new()),
348 _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
349 }
350 }
351
352 fn value_to_string(value: &redis::Value) -> String {
354 match value {
355 redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
356 redis::Value::SimpleString(s) => s.clone(),
357 redis::Value::Int(i) => i.to_string(),
358 redis::Value::Okay => "OK".to_string(),
359 redis::Value::Nil => String::new(),
360 _ => String::new(),
361 }
362 }
363}
364
365use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
368use async_trait::async_trait;
369
370#[async_trait]
374impl CacheBackend for L2Cache {
375 async fn get(&self, key: &str) -> Option<serde_json::Value> {
376 L2Cache::get(self, key).await
377 }
378
379 async fn set_with_ttl(
380 &self,
381 key: &str,
382 value: serde_json::Value,
383 ttl: Duration,
384 ) -> Result<()> {
385 L2Cache::set_with_ttl(self, key, value, ttl).await
386 }
387
388 async fn remove(&self, key: &str) -> Result<()> {
389 L2Cache::remove(self, key).await
390 }
391
392 async fn health_check(&self) -> bool {
393 L2Cache::health_check(self).await
394 }
395
396 fn name(&self) -> &str {
397 "Redis (L2)"
398 }
399}
400
401#[async_trait]
405impl L2CacheBackend for L2Cache {
406 async fn get_with_ttl(
407 &self,
408 key: &str,
409 ) -> Option<(serde_json::Value, Option<Duration>)> {
410 L2Cache::get_with_ttl(self, key).await
411 }
412}
413
414#[async_trait]
418impl StreamingBackend for L2Cache {
419 async fn stream_add(
420 &self,
421 stream_key: &str,
422 fields: Vec<(String, String)>,
423 maxlen: Option<usize>,
424 ) -> Result<String> {
425 L2Cache::stream_add(self, stream_key, fields, maxlen).await
426 }
427
428 async fn stream_read_latest(
429 &self,
430 stream_key: &str,
431 count: usize,
432 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
433 L2Cache::stream_read_latest(self, stream_key, count).await
434 }
435
436 async fn stream_read(
437 &self,
438 stream_key: &str,
439 last_id: &str,
440 count: usize,
441 block_ms: Option<usize>,
442 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
443 L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
444 }
445}