multi_tier_cache/l2_cache.rs

//! L2 Cache - Redis Cache
//!
//! Redis-based distributed cache for warm data storage with persistence.

use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use redis::{Client, AsyncCommands};
use redis::aio::ConnectionManager;
use serde_json;
use std::sync::atomic::{AtomicU64, Ordering};

/// L2 Cache using Redis with ConnectionManager for automatic reconnection
pub struct L2Cache {
    /// Redis connection manager - handles reconnection automatically
    conn_manager: ConnectionManager,
    /// Hit counter
    hits: Arc<AtomicU64>,
    /// Miss counter
    misses: Arc<AtomicU64>,
    /// Set counter
    sets: Arc<AtomicU64>,
}

impl L2Cache {
    /// Create new L2 cache with ConnectionManager for automatic reconnection
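    ///
    /// # Examples
    ///
    /// A minimal usage sketch (added for illustration; assumes a Redis server is
    /// reachable at `REDIS_URL` or the default `redis://127.0.0.1:6379`):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// let cache = L2Cache::new().await?;
    /// # Ok(())
    /// # }
    /// ```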
    pub async fn new() -> Result<Self> {
        println!("  🔴 Initializing L2 Cache (Redis with ConnectionManager)...");

        // Try to connect to Redis
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

        let client = Client::open(redis_url.as_str())?;

        // Create ConnectionManager - handles reconnection automatically
        let conn_manager = ConnectionManager::new(client).await?;

        // Test connection
        let mut conn = conn_manager.clone();
        let _: String = redis::cmd("PING").query_async(&mut conn).await?;

        println!("  ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);

        Ok(Self {
            conn_manager,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Get value from L2 cache using persistent ConnectionManager
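    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; the key name is hypothetical):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// if let Some(value) = cache.get("user:123").await {
    ///     println!("cache hit: {}", value);
    /// }
    /// # Ok(())
    /// # }
    /// ```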
    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let mut conn = self.conn_manager.clone();

        match conn.get::<_, String>(key).await {
            Ok(json_str) => {
                match serde_json::from_str(&json_str) {
                    Ok(value) => {
                        self.hits.fetch_add(1, Ordering::Relaxed);
                        Some(value)
                    }
                    Err(_) => {
                        self.misses.fetch_add(1, Ordering::Relaxed);
                        None
                    }
                }
            }
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Get value with its remaining TTL from L2 cache
    ///
    /// Returns `Some((value, ttl))` if the key exists and parses as JSON; the inner
    /// `ttl` is `None` when the key has no expiration. Returns `None` on a miss.
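    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; shows the TTL introspection used for L2->L1 promotion,
    /// with a hypothetical key):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// if let Some((value, ttl)) = cache.get_with_ttl("user:123").await {
    ///     println!("value = {}, remaining ttl = {:?}", value, ttl);
    /// }
    /// # Ok(())
    /// # }
    /// ```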
    pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        let mut conn = self.conn_manager.clone();

        // Get value
        let json_str: String = match conn.get(key).await {
            Ok(s) => s,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Parse JSON
        let value: serde_json::Value = match serde_json::from_str(&json_str) {
            Ok(v) => v,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
        let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
            Ok(ttl) => ttl,
            Err(_) => -1, // Fallback: treat as no expiry
        };

        self.hits.fetch_add(1, Ordering::Relaxed);

        let ttl = if ttl_secs > 0 {
            Some(Duration::from_secs(ttl_secs as u64))
        } else {
            None // No expiry or error
        };

        Some((value, ttl))
    }

    /// Set value with custom TTL using persistent ConnectionManager
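    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; the key and payload are hypothetical):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # use std::time::Duration;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// let value = serde_json::json!({"name": "Alice"});
    /// cache.set_with_ttl("user:123", value, Duration::from_secs(300)).await?;
    /// # Ok(())
    /// # }
    /// ```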
    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let json_str = serde_json::to_string(&value)?;
        let mut conn = self.conn_manager.clone();

        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
        self.sets.fetch_add(1, Ordering::Relaxed);
        println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
        Ok(())
    }

    /// Remove value from cache using persistent ConnectionManager
    pub async fn remove(&self, key: &str) -> Result<()> {
        let mut conn = self.conn_manager.clone();
        let _: () = conn.del(key).await?;
        Ok(())
    }

    /// Scan keys matching a pattern (glob-style: *, ?, [])
    ///
    /// Uses Redis SCAN command (non-blocking, cursor-based iteration).
    /// This is safe for production use, unlike KEYS command.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Returns
    /// Vector of matching key names
    ///
    /// # Examples
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// // Find all user cache keys
    /// let keys = cache.scan_keys("user:*").await?;
    ///
    /// // Find specific user's cache keys
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
        let mut conn = self.conn_manager.clone();
        let mut keys = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            // SCAN cursor MATCH pattern COUNT 100
            let result: (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(100) // Fetch 100 keys per iteration
                .query_async(&mut conn)
                .await?;

            cursor = result.0;
            keys.extend(result.1);

            // Cursor 0 means iteration is complete
            if cursor == 0 {
                break;
            }
        }

        println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
        Ok(keys)
    }

    /// Remove multiple keys at once (bulk delete)
    ///
    /// More efficient than calling remove() multiple times
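    ///
    /// # Examples
    ///
    /// Illustrative sketch (added): pattern-based invalidation by combining
    /// `scan_keys` with `remove_bulk`. The key pattern is hypothetical.
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// let removed = cache.remove_bulk(&keys).await?;
    /// println!("invalidated {} keys", removed);
    /// # Ok(())
    /// # }
    /// ```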
    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }

        let mut conn = self.conn_manager.clone();
        let count: usize = conn.del(keys).await?;
        println!("🗑️  [L2] Removed {} keys", count);
        Ok(count)
    }

    /// Health check
    pub async fn health_check(&self) -> bool {
        let test_key = "health_check_l2";
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or(Duration::from_secs(0))
            .as_secs();
        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});

        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
            Ok(_) => {
                match self.get(test_key).await {
                    Some(retrieved) => {
                        let _ = self.remove(test_key).await;
                        retrieved["test"].as_bool().unwrap_or(false)
                    }
                    None => false
                }
            }
            Err(_) => false
        }
    }

    /// Publish data to Redis Stream using XADD
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
    /// * `fields` - Vec of field-value pairs to add to the stream
    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
    ///
    /// # Returns
    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
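    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; the stream name and field names are hypothetical):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// let fields = vec![
    ///     ("event".to_string(), "user_created".to_string()),
    ///     ("user_id".to_string(), "123".to_string()),
    /// ];
    /// // Keep the stream trimmed to roughly 10_000 entries
    /// let entry_id = cache.stream_add("events_stream", fields, Some(10_000)).await?;
    /// println!("published entry {}", entry_id);
    /// # Ok(())
    /// # }
    /// ```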
    pub async fn stream_add(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        let mut conn = self.conn_manager.clone();

        // Build XADD command with MAXLEN trimming if specified
        let mut cmd = redis::cmd("XADD");
        cmd.arg(stream_key);

        if let Some(max) = maxlen {
            // Use MAXLEN ~ for approximate trimming (more efficient)
            cmd.arg("MAXLEN").arg("~").arg(max);
        }

        cmd.arg("*");  // Auto-generate ID

        // Add all field-value pairs
        for (field, value) in &fields {
            cmd.arg(field).arg(value);
        }

        let entry_id: String = cmd.query_async(&mut conn).await?;

        println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
        Ok(entry_id)
    }

    /// Read latest entries from Redis Stream using XREVRANGE
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
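    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; the stream name is hypothetical):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// // Fetch the 10 most recent entries, newest first
    /// for (entry_id, fields) in cache.stream_read_latest("events_stream", 10).await? {
    ///     println!("{}: {:?}", entry_id, fields);
    /// }
    /// # Ok(())
    /// # }
    /// ```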
    pub async fn stream_read_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        let mut conn = self.conn_manager.clone();

        // XREVRANGE returns a raw Value that we need to parse manually
        let raw_result: redis::Value = redis::cmd("XREVRANGE")
            .arg(stream_key)
            .arg("+")  // Start from the newest
            .arg("-")  // To the oldest
            .arg("COUNT")
            .arg(count)
            .query_async(&mut conn)
            .await?;

        let entries = Self::parse_stream_response(raw_result)?;

        println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
        Ok(entries)
    }

    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
    ///
    /// # Arguments
    /// * `stream_key` - Name of the Redis Stream
    /// * `last_id` - Last entry ID seen (use "0" to read from the beginning, "$" for only new entries)
    /// * `count` - Maximum number of entries to retrieve
    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
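    ///
    /// # Examples
    ///
    /// Illustrative sketch (added; the stream name is hypothetical):
    ///
    /// ```no_run
    /// # use multi_tier_cache::L2Cache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = L2Cache::new().await?;
    /// // Non-blocking: read up to 50 entries from the beginning of the stream
    /// let entries = cache.stream_read("events_stream", "0", 50, None).await?;
    ///
    /// // Blocking: wait up to 5 seconds for entries newer than the last one seen
    /// if let Some((last_id, _)) = entries.last() {
    ///     let newer = cache.stream_read("events_stream", last_id, 50, Some(5000)).await?;
    ///     println!("received {} new entries", newer.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```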
    pub async fn stream_read(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        let mut conn = self.conn_manager.clone();

        let mut cmd = redis::cmd("XREAD");

        // Add BLOCK if specified
        if let Some(block) = block_ms {
            cmd.arg("BLOCK").arg(block);
        }

        cmd.arg("COUNT").arg(count);
        cmd.arg("STREAMS").arg(stream_key).arg(last_id);

        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;

        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
        let entries = Self::parse_xread_response(raw_result)?;

        println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
        Ok(entries)
    }

    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
    /// Format: [[id, [field, value, field, value, ...]], ...]
    fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match value {
            redis::Value::Array(entries) => {
                let mut result = Vec::new();

                for entry in entries {
                    if let redis::Value::Array(entry_parts) = entry {
                        if entry_parts.len() >= 2 {
                            // First element is the ID
                            let id = Self::value_to_string(&entry_parts[0]);

                            // Second element is the field-value array
                            if let redis::Value::Array(field_values) = &entry_parts[1] {
                                let mut fields = Vec::new();

                                // Process pairs of field-value
                                for chunk in field_values.chunks(2) {
                                    if chunk.len() == 2 {
                                        let field = Self::value_to_string(&chunk[0]);
                                        let value = Self::value_to_string(&chunk[1]);
                                        fields.push((field, value));
                                    }
                                }

                                result.push((id, fields));
                            }
                        }
                    }
                }

                Ok(result)
            }
            redis::Value::Nil => Ok(Vec::new()),
            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
        }
    }

    /// Helper function to parse XREAD response
    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
    fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match value {
            redis::Value::Array(streams) => {
                let mut all_entries = Vec::new();

                for stream in streams {
                    if let redis::Value::Array(stream_parts) = stream {
                        if stream_parts.len() >= 2 {
                            // Second element contains the entries
                            if let redis::Value::Array(entries) = &stream_parts[1] {
                                for entry in entries {
                                    if let redis::Value::Array(entry_parts) = entry {
                                        if entry_parts.len() >= 2 {
                                            let id = Self::value_to_string(&entry_parts[0]);

                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
                                                let mut fields = Vec::new();

                                                for chunk in field_values.chunks(2) {
                                                    if chunk.len() == 2 {
                                                        let field = Self::value_to_string(&chunk[0]);
                                                        let value = Self::value_to_string(&chunk[1]);
                                                        fields.push((field, value));
                                                    }
                                                }

                                                all_entries.push((id, fields));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                Ok(all_entries)
            }
            redis::Value::Nil => Ok(Vec::new()),
            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
        }
    }

    /// Helper function to convert Redis Value to String
    fn value_to_string(value: &redis::Value) -> String {
        match value {
            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
            redis::Value::SimpleString(s) => s.clone(),
            redis::Value::Int(i) => i.to_string(),
            redis::Value::Okay => "OK".to_string(),
            redis::Value::Nil => String::new(),
            _ => String::new(),
        }
    }
}

// ===== Trait Implementations =====

use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use async_trait::async_trait;

/// Implement CacheBackend trait for L2Cache
///
/// This allows L2Cache to be used as a pluggable backend in the multi-tier cache system.
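///
/// # Examples
///
/// Illustrative sketch (added): holding the cache behind a `dyn CacheBackend`
/// trait object. Assumes the `traits` module is publicly reachable at this path.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use multi_tier_cache::L2Cache;
/// # use multi_tier_cache::traits::CacheBackend;
/// # async fn example() -> anyhow::Result<()> {
/// let backend: Arc<dyn CacheBackend> = Arc::new(L2Cache::new().await?);
/// println!("using backend: {}", backend.name());
/// # Ok(())
/// # }
/// ```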
#[async_trait]
impl CacheBackend for L2Cache {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        L2Cache::get(self, key).await
    }

    async fn set_with_ttl(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Duration,
    ) -> Result<()> {
        L2Cache::set_with_ttl(self, key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        L2Cache::remove(self, key).await
    }

    async fn health_check(&self) -> bool {
        L2Cache::health_check(self).await
    }

    fn name(&self) -> &str {
        "Redis (L2)"
    }
}

/// Implement L2CacheBackend trait for L2Cache
///
/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
#[async_trait]
impl L2CacheBackend for L2Cache {
    async fn get_with_ttl(
        &self,
        key: &str,
    ) -> Option<(serde_json::Value, Option<Duration>)> {
        L2Cache::get_with_ttl(self, key).await
    }
}

/// Implement StreamingBackend trait for L2Cache
///
/// This enables Redis Streams functionality for event-driven architectures.
#[async_trait]
impl StreamingBackend for L2Cache {
    async fn stream_add(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        L2Cache::stream_add(self, stream_key, fields, maxlen).await
    }

    async fn stream_read_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        L2Cache::stream_read_latest(self, stream_key, count).await
    }

    async fn stream_read(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
    }
}