multi_tier_cache/
l2_cache.rs

1//! L2 Cache - Redis Cache
2//!
3//! Redis-based distributed cache for warm data storage with persistence.
4
5use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
12
13/// L2 Cache using Redis with ConnectionManager for automatic reconnection
14pub struct L2Cache {
15    /// Redis connection manager - handles reconnection automatically
16    conn_manager: ConnectionManager,
17    /// Hit counter
18    hits: Arc<AtomicU64>,
19    /// Miss counter
20    misses: Arc<AtomicU64>,
21    /// Set counter
22    sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26    /// Create new L2 cache with ConnectionManager for automatic reconnection
27    pub async fn new() -> Result<Self> {
28        println!("  🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30        // Try to connect to Redis
31        let redis_url = std::env::var("REDIS_URL")
32            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34        let client = Client::open(redis_url.as_str())?;
35
36        // Create ConnectionManager - handles reconnection automatically
37        let conn_manager = ConnectionManager::new(client).await?;
38
39        // Test connection
40        let mut conn = conn_manager.clone();
41        let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43        println!("  ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45        Ok(Self {
46            conn_manager,
47            hits: Arc::new(AtomicU64::new(0)),
48            misses: Arc::new(AtomicU64::new(0)),
49            sets: Arc::new(AtomicU64::new(0)),
50        })
51    }
52    
53    /// Get value from L2 cache using persistent ConnectionManager
54    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55        let mut conn = self.conn_manager.clone();
56
57        match conn.get::<_, String>(key).await {
58            Ok(json_str) => {
59                match serde_json::from_str(&json_str) {
60                    Ok(value) => {
61                        self.hits.fetch_add(1, Ordering::Relaxed);
62                        Some(value)
63                    }
64                    Err(_) => {
65                        self.misses.fetch_add(1, Ordering::Relaxed);
66                        None
67                    }
68                }
69            }
70            Err(_) => {
71                self.misses.fetch_add(1, Ordering::Relaxed);
72                None
73            }
74        }
75    }
76
77    /// Get value with its remaining TTL from L2 cache
78    ///
79    /// Returns tuple of (value, ttl_seconds) if key exists
80    /// TTL is in seconds, None if key doesn't exist or has no expiration
81    pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82        let mut conn = self.conn_manager.clone();
83
84        // Get value
85        let json_str: String = match conn.get(key).await {
86            Ok(s) => s,
87            Err(_) => {
88                self.misses.fetch_add(1, Ordering::Relaxed);
89                return None;
90            }
91        };
92
93        // Parse JSON
94        let value: serde_json::Value = match serde_json::from_str(&json_str) {
95            Ok(v) => v,
96            Err(_) => {
97                self.misses.fetch_add(1, Ordering::Relaxed);
98                return None;
99            }
100        };
101
102        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
103        let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104            Ok(ttl) => ttl,
105            Err(_) => -1, // Fallback: treat as no expiry
106        };
107
108        self.hits.fetch_add(1, Ordering::Relaxed);
109
110        let ttl = if ttl_secs > 0 {
111            Some(Duration::from_secs(ttl_secs as u64))
112        } else {
113            None // No expiry or error
114        };
115
116        Some((value, ttl))
117    }
118    
119    /// Set value with custom TTL using persistent ConnectionManager
120    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121        let json_str = serde_json::to_string(&value)?;
122        let mut conn = self.conn_manager.clone();
123
124        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125        self.sets.fetch_add(1, Ordering::Relaxed);
126        println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127        Ok(())
128    }
129    
130    /// Remove value from cache using persistent ConnectionManager
131    pub async fn remove(&self, key: &str) -> Result<()> {
132        let mut conn = self.conn_manager.clone();
133        let _: () = conn.del(key).await?;
134        Ok(())
135    }
136    
137    /// Health check
138    pub async fn health_check(&self) -> bool {
139        let test_key = "health_check_l2";
140        let timestamp = std::time::SystemTime::now()
141            .duration_since(std::time::UNIX_EPOCH)
142            .unwrap_or(Duration::from_secs(0))
143            .as_secs();
144        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
145
146        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
147            Ok(_) => {
148                match self.get(test_key).await {
149                    Some(retrieved) => {
150                        let _ = self.remove(test_key).await;
151                        retrieved["test"].as_bool().unwrap_or(false)
152                    }
153                    None => false
154                }
155            }
156            Err(_) => false
157        }
158    }
159    
160    /// Publish data to Redis Stream using XADD
161    ///
162    /// # Arguments
163    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
164    /// * `fields` - Vec of field-value pairs to add to the stream
165    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
166    ///
167    /// # Returns
168    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
169    pub async fn stream_add(
170        &self,
171        stream_key: &str,
172        fields: Vec<(String, String)>,
173        maxlen: Option<usize>
174    ) -> Result<String> {
175        let mut conn = self.conn_manager.clone();
176
177        // Build XADD command with MAXLEN trimming if specified
178        let mut cmd = redis::cmd("XADD");
179        cmd.arg(stream_key);
180
181        if let Some(max) = maxlen {
182            // Use MAXLEN ~ for approximate trimming (more efficient)
183            cmd.arg("MAXLEN").arg("~").arg(max);
184        }
185
186        cmd.arg("*");  // Auto-generate ID
187
188        // Add all field-value pairs
189        for (field, value) in &fields {
190            cmd.arg(field).arg(value);
191        }
192
193        let entry_id: String = cmd.query_async(&mut conn).await?;
194
195        println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
196        Ok(entry_id)
197    }
198    
199    /// Read latest entries from Redis Stream using XREVRANGE
200    /// 
201    /// # Arguments
202    /// * `stream_key` - Name of the Redis Stream
203    /// * `count` - Number of latest entries to retrieve
204    /// 
205    /// # Returns
206    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
207    pub async fn stream_read_latest(
208        &self,
209        stream_key: &str,
210        count: usize
211    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
212        let mut conn = self.conn_manager.clone();
213
214        // XREVRANGE returns a raw Value that we need to parse manually
215        let raw_result: redis::Value = redis::cmd("XREVRANGE")
216            .arg(stream_key)
217            .arg("+")  // Start from the newest
218            .arg("-")  // To the oldest
219            .arg("COUNT")
220            .arg(count)
221            .query_async(&mut conn)
222            .await?;
223
224        let entries = Self::parse_stream_response(raw_result)?;
225
226        println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
227        Ok(entries)
228    }
229    
230    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
231    /// 
232    /// # Arguments
233    /// * `stream_key` - Name of the Redis Stream
234    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
235    /// * `count` - Maximum number of entries to retrieve
236    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
237    /// 
238    /// # Returns
239    /// Vector of (entry_id, fields) tuples
240    pub async fn stream_read(
241        &self,
242        stream_key: &str,
243        last_id: &str,
244        count: usize,
245        block_ms: Option<usize>
246    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
247        let mut conn = self.conn_manager.clone();
248
249        let mut cmd = redis::cmd("XREAD");
250
251        // Add BLOCK if specified
252        if let Some(block) = block_ms {
253            cmd.arg("BLOCK").arg(block);
254        }
255
256        cmd.arg("COUNT").arg(count);
257        cmd.arg("STREAMS").arg(stream_key).arg(last_id);
258
259        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
260
261        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
262        let entries = Self::parse_xread_response(raw_result)?;
263
264        println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
265        Ok(entries)
266    }
267    
268    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
269    /// Format: [[id, [field, value, field, value, ...]], ...]
270    fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
271        match value {
272            redis::Value::Array(entries) => {
273                let mut result = Vec::new();
274                
275                for entry in entries {
276                    if let redis::Value::Array(entry_parts) = entry {
277                        if entry_parts.len() >= 2 {
278                            // First element is the ID
279                            let id = Self::value_to_string(&entry_parts[0]);
280                            
281                            // Second element is the field-value array
282                            if let redis::Value::Array(field_values) = &entry_parts[1] {
283                                let mut fields = Vec::new();
284                                
285                                // Process pairs of field-value
286                                for chunk in field_values.chunks(2) {
287                                    if chunk.len() == 2 {
288                                        let field = Self::value_to_string(&chunk[0]);
289                                        let value = Self::value_to_string(&chunk[1]);
290                                        fields.push((field, value));
291                                    }
292                                }
293                                
294                                result.push((id, fields));
295                            }
296                        }
297                    }
298                }
299                
300                Ok(result)
301            }
302            redis::Value::Nil => Ok(Vec::new()),
303            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
304        }
305    }
306    
307    /// Helper function to parse XREAD response
308    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
309    fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
310        match value {
311            redis::Value::Array(streams) => {
312                let mut all_entries = Vec::new();
313                
314                for stream in streams {
315                    if let redis::Value::Array(stream_parts) = stream {
316                        if stream_parts.len() >= 2 {
317                            // Second element contains the entries
318                            if let redis::Value::Array(entries) = &stream_parts[1] {
319                                for entry in entries {
320                                    if let redis::Value::Array(entry_parts) = entry {
321                                        if entry_parts.len() >= 2 {
322                                            let id = Self::value_to_string(&entry_parts[0]);
323                                            
324                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
325                                                let mut fields = Vec::new();
326                                                
327                                                for chunk in field_values.chunks(2) {
328                                                    if chunk.len() == 2 {
329                                                        let field = Self::value_to_string(&chunk[0]);
330                                                        let value = Self::value_to_string(&chunk[1]);
331                                                        fields.push((field, value));
332                                                    }
333                                                }
334                                                
335                                                all_entries.push((id, fields));
336                                            }
337                                        }
338                                    }
339                                }
340                            }
341                        }
342                    }
343                }
344                
345                Ok(all_entries)
346            }
347            redis::Value::Nil => Ok(Vec::new()),
348            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
349        }
350    }
351    
352    /// Helper function to convert Redis Value to String
353    fn value_to_string(value: &redis::Value) -> String {
354        match value {
355            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
356            redis::Value::SimpleString(s) => s.clone(),
357            redis::Value::Int(i) => i.to_string(),
358            redis::Value::Okay => "OK".to_string(),
359            redis::Value::Nil => String::new(),
360            _ => String::new(),
361        }
362    }
363}
364
365// ===== Trait Implementations =====
366
367use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
368use async_trait::async_trait;
369
370/// Implement CacheBackend trait for L2Cache
371///
372/// This allows L2Cache to be used as a pluggable backend in the multi-tier cache system.
373#[async_trait]
374impl CacheBackend for L2Cache {
375    async fn get(&self, key: &str) -> Option<serde_json::Value> {
376        L2Cache::get(self, key).await
377    }
378
379    async fn set_with_ttl(
380        &self,
381        key: &str,
382        value: serde_json::Value,
383        ttl: Duration,
384    ) -> Result<()> {
385        L2Cache::set_with_ttl(self, key, value, ttl).await
386    }
387
388    async fn remove(&self, key: &str) -> Result<()> {
389        L2Cache::remove(self, key).await
390    }
391
392    async fn health_check(&self) -> bool {
393        L2Cache::health_check(self).await
394    }
395
396    fn name(&self) -> &str {
397        "Redis (L2)"
398    }
399}
400
401/// Implement L2CacheBackend trait for L2Cache
402///
403/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
404#[async_trait]
405impl L2CacheBackend for L2Cache {
406    async fn get_with_ttl(
407        &self,
408        key: &str,
409    ) -> Option<(serde_json::Value, Option<Duration>)> {
410        L2Cache::get_with_ttl(self, key).await
411    }
412}
413
414/// Implement StreamingBackend trait for L2Cache
415///
416/// This enables Redis Streams functionality for event-driven architectures.
417#[async_trait]
418impl StreamingBackend for L2Cache {
419    async fn stream_add(
420        &self,
421        stream_key: &str,
422        fields: Vec<(String, String)>,
423        maxlen: Option<usize>,
424    ) -> Result<String> {
425        L2Cache::stream_add(self, stream_key, fields, maxlen).await
426    }
427
428    async fn stream_read_latest(
429        &self,
430        stream_key: &str,
431        count: usize,
432    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
433        L2Cache::stream_read_latest(self, stream_key, count).await
434    }
435
436    async fn stream_read(
437        &self,
438        stream_key: &str,
439        last_id: &str,
440        count: usize,
441        block_ms: Option<usize>,
442    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
443        L2Cache::stream_read(self, stream_key, last_id, count, block_ms).await
444    }
445}