multi_tier_cache/
l2_cache.rs

//! L2 Cache - Redis Cache
//!
//! Redis-based distributed cache for warm data storage with persistence.
4
5use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use serde_json;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12/// L2 Cache using Redis
13pub struct L2Cache {
14    /// Redis client
15    client: Client,
16    /// Connection pool (using a simple approach)
17    /// Hit counter
18    hits: Arc<AtomicU64>,
19    /// Miss counter
20    misses: Arc<AtomicU64>,
21    /// Set counter
22    sets: Arc<AtomicU64>,
23}
24
25impl L2Cache {
26    /// Create new L2 cache
27    pub async fn new() -> Result<Self> {
28        println!("  🔴 Initializing L2 Cache (Redis)...");
29        
30        // Try to connect to Redis
31        let redis_url = std::env::var("REDIS_URL")
32            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33            
34        let client = Client::open(redis_url.as_str())?;
35        
36        // Test connection
37        let mut conn = client.get_multiplexed_async_connection().await?;
38        let _: String = redis::cmd("PING").query_async(&mut conn).await?;
39        
40        println!("  ✅ L2 Cache connected to Redis at {}", redis_url);
41        
42        Ok(Self {
43            client,
44            hits: Arc::new(AtomicU64::new(0)),
45            misses: Arc::new(AtomicU64::new(0)),
46            sets: Arc::new(AtomicU64::new(0)),
47        })
48    }
49    
50    /// Get value from L2 cache
51    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
52        match self.client.get_multiplexed_async_connection().await {
53            Ok(mut conn) => {
54                match conn.get::<_, String>(key).await {
55                    Ok(json_str) => {
56                        match serde_json::from_str(&json_str) {
57                            Ok(value) => {
58                                self.hits.fetch_add(1, Ordering::Relaxed);
59                                Some(value)
60                            }
61                            Err(_) => {
62                                self.misses.fetch_add(1, Ordering::Relaxed);
63                                None
64                            }
65                        }
66                    }
67                    Err(_) => {
68                        self.misses.fetch_add(1, Ordering::Relaxed);
69                        None
70                    }
71                }
72            }
73            Err(_) => {
74                self.misses.fetch_add(1, Ordering::Relaxed);
75                None
76            }
77        }
78    }
79    
80    /// Set value with custom TTL
81    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
82        let json_str = serde_json::to_string(&value)?;
83        let mut conn = self.client.get_multiplexed_async_connection().await?;
84        
85        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
86        self.sets.fetch_add(1, Ordering::Relaxed);
87        println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
88        Ok(())
89    }
90    
91    /// Remove value from cache
92    pub async fn remove(&self, key: &str) -> Result<()> {
93        let mut conn = self.client.get_multiplexed_async_connection().await?;
94        let _: () = conn.del(key).await?;
95        Ok(())
96    }
97    
98    /// Health check
99    pub async fn health_check(&self) -> bool {
100        let test_key = "health_check_l2";
101        let timestamp = std::time::SystemTime::now()
102            .duration_since(std::time::UNIX_EPOCH)
103            .unwrap_or(Duration::from_secs(0))
104            .as_secs();
105        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
106
107        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
108            Ok(_) => {
109                match self.get(test_key).await {
110                    Some(retrieved) => {
111                        let _ = self.remove(test_key).await;
112                        retrieved["test"].as_bool().unwrap_or(false)
113                    }
114                    None => false
115                }
116            }
117            Err(_) => false
118        }
119    }
120    
121    /// Publish data to Redis Stream using XADD
122    ///
123    /// # Arguments
124    /// * `stream_key` - Name of the Redis Stream (e.g., "events_stream")
125    /// * `fields` - Vec of field-value pairs to add to the stream
126    /// * `maxlen` - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
127    ///
128    /// # Returns
129    /// The stream entry ID generated by Redis (e.g., "1234567890123-0")
130    pub async fn stream_add(
131        &self, 
132        stream_key: &str, 
133        fields: Vec<(String, String)>,
134        maxlen: Option<usize>
135    ) -> Result<String> {
136        let mut conn = self.client.get_multiplexed_async_connection().await?;
137        
138        // Build XADD command with MAXLEN trimming if specified
139        let mut cmd = redis::cmd("XADD");
140        cmd.arg(stream_key);
141        
142        if let Some(max) = maxlen {
143            // Use MAXLEN ~ for approximate trimming (more efficient)
144            cmd.arg("MAXLEN").arg("~").arg(max);
145        }
146        
147        cmd.arg("*");  // Auto-generate ID
148        
149        // Add all field-value pairs
150        for (field, value) in &fields {
151            cmd.arg(field).arg(value);
152        }
153        
154        let entry_id: String = cmd.query_async(&mut conn).await?;
155        
156        println!("📤 [L2 Stream] Published to '{}' (ID: {}, fields: {})", stream_key, entry_id, fields.len());
157        Ok(entry_id)
158    }
159    
160    /// Read latest entries from Redis Stream using XREVRANGE
161    /// 
162    /// # Arguments
163    /// * `stream_key` - Name of the Redis Stream
164    /// * `count` - Number of latest entries to retrieve
165    /// 
166    /// # Returns
167    /// Vector of (entry_id, fields) tuples, ordered from newest to oldest
168    pub async fn stream_read_latest(
169        &self,
170        stream_key: &str,
171        count: usize
172    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
173        let mut conn = self.client.get_multiplexed_async_connection().await?;
174        
175        // XREVRANGE returns a raw Value that we need to parse manually
176        let raw_result: redis::Value = redis::cmd("XREVRANGE")
177            .arg(stream_key)
178            .arg("+")  // Start from the newest
179            .arg("-")  // To the oldest
180            .arg("COUNT")
181            .arg(count)
182            .query_async(&mut conn)
183            .await?;
184        
185        let entries = Self::parse_stream_response(raw_result)?;
186        
187        println!("📥 [L2 Stream] Read {} entries from '{}'", entries.len(), stream_key);
188        Ok(entries)
189    }
190    
191    /// Read entries from Redis Stream using XREAD (blocking or non-blocking)
192    /// 
193    /// # Arguments
194    /// * `stream_key` - Name of the Redis Stream
195    /// * `last_id` - Last entry ID seen (use "0" to read from beginning, "$" for only new entries)
196    /// * `count` - Maximum number of entries to retrieve
197    /// * `block_ms` - Optional blocking timeout in milliseconds (None for non-blocking)
198    /// 
199    /// # Returns
200    /// Vector of (entry_id, fields) tuples
201    pub async fn stream_read(
202        &self,
203        stream_key: &str,
204        last_id: &str,
205        count: usize,
206        block_ms: Option<usize>
207    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
208        let mut conn = self.client.get_multiplexed_async_connection().await?;
209        
210        let mut cmd = redis::cmd("XREAD");
211        
212        // Add BLOCK if specified
213        if let Some(block) = block_ms {
214            cmd.arg("BLOCK").arg(block);
215        }
216        
217        cmd.arg("COUNT").arg(count);
218        cmd.arg("STREAMS").arg(stream_key).arg(last_id);
219        
220        let raw_result: redis::Value = cmd.query_async(&mut conn).await?;
221        
222        // XREAD returns [[stream_name, [[id, [field, value, ...]], ...]]]
223        let entries = Self::parse_xread_response(raw_result)?;
224        
225        println!("📥 [L2 Stream] XREAD retrieved {} entries from '{}'", entries.len(), stream_key);
226        Ok(entries)
227    }
228    
229    /// Helper function to parse Redis stream response from XREVRANGE/XRANGE
230    /// Format: [[id, [field, value, field, value, ...]], ...]
231    fn parse_stream_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
232        match value {
233            redis::Value::Array(entries) => {
234                let mut result = Vec::new();
235                
236                for entry in entries {
237                    if let redis::Value::Array(entry_parts) = entry {
238                        if entry_parts.len() >= 2 {
239                            // First element is the ID
240                            let id = Self::value_to_string(&entry_parts[0]);
241                            
242                            // Second element is the field-value array
243                            if let redis::Value::Array(field_values) = &entry_parts[1] {
244                                let mut fields = Vec::new();
245                                
246                                // Process pairs of field-value
247                                for chunk in field_values.chunks(2) {
248                                    if chunk.len() == 2 {
249                                        let field = Self::value_to_string(&chunk[0]);
250                                        let value = Self::value_to_string(&chunk[1]);
251                                        fields.push((field, value));
252                                    }
253                                }
254                                
255                                result.push((id, fields));
256                            }
257                        }
258                    }
259                }
260                
261                Ok(result)
262            }
263            redis::Value::Nil => Ok(Vec::new()),
264            _ => Err(anyhow::anyhow!("Unexpected Redis stream response format"))
265        }
266    }
267    
268    /// Helper function to parse XREAD response
269    /// Format: [[stream_name, [[id, [field, value, ...]], ...]], ...]
270    fn parse_xread_response(value: redis::Value) -> Result<Vec<(String, Vec<(String, String)>)>> {
271        match value {
272            redis::Value::Array(streams) => {
273                let mut all_entries = Vec::new();
274                
275                for stream in streams {
276                    if let redis::Value::Array(stream_parts) = stream {
277                        if stream_parts.len() >= 2 {
278                            // Second element contains the entries
279                            if let redis::Value::Array(entries) = &stream_parts[1] {
280                                for entry in entries {
281                                    if let redis::Value::Array(entry_parts) = entry {
282                                        if entry_parts.len() >= 2 {
283                                            let id = Self::value_to_string(&entry_parts[0]);
284                                            
285                                            if let redis::Value::Array(field_values) = &entry_parts[1] {
286                                                let mut fields = Vec::new();
287                                                
288                                                for chunk in field_values.chunks(2) {
289                                                    if chunk.len() == 2 {
290                                                        let field = Self::value_to_string(&chunk[0]);
291                                                        let value = Self::value_to_string(&chunk[1]);
292                                                        fields.push((field, value));
293                                                    }
294                                                }
295                                                
296                                                all_entries.push((id, fields));
297                                            }
298                                        }
299                                    }
300                                }
301                            }
302                        }
303                    }
304                }
305                
306                Ok(all_entries)
307            }
308            redis::Value::Nil => Ok(Vec::new()),
309            _ => Err(anyhow::anyhow!("Unexpected XREAD response format"))
310        }
311    }
312    
313    /// Helper function to convert Redis Value to String
314    fn value_to_string(value: &redis::Value) -> String {
315        match value {
316            redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
317            redis::Value::SimpleString(s) => s.clone(),
318            redis::Value::Int(i) => i.to_string(),
319            redis::Value::Okay => "OK".to_string(),
320            redis::Value::Nil => String::new(),
321            _ => String::new(),
322        }
323    }
324}