multi_tier_cache/l2_cache.rs

//! L2 Cache - Redis Cache
//!
//! Redis-based distributed cache for warm data storage with persistence.

use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use redis::{Client, AsyncCommands};
use redis::aio::ConnectionManager;
use serde_json;
use std::sync::atomic::{AtomicU64, Ordering};

/// L2 Cache using Redis with ConnectionManager for automatic reconnection
pub struct L2Cache {
    /// Redis connection manager - handles reconnection automatically
    conn_manager: ConnectionManager,
    /// Hit counter
    hits: Arc<AtomicU64>,
    /// Miss counter
    misses: Arc<AtomicU64>,
    /// Set counter
    sets: Arc<AtomicU64>,
}

impl L2Cache {
    /// Create new L2 cache with ConnectionManager for automatic reconnection
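    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes a Redis instance
    /// is reachable at `REDIS_URL` or the default `redis://127.0.0.1:6379`.
    ///
    /// ```ignore
    /// let cache = L2Cache::new().await?;
    /// assert!(cache.health_check().await);
    /// ```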
    pub async fn new() -> Result<Self> {
        println!("  🔴 Initializing L2 Cache (Redis with ConnectionManager)...");

        // Try to connect to Redis
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

        let client = Client::open(redis_url.as_str())?;

        // Create ConnectionManager - handles reconnection automatically
        let conn_manager = ConnectionManager::new(client).await?;

        // Test connection
        let mut conn = conn_manager.clone();
        let _: String = redis::cmd("PING").query_async(&mut conn).await?;

        println!("  ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);

        Ok(Self {
            conn_manager,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Get value from L2 cache using persistent ConnectionManager
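    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache` and that the key was previously stored as JSON.
    ///
    /// ```ignore
    /// if let Some(value) = cache.get("user:123").await {
    ///     println!("cached: {}", value);
    /// }
    /// ```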
    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let mut conn = self.conn_manager.clone();

        match conn.get::<_, String>(key).await {
            Ok(json_str) => {
                match serde_json::from_str(&json_str) {
                    Ok(value) => {
                        self.hits.fetch_add(1, Ordering::Relaxed);
                        Some(value)
                    }
                    Err(_) => {
                        self.misses.fetch_add(1, Ordering::Relaxed);
                        None
                    }
                }
            }
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Get value with its remaining TTL from L2 cache
    ///
    /// Returns `Some((value, Some(ttl)))` if the key exists and has an expiration,
    /// `Some((value, None))` if it exists without one, and `None` if the key is
    /// missing or its stored value is not valid JSON.
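    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache`. Useful for L2->L1 promotion, where the remaining
    /// TTL should be carried over.
    ///
    /// ```ignore
    /// if let Some((value, ttl)) = cache.get_with_ttl("user:123").await {
    ///     println!("value: {}, remaining TTL: {:?}", value, ttl);
    /// }
    /// ```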
    pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        let mut conn = self.conn_manager.clone();

        // Get value
        let json_str: String = match conn.get(key).await {
            Ok(s) => s,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Parse JSON
        let value: serde_json::Value = match serde_json::from_str(&json_str) {
            Ok(v) => v,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
        let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
            Ok(ttl) => ttl,
            Err(_) => -1, // Fallback: treat as no expiry
        };

        self.hits.fetch_add(1, Ordering::Relaxed);

        let ttl = if ttl_secs > 0 {
            Some(Duration::from_secs(ttl_secs as u64))
        } else {
            None // No expiry or error
        };

        Some((value, ttl))
    }

    /// Set value with custom TTL using persistent ConnectionManager
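    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache`.
    ///
    /// ```ignore
    /// let value = serde_json::json!({"name": "Alice"});
    /// cache.set_with_ttl("user:123", value, Duration::from_secs(300)).await?;
    /// ```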
    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let json_str = serde_json::to_string(&value)?;
        let mut conn = self.conn_manager.clone();

        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
        self.sets.fetch_add(1, Ordering::Relaxed);
        println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
        Ok(())
    }

    /// Remove value from cache using persistent ConnectionManager
    pub async fn remove(&self, key: &str) -> Result<()> {
        let mut conn = self.conn_manager.clone();
        let _: () = conn.del(key).await?;
        Ok(())
    }

    /// Scan keys matching a pattern (glob-style: *, ?, [])
    ///
    /// Uses the Redis SCAN command (non-blocking, cursor-based iteration),
    /// which is safe for production use, unlike the KEYS command.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Returns
    /// Vector of matching key names
    ///
    /// # Examples
    /// ```ignore
    /// // Find all user cache keys
    /// let keys = cache.scan_keys("user:*").await?;
    ///
    /// // Find a specific user's cache keys
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// ```
    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
        let mut conn = self.conn_manager.clone();
        let mut keys = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            // SCAN cursor MATCH pattern COUNT 100
            let result: (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(100) // Fetch 100 keys per iteration
                .query_async(&mut conn)
                .await?;

            cursor = result.0;
            keys.extend(result.1);

            // Cursor 0 means iteration is complete
            if cursor == 0 {
                break;
            }
        }

        println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
        Ok(keys)
    }

    /// Remove multiple keys at once (bulk delete)
    ///
    /// More efficient than calling remove() multiple times
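    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache`. Combines `scan_keys` and `remove_bulk` for
    /// pattern-based invalidation.
    ///
    /// ```ignore
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// let removed = cache.remove_bulk(&keys).await?;
    /// println!("invalidated {} keys", removed);
    /// ```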
    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }

        let mut conn = self.conn_manager.clone();
        let count: usize = conn.del(keys).await?;
        println!("🗑️  [L2] Removed {} keys", count);
        Ok(count)
    }

    /// Health check - writes, reads back, and removes a short-lived test key
    pub async fn health_check(&self) -> bool {
        let test_key = "health_check_l2";
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or(Duration::from_secs(0))
            .as_secs();
        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});

        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
            Ok(_) => {
                match self.get(test_key).await {
                    Some(retrieved) => {
                        let _ = self.remove(test_key).await;
                        retrieved["test"].as_bool().unwrap_or(false)
                    }
                    None => false
                }
            }
            Err(_) => false
        }
    }

    /// Publish data to Redis Stream using XADD
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
    /// * `fields` - Vec of field-value pairs to add to the stream
    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
    ///
    /// # Returns
    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
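    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache`. The stream name "events_stream" is illustrative.
    ///
    /// ```ignore
    /// let fields = vec![
    ///     ("event".to_string(), "user_created".to_string()),
    ///     ("user_id".to_string(), "123".to_string()),
    /// ];
    /// let entry_id = cache.stream_add("events_stream", fields, Some(10_000)).await?;
    /// println!("published entry {}", entry_id);
    /// ```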
    pub async fn stream_add(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        let mut conn = self.conn_manager.clone();

        // Build XADD command with MAXLEN trimming if specified
        let mut cmd = redis::cmd("XADD");
        cmd.arg(stream_key);

        if let Some(max) = maxlen {
            // Use MAXLEN ~ for approximate trimming (more efficient)
            cmd.arg("MAXLEN").arg("~").arg(max);
        }

        cmd.arg("*");  // Auto-generate ID

        // Add all field-value pairs
        for (field, value) in &fields {
            cmd.arg(field).arg(value);
        }

        let entry_id: String = cmd.query_async(&mut conn).await?;

        println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
        Ok(entry_id)
    }

    /// Read latest entries from Redis Stream using XREVRANGE
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
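    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache` and a populated "events_stream".
    ///
    /// ```ignore
    /// let entries = cache.stream_read_latest("events_stream", 10).await?;
    /// for (id, fields) in entries {
    ///     println!("{}: {:?}", id, fields);
    /// }
    /// ```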
    pub async fn stream_read_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        let mut conn = self.conn_manager.clone();

        // XREVRANGE returns a raw Value that we need to parse manually
        let raw_result: redis::Value = redis::cmd("XREVRANGE")
            .arg(stream_key)
            .arg("+")  // Start from the newest
            .arg("-")  // To the oldest
            .arg("COUNT")
            .arg(count)
            .query_async(&mut conn)
            .await?;

        let entries = Self::parse_stream_response(raw_result)?;

        println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
        Ok(entries)
    }

    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream
    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
    /// * `count` - Maximum number of entries to retrieve
    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
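    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); assumes an existing
    /// `cache: L2Cache`. Blocks for up to 5 seconds waiting for entries newer
    /// than the current end of the stream ("$").
    ///
    /// ```ignore
    /// let entries = cache.stream_read("events_stream", "$", 100, Some(5000)).await?;
    /// for (id, fields) in &entries {
    ///     println!("{}: {:?}", id, fields);
    /// }
    /// ```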
    pub async fn stream_read(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        let mut conn = self.conn_manager.clone();

        let mut cmd = redis::cmd("XREAD");

        // Add BLOCK if specified
        if let Some(block) = block_ms {
            cmd.arg("BLOCK").arg(block);
        }

        cmd.arg("COUNT").arg(count);
        cmd.arg("STREAMS").arg(stream_key).arg(last_id);

        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;

        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
        let entries = Self::parse_xread_response(raw_result)?;

        println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
        Ok(entries)
    }

    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
    /// Format: [[id, [field, value, field, value, ...]], ...]
    fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match value {
            redis::Value::Array(entries) => {
                let mut result = Vec::new();

                for entry in entries {
                    if let redis::Value::Array(entry_parts) = entry {
                        if entry_parts.len() >= 2 {
                            // First element is the ID
                            let id = Self::value_to_string(&entry_parts[0]);

                            // Second element is the field-value array
                            if let redis::Value::Array(field_values) = &entry_parts[1] {
                                let mut fields = Vec::new();

                                // Process pairs of field-value
                                for chunk in field_values.chunks(2) {
                                    if chunk.len() == 2 {
                                        let field = Self::value_to_string(&chunk[0]);
                                        let value = Self::value_to_string(&chunk[1]);
                                        fields.push((field, value));
                                    }
                                }

                                result.push((id, fields));
                            }
                        }
                    }
                }

                Ok(result)
            }
            redis::Value::Nil => Ok(Vec::new()),
            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
        }
    }

    /// Helper function to parse XREAD response
    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
    fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match value {
            redis::Value::Array(streams) => {
                let mut all_entries = Vec::new();

                for stream in streams {
                    if let redis::Value::Array(stream_parts) = stream {
                        if stream_parts.len() >= 2 {
                            // Second element contains the entries
                            if let redis::Value::Array(entries) = &stream_parts[1] {
                                for entry in entries {
                                    if let redis::Value::Array(entry_parts) = entry {
                                        if entry_parts.len() >= 2 {
                                            let id = Self::value_to_string(&entry_parts[0]);

                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
                                                let mut fields = Vec::new();

                                                for chunk in field_values.chunks(2) {
                                                    if chunk.len() == 2 {
                                                        let field = Self::value_to_string(&chunk[0]);
                                                        let value = Self::value_to_string(&chunk[1]);
                                                        fields.push((field, value));
                                                    }
                                                }

                                                all_entries.push((id, fields));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                Ok(all_entries)
            }
            redis::Value::Nil => Ok(Vec::new()),
            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
        }
    }

    /// Helper function to convert Redis Value to String
    fn value_to_string(value: &redis::Value) -> String {
        match value {
            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
            redis::Value::SimpleString(s) => s.clone(),
            redis::Value::Int(i) => i.to_string(),
            redis::Value::Okay => "OK".to_string(),
            redis::Value::Nil => String::new(),
            _ => String::new(),
        }
    }
}

// ===== Trait Implementations =====

use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use async_trait::async_trait;

/// Implement CacheBackend trait for L2Cache
///
/// This allows L2Cache to be used as a pluggable backend in the multi-tier cache system.
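///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); assumes the `CacheBackend`
/// trait from `crate::traits` is object-safe as declared via `async_trait`.
///
/// ```ignore
/// let backend: Arc<dyn CacheBackend> = Arc::new(L2Cache::new().await?);
/// backend.set_with_ttl("k", serde_json::json!(1), Duration::from_secs(60)).await?;
/// assert_eq!(backend.name(), "Redis (L2)");
/// ```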
#[async_trait]
impl CacheBackend for L2Cache {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        L2Cache::get(self, key).await
    }

    async fn set_with_ttl(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Duration,
    ) -> Result<()> {
        L2Cache::set_with_ttl(self, key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        L2Cache::remove(self, key).await
    }

    async fn health_check(&self) -> bool {
        L2Cache::health_check(self).await
    }

    fn name(&self) -> &str {
        "Redis (L2)"
    }
}

/// Implement L2CacheBackend trait for L2Cache
///
/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
#[async_trait]
impl L2CacheBackend for L2Cache {
    async fn get_with_ttl(
        &self,
        key: &str,
    ) -> Option<(serde_json::Value, Option<Duration>)> {
        L2Cache::get_with_ttl(self, key).await
    }
}

/// Implement StreamingBackend trait for L2Cache
///
/// This enables Redis Streams functionality for event-driven architectures.
#[async_trait]
impl StreamingBackend for L2Cache {
    async fn stream_add(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        L2Cache::stream_add(self, stream_key, fields, maxlen).await
    }

    async fn stream_read_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        L2Cache::stream_read_latest(self, stream_key, count).await
    }

    async fn stream_read(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
    }
}