// multi_tier_cache/l2_cache.rs

//! L2 Cache - Redis Cache
//!
//! Redis-based distributed cache for warm data storage with persistence.

5use std::sync::Arc;
6use std::time::Duration;
7use anyhow::Result;
8use redis::{Client, AsyncCommands};
9use redis::aio::ConnectionManager;
10use serde_json;
11use std::sync::atomic::{AtomicU64, Ordering};
13/// L2 Cache using Redis with ConnectionManager for automatic reconnection
14pub struct L2Cache {
15    /// Redis connection manager - handles reconnection automatically
16    conn_manager: ConnectionManager,
17    /// Hit counter
18    hits: Arc<AtomicU64>,
19    /// Miss counter
20    misses: Arc<AtomicU64>,
21    /// Set counter
22    sets: Arc<AtomicU64>,
23}
25impl L2Cache {
26    /// Create new L2 cache with ConnectionManager for automatic reconnection
27    pub async fn new() -> Result<Self> {
28        println!("  🔴 Initializing L2 Cache (Redis with ConnectionManager)...");
29
30        // Try to connect to Redis
31        let redis_url = std::env::var("REDIS_URL")
32            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33
34        let client = Client::open(redis_url.as_str())?;
35
36        // Create ConnectionManager - handles reconnection automatically
37        let conn_manager = ConnectionManager::new(client).await?;
38
39        // Test connection
40        let mut conn = conn_manager.clone();
41        let _: String = redis::cmd("PING").query_async(&mut conn).await?;
42
43        println!("  ✅ L2 Cache connected to Redis at {} (ConnectionManager enabled)", redis_url);
44
45        Ok(Self {
46            conn_manager,
47            hits: Arc::new(AtomicU64::new(0)),
48            misses: Arc::new(AtomicU64::new(0)),
49            sets: Arc::new(AtomicU64::new(0)),
50        })
51    }
52    
53    /// Get value from L2 cache using persistent ConnectionManager
54    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
55        let mut conn = self.conn_manager.clone();
56
57        match conn.get::<_, String>(key).await {
58            Ok(json_str) => {
59                match serde_json::from_str(&json_str) {
60                    Ok(value) => {
61                        self.hits.fetch_add(1, Ordering::Relaxed);
62                        Some(value)
63                    }
64                    Err(_) => {
65                        self.misses.fetch_add(1, Ordering::Relaxed);
66                        None
67                    }
68                }
69            }
70            Err(_) => {
71                self.misses.fetch_add(1, Ordering::Relaxed);
72                None
73            }
74        }
75    }
76
77    /// Get value with its remaining TTL from L2 cache
78    ///
79    /// Returns tuple of (value, ttl_seconds) if key exists
80    /// TTL is in seconds, None if key doesn't exist or has no expiration
81    pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
82        let mut conn = self.conn_manager.clone();
83
84        // Get value
85        let json_str: String = match conn.get(key).await {
86            Ok(s) => s,
87            Err(_) => {
88                self.misses.fetch_add(1, Ordering::Relaxed);
89                return None;
90            }
91        };
92
93        // Parse JSON
94        let value: serde_json::Value = match serde_json::from_str(&json_str) {
95            Ok(v) => v,
96            Err(_) => {
97                self.misses.fetch_add(1, Ordering::Relaxed);
98                return None;
99            }
100        };
101
102        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
103        let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
104            Ok(ttl) => ttl,
105            Err(_) => -1, // Fallback: treat as no expiry
106        };
107
108        self.hits.fetch_add(1, Ordering::Relaxed);
109
110        let ttl = if ttl_secs > 0 {
111            Some(Duration::from_secs(ttl_secs as u64))
112        } else {
113            None // No expiry or error
114        };
115
116        Some((value, ttl))
117    }
118    
119    /// Set value with custom TTL using persistent ConnectionManager
120    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
121        let json_str = serde_json::to_string(&value)?;
122        let mut conn = self.conn_manager.clone();
123
124        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
125        self.sets.fetch_add(1, Ordering::Relaxed);
126        println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
127        Ok(())
128    }
129    
130    /// Remove value from cache using persistent ConnectionManager
131    pub async fn remove(&self, key: &str) -> Result<()> {
132        let mut conn = self.conn_manager.clone();
133        let _: () = conn.del(key).await?;
134        Ok(())
135    }
136
137    /// Scan keys matching a pattern (glob-style: *, ?, [])
138    ///
139    /// Uses Redis SCAN command (non-blocking, cursor-based iteration)
140    /// This is safe for production use, unlike KEYS command.
141    ///
142    /// # Arguments
143    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
144    ///
145    /// # Returns
146    /// Vector of matching key names
147    ///
148    /// # Examples
149    /// ```no_run
150    /// # use multi_tier_cache::L2Cache;
151    /// # async fn example() -> anyhow::Result<()> {
152    /// # let cache = L2Cache::new().await?;
153    /// // Find all user cache keys
154    /// let keys = cache.scan_keys("user:*").await?;
155    ///
156    /// // Find specific user's cache keys
157    /// let keys = cache.scan_keys("user:123:*").await?;
158    /// # Ok(())
159    /// # }
160    /// ```
161    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
162        let mut conn = self.conn_manager.clone();
163        let mut keys = Vec::new();
164        let mut cursor: u64 = 0;
165
166        loop {
167            // SCAN cursor MATCH pattern COUNT 100
168            let result: (u64, Vec<String>) = redis::cmd("SCAN")
169                .arg(cursor)
170                .arg("MATCH")
171                .arg(pattern)
172                .arg("COUNT")
173                .arg(100) // Fetch 100 keys per iteration
174                .query_async(&mut conn)
175                .await?;
176
177            cursor = result.0;
178            keys.extend(result.1);
179
180            // Cursor 0 means iteration is complete
181            if cursor == 0 {
182                break;
183            }
184        }
185
186        println!("🔍 [L2] Scanned keys matching '{}': {} found", pattern, keys.len());
187        Ok(keys)
188    }
189
190    /// Remove multiple keys at once (bulk delete)
191    ///
192    /// More efficient than calling remove() multiple times
193    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
194        if keys.is_empty() {
195            return Ok(0);
196        }
197
198        let mut conn = self.conn_manager.clone();
199        let count: usize = conn.del(keys).await?;
200        println!("🗑️  [L2] Removed {} keys", count);
201        Ok(count)
202    }
203    
204    /// Health check
205    pub async fn health_check(&self) -> bool {
206        let test_key = "health_check_l2";
207        let timestamp = std::time::SystemTime::now()
208            .duration_since(std::time::UNIX_EPOCH)
209            .unwrap_or(Duration::from_secs(0))
210            .as_secs();
211        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
212
213        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
214            Ok(_) => {
215                match self.get(test_key).await {
216                    Some(retrieved) => {
217                        let _ = self.remove(test_key).await;
218                        retrieved["test"].as_bool().unwrap_or(false)
219                    }
220                    None => false
221                }
222            }
223            Err(_) => false
224        }
225    }
226    
227}
// ===== Trait Implementations =====
230
231use crate::traits::{CacheBackend, L2CacheBackend};
232use async_trait::async_trait;
234/// Implement CacheBackend trait for L2Cache
235///
236/// This allows L2Cache to be used as a pluggable backend in the multi-tier cache system.
237#[async_trait]
238impl CacheBackend for L2Cache {
239    async fn get(&self, key: &str) -> Option<serde_json::Value> {
240        L2Cache::get(self, key).await
241    }
242
243    async fn set_with_ttl(
244        &self,
245        key: &str,
246        value: serde_json::Value,
247        ttl: Duration,
248    ) -> Result<()> {
249        L2Cache::set_with_ttl(self, key, value, ttl).await
250    }
251
252    async fn remove(&self, key: &str) -> Result<()> {
253        L2Cache::remove(self, key).await
254    }
255
256    async fn health_check(&self) -> bool {
257        L2Cache::health_check(self).await
258    }
259
260    fn name(&self) -> &str {
261        "Redis (L2)"
262    }
263}
265/// Implement L2CacheBackend trait for L2Cache
266///
267/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
268#[async_trait]
269impl L2CacheBackend for L2Cache {
270    async fn get_with_ttl(
271        &self,
272        key: &str,
273    ) -> Option<(serde_json::Value, Option<Duration>)> {
274        L2Cache::get_with_ttl(self, key).await
275    }
276}