multi_tier_cache/
cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
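//!
//! # Example
//!
//! A minimal usage sketch (assuming already-constructed `l1` and `l2` cache handles):
//!
//! ```rust,ignore
//! let manager = CacheManager::new(l1, l2).await?;
//! let value = manager
//!     .get_or_compute_with("api_response", CacheStrategy::ShortTerm, || async {
//!         Ok(serde_json::json!({"status": "ok"}))
//!     })
//!     .await?;
//! ```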

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use super::l1_cache::L1Cache;
use super::l2_cache::L2Cache;

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from DashMap even on early return or panic
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
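    ///
    /// # Example
    ///
    /// A quick sketch of the mapping defined below:
    ///
    /// ```rust,ignore
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```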
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300), // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800), // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Cache Manager - Unified operations across L1 and L2
pub struct CacheManager {
    /// L1 Cache (Moka)
    l1_cache: Arc<L1Cache>,
    /// L2 Cache (Redis)
    l2_cache: Arc<L2Cache>,
    /// Statistics
    total_requests: Arc<AtomicU64>,
    l1_hits: Arc<AtomicU64>,
    l2_hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
}

impl CacheManager {
    /// Create new cache manager
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager...");

        Ok(Self {
            l1_cache,
            l2_cache,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
        })
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method includes built-in Cache Stampede protection on L1 misses:
    /// multiple concurrent requests for the same missing key are serialized so
    /// that redundant L2 (Redis) lookups are avoided once the first request
    /// promotes the value back to L1.
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
    /// * `Ok(None)` - Cache miss, value not found in either cache
    /// * `Err(error)` - Cache operation failed
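    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// if let Some(value) = cache_manager.get("api_response").await? {
    ///     println!("cache hit: {}", value);
    /// } else {
    ///     println!("cache miss");
    /// }
    /// ```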
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache
        if let Some(value) = self.l2_cache.get(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 for faster access next time
            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), Duration::from_secs(300)).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 (via get)", key);
            }

            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard auto-removes the in-flight entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    /// Get value from cache with an optional fallback computation
    ///
    /// This is a convenience method that combines `get()` with an optional computation.
    /// If the value is not found in the cache, the compute function is executed
    /// and the result is cached automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `compute_fn` - Optional function to compute the value if not in cache
    /// * `strategy` - Cache strategy for storing the computed value (default: ShortTerm)
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Value found in cache or computed successfully
    /// * `Ok(None)` - Value not in cache and no compute function provided
    /// * `Err(error)` - Cache operation or computation failed
    ///
    /// # Example
    /// ```rust,ignore
    /// // Plain cache get
    /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    ///
    /// // Get with computation fallback
    /// let api_data = cache_manager.get_with_fallback(
    ///     "api_response",
    ///     Some(|| async { fetch_data_from_api().await }),
    ///     Some(CacheStrategy::RealTime)
    /// ).await?;
    /// ```
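    //
    // NOTE: a minimal sketch of `get_with_fallback`, reconstructed from the doc
    // comment above (the method body itself is not present in this file). It
    // assumes that a supplied compute function should go through the
    // stampede-protected `get_or_compute_with` path, and that a plain lookup
    // should fall back to `get`.
    #[allow(dead_code)]
    pub async fn get_with_fallback<F, Fut>(
        &self,
        key: &str,
        compute_fn: Option<F>,
        strategy: Option<CacheStrategy>,
    ) -> Result<Option<serde_json::Value>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        match compute_fn {
            // With a compute function: compute-and-cache on miss, with stampede protection.
            Some(f) => {
                let strategy = strategy.unwrap_or(CacheStrategy::ShortTerm);
                self.get_or_compute_with(key, strategy, f).await.map(Some)
            }
            // Without one: plain L1/L2 lookup.
            None => self.get(key).await,
        }
    }
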
    /// Set value with specific cache strategy (both L1 and L2)
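    ///
    /// Writes to both tiers; succeeds as long as at least one of them accepts the value.
    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// cache_manager
    ///     .set_with_strategy("api_response", serde_json::json!({"status": "ok"}), CacheStrategy::MediumTerm)
    ///     .await?;
    /// ```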
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(e)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}' ({}), continuing with L1", key, e);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}' ({}), continuing with L2", key, e);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': L1: {}, L2: {}", key, e1, e2))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```rust,ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check L2 cache
        if let Some(value) = self.l2_cache.get(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 for future requests
            let ttl = strategy.to_duration();
            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await {
                eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1", key);
            }

            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 5. Both L1 and L2 miss - compute fresh data
        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop

        Ok(fresh_data)
    }

    /// Get comprehensive cache statistics
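    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% ({} / {})", stats.hit_rate, stats.total_hits, stats.total_requests);
    /// ```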
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
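    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`); the stream name,
    /// fields, and max length are illustrative:
    ///
    /// ```rust,ignore
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "created".to_string())],
    ///     Some(1000),
    /// ).await?;
    /// ```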
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        self.l2_cache.stream_add(stream_key, fields, maxlen).await
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
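    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// for (entry_id, fields) in cache_manager.read_stream_latest("events_stream", 10).await? {
    ///     println!("{}: {:?}", entry_id, fields);
    /// }
    /// ```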
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        self.l2_cache.stream_read_latest(stream_key, count).await
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
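    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming an initialized `cache_manager`): block for up to
    /// five seconds waiting for entries newer than the last one seen:
    ///
    /// ```rust,ignore
    /// let entries = cache_manager.read_stream("events_stream", "$", 100, Some(5000)).await?;
    /// ```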
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        self.l2_cache.stream_read(stream_key, last_id, count, block_ms).await
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub total_hits: u64,
    pub misses: u64,
    /// Overall hit rate (L1 + L2) as a percentage of total requests
    pub hit_rate: f64,
    /// L1 hit rate as a percentage of total requests
    pub l1_hit_rate: f64,
    pub promotions: usize,
    pub in_flight_requests: usize,
}