multi_tier_cache/
cache_manager.rs

1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use super::l1_cache::L1Cache;
15use super::l2_cache::L2Cache;
16use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
17use super::invalidation::{
18    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
19    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
20};
21
22/// RAII cleanup guard for in-flight request tracking
23/// Ensures that entries are removed from DashMap even on early return or panic
24struct CleanupGuard<'a> {
25    map: &'a DashMap<String, Arc<Mutex<()>>>,
26    key: String,
27}
28
29impl<'a> Drop for CleanupGuard<'a> {
30    fn drop(&mut self) {
31        self.map.remove(&self.key);
32    }
33}
34
/// Named TTL presets used when storing values in the cache.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data: 10 second TTL.
    RealTime,
    /// Short-term data: 5 minute TTL.
    ShortTerm,
    /// Medium-term data: 1 hour TTL.
    MediumTerm,
    /// Long-term data: 3 hour TTL.
    LongTerm,
    /// Caller-supplied TTL.
    Custom(Duration),
    /// Default strategy: same 5 minute TTL as `ShortTerm`.
    Default,
}

impl CacheStrategy {
    /// Returns the time-to-live associated with this strategy.
    ///
    /// `Custom` yields the wrapped duration unchanged; every other variant
    /// maps to a fixed number of seconds.
    pub fn to_duration(&self) -> Duration {
        const MINUTE: u64 = 60;
        const HOUR: u64 = 60 * MINUTE;

        let secs = match *self {
            // A custom TTL is already a `Duration`, so hand it back as-is.
            Self::Custom(ttl) => return ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * MINUTE,
            Self::MediumTerm => HOUR,
            Self::LongTerm => 3 * HOUR,
        };

        Duration::from_secs(secs)
    }
}
66
/// Cache Manager - Unified operations across L1 and L2
///
/// Holds both cache tiers as trait objects, an optional streaming backend,
/// hit/miss counters, the per-key lock map used to serialize concurrent
/// lookups of the same key, and the optional invalidation publisher/subscriber.
pub struct CacheManager {
    /// L1 Cache (trait object for pluggable backends); consulted first on reads
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends); consulted after an L1 miss
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys).
    /// Only set by `new_with_invalidation`; `new_with_backends` leaves it `None`.
    l2_cache_concrete: Option<Arc<L2Cache>>,
    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend).
    /// When `None`, the stream methods return an error.
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics: number of lookup calls (`get` / `get_or_compute_*`)
    total_requests: Arc<AtomicU64>,
    /// Statistics: lookups answered from L1
    l1_hits: Arc<AtomicU64>,
    /// Statistics: lookups answered from L2
    l2_hits: Arc<AtomicU64>,
    /// Statistics: lookups found in neither tier
    misses: Arc<AtomicU64>,
    /// Statistics: successful L2 -> L1 promotions
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations.
    /// Maps a cache key to the mutex that concurrent lookups of that key wait on.
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
92
93impl CacheManager {
94    /// Create new cache manager with trait objects (pluggable backends)
95    ///
96    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
97    ///
98    /// # Arguments
99    ///
100    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
101    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
102    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
103    ///
104    /// # Example
105    ///
106    /// ```rust,ignore
107    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
108    /// use std::sync::Arc;
109    ///
110    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
111    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
112    ///
113    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
114    /// ```
115    pub async fn new_with_backends(
116        l1_cache: Arc<dyn CacheBackend>,
117        l2_cache: Arc<dyn L2CacheBackend>,
118        streaming_backend: Option<Arc<dyn StreamingBackend>>,
119    ) -> Result<Self> {
120        println!("  ðŸŽŊ Initializing Cache Manager with custom backends...");
121        println!("    L1: {}", l1_cache.name());
122        println!("    L2: {}", l2_cache.name());
123        if streaming_backend.is_some() {
124            println!("    Streaming: enabled");
125        } else {
126            println!("    Streaming: disabled");
127        }
128
129        Ok(Self {
130            l1_cache,
131            l2_cache,
132            l2_cache_concrete: None,
133            streaming_backend,
134            total_requests: Arc::new(AtomicU64::new(0)),
135            l1_hits: Arc::new(AtomicU64::new(0)),
136            l2_hits: Arc::new(AtomicU64::new(0)),
137            misses: Arc::new(AtomicU64::new(0)),
138            promotions: Arc::new(AtomicUsize::new(0)),
139            in_flight_requests: Arc::new(DashMap::new()),
140            invalidation_publisher: None,
141            invalidation_subscriber: None,
142            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
143        })
144    }
145
146    /// Create new cache manager with default backends (backward compatible)
147    ///
148    /// This is the legacy constructor maintained for backward compatibility.
149    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
150    ///
151    /// # Arguments
152    ///
153    /// * `l1_cache` - Moka L1 cache instance
154    /// * `l2_cache` - Redis L2 cache instance
155    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
156        println!("  ðŸŽŊ Initializing Cache Manager...");
157
158        // Convert concrete types to trait objects
159        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
160        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
161        // L2Cache also implements StreamingBackend, so use it for streaming
162        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache;
163
164        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
165    }
166
167    /// Create new cache manager with invalidation support
168    ///
169    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
170    ///
171    /// # Arguments
172    ///
173    /// * `l1_cache` - Moka L1 cache instance
174    /// * `l2_cache` - Redis L2 cache instance
175    /// * `redis_url` - Redis connection URL for Pub/Sub
176    /// * `config` - Invalidation configuration
177    ///
178    /// # Example
179    ///
180    /// ```rust,ignore
181    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
182    ///
183    /// let config = InvalidationConfig {
184    ///     channel: "my_app:cache:invalidate".to_string(),
185    ///     ..Default::default()
186    /// };
187    ///
188    /// let manager = CacheManager::new_with_invalidation(
189    ///     l1, l2, "redis://localhost", config
190    /// ).await?;
191    /// ```
192    pub async fn new_with_invalidation(
193        l1_cache: Arc<L1Cache>,
194        l2_cache: Arc<L2Cache>,
195        redis_url: &str,
196        config: InvalidationConfig,
197    ) -> Result<Self> {
198        println!("  ðŸŽŊ Initializing Cache Manager with Invalidation...");
199        println!("    Pub/Sub channel: {}", config.channel);
200
201        // Convert concrete types to trait objects
202        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
203        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
204        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache.clone();
205
206        // Create publisher
207        let client = redis::Client::open(redis_url)?;
208        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
209        let publisher = InvalidationPublisher::new(conn_manager, config.clone());
210
211        // Create subscriber
212        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
213        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
214
215        let manager = Self {
216            l1_cache: l1_backend,
217            l2_cache: l2_backend,
218            l2_cache_concrete: Some(l2_cache),
219            streaming_backend: Some(streaming_backend),
220            total_requests: Arc::new(AtomicU64::new(0)),
221            l1_hits: Arc::new(AtomicU64::new(0)),
222            l2_hits: Arc::new(AtomicU64::new(0)),
223            misses: Arc::new(AtomicU64::new(0)),
224            promotions: Arc::new(AtomicUsize::new(0)),
225            in_flight_requests: Arc::new(DashMap::new()),
226            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
227            invalidation_subscriber: Some(Arc::new(subscriber)),
228            invalidation_stats,
229        };
230
231        // Start subscriber with handler
232        manager.start_invalidation_subscriber();
233
234        println!("  ✅ Cache Manager initialized with invalidation support");
235
236        Ok(manager)
237    }
238
239    /// Start the invalidation subscriber background task
240    fn start_invalidation_subscriber(&self) {
241        if let Some(subscriber) = &self.invalidation_subscriber {
242            let l1_cache = Arc::clone(&self.l1_cache);
243            let l2_cache_concrete = self.l2_cache_concrete.clone();
244
245            subscriber.start(move |msg| {
246                let l1 = Arc::clone(&l1_cache);
247                let _l2 = l2_cache_concrete.clone();
248
249                async move {
250                    match msg {
251                        InvalidationMessage::Remove { key } => {
252                            // Remove from L1
253                            l1.remove(&key).await?;
254                            println!("🗑ïļ  [Invalidation] Removed '{}' from L1", key);
255                        }
256                        InvalidationMessage::Update { key, value, ttl_secs } => {
257                            // Update L1 with new value
258                            let ttl = ttl_secs
259                                .map(Duration::from_secs)
260                                .unwrap_or_else(|| Duration::from_secs(300));
261                            l1.set_with_ttl(&key, value, ttl).await?;
262                            println!("🔄 [Invalidation] Updated '{}' in L1", key);
263                        }
264                        InvalidationMessage::RemovePattern { pattern } => {
265                            // For pattern-based invalidation, we can't easily iterate L1 cache
266                            // So we just log it. The pattern invalidation is mainly for L2.
267                            // L1 entries will naturally expire via TTL.
268                            println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
269                        }
270                        InvalidationMessage::RemoveBulk { keys } => {
271                            // Remove multiple keys from L1
272                            for key in keys {
273                                if let Err(e) = l1.remove(&key).await {
274                                    eprintln!("⚠ïļ  Failed to remove '{}' from L1: {}", key, e);
275                                }
276                            }
277                            println!("🗑ïļ  [Invalidation] Bulk removed keys from L1");
278                        }
279                    }
280                    Ok(())
281                }
282            });
283
284            println!("ðŸ“Ą Invalidation subscriber started");
285        }
286    }
287
288    /// Get value from cache (L1 first, then L2 fallback with promotion)
289    /// 
290    /// This method now includes built-in Cache Stampede protection when cache misses occur.
291    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
292    /// unnecessary duplicate work on external data sources.
293    /// 
294    /// # Arguments
295    /// * `key` - Cache key to retrieve
296    /// 
297    /// # Returns
298    /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
299    /// * `Ok(None)` - Cache miss, value not found in either cache
300    /// * `Err(error)` - Cache operation failed
301    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
302        self.total_requests.fetch_add(1, Ordering::Relaxed);
303        
304        // Fast path: Try L1 first (no locking needed)
305        if let Some(value) = self.l1_cache.get(key).await {
306            self.l1_hits.fetch_add(1, Ordering::Relaxed);
307            return Ok(Some(value));
308        }
309        
310        // L1 miss - implement Cache Stampede protection for L2 lookup
311        let key_owned = key.to_string();
312        let lock_guard = self.in_flight_requests
313            .entry(key_owned.clone())
314            .or_insert_with(|| Arc::new(Mutex::new(())))
315            .clone();
316        
317        let _guard = lock_guard.lock().await;
318        
319        // RAII cleanup guard - ensures entry is removed even on early return or panic
320        let cleanup_guard = CleanupGuard {
321            map: &self.in_flight_requests,
322            key: key_owned.clone(),
323        };
324        
325        // Double-check L1 cache after acquiring lock
326        // (Another concurrent request might have populated it while we were waiting)
327        if let Some(value) = self.l1_cache.get(key).await {
328            self.l1_hits.fetch_add(1, Ordering::Relaxed);
329            // cleanup_guard will auto-remove entry on drop
330            return Ok(Some(value));
331        }
332        
333        // Check L2 cache with TTL information
334        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
335            self.l2_hits.fetch_add(1, Ordering::Relaxed);
336
337            // Promote to L1 with same TTL as Redis (or default if no TTL)
338            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
339
340            if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
341                // L1 promotion failed, but we still have the data
342                eprintln!("⚠ïļ Failed to promote key '{}' to L1 cache", key);
343            } else {
344                self.promotions.fetch_add(1, Ordering::Relaxed);
345                println!("⮆ïļ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
346            }
347
348            // cleanup_guard will auto-remove entry on drop
349            return Ok(Some(value));
350        }
351        
352        // Both L1 and L2 miss
353        self.misses.fetch_add(1, Ordering::Relaxed);
354        
355        // cleanup_guard will auto-remove entry on drop
356        drop(cleanup_guard);
357        
358        Ok(None)
359    }
360    
    // NOTE: The text below describes a `get_with_fallback` convenience method that
    // is not defined at this point in the file. It is kept as a plain comment so
    // that rustdoc does not attach it to the next item (`set_with_strategy`).
    //
    // Get value from cache with fallback computation (enhanced backward compatibility)
    //
    // This is a convenience method that combines `get()` with optional computation.
    // If the value is not found in cache, it will execute the compute function
    // and cache the result automatically.
    //
    // # Arguments
    // * `key` - Cache key
    // * `compute_fn` - Optional function to compute value if not in cache
    // * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
    //
    // # Returns
    // * `Ok(Some(value))` - Value found in cache or computed successfully
    // * `Ok(None)` - Value not in cache and no compute function provided
    // * `Err(error)` - Cache operation or computation failed
    //
    // # Example
    // ```ignore
    // // Simple cache get (existing behavior)
    // let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    //
    // // Get with computation fallback (new enhanced behavior)
    // let api_data = cache_manager.get_with_fallback(
    //     "api_response",
    //     Some(|| async { fetch_data_from_api().await }),
    //     Some(CacheStrategy::RealTime)
    // ).await?;
    // ```
389    
390    /// Set value with specific cache strategy (both L1 and L2)
391    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
392        let ttl = strategy.to_duration();
393        
394        // Store in both L1 and L2
395        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
396        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
397        
398        // Return success if at least one cache succeeded
399        match (l1_result, l2_result) {
400            (Ok(_), Ok(_)) => {
401                // Both succeeded
402                println!("ðŸ’ū [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
403                Ok(())
404            }
405            (Ok(_), Err(_)) => {
406                // L1 succeeded, L2 failed
407                eprintln!("⚠ïļ L2 cache set failed for key '{}', continuing with L1", key);
408                println!("ðŸ’ū [L1] Cached '{}' with TTL {:?}", key, ttl);
409                Ok(())
410            }
411            (Err(_), Ok(_)) => {
412                // L1 failed, L2 succeeded
413                eprintln!("⚠ïļ L1 cache set failed for key '{}', continuing with L2", key);
414                println!("ðŸ’ū [L2] Cached '{}' with TTL {:?}", key, ttl);
415                Ok(())
416            }
417            (Err(e1), Err(_e2)) => {
418                // Both failed
419                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
420            }
421        }
422    }
423    
424    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
425    /// 
426    /// This method provides comprehensive Cache Stampede protection:
427    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
428    /// 2. Check L2 cache with mutex-based coalescing
429    /// 3. Compute fresh data with protection against concurrent computations
430    /// 
431    /// # Arguments
432    /// * `key` - Cache key
433    /// * `strategy` - Cache strategy for TTL and storage behavior
434    /// * `compute_fn` - Async function to compute the value if not in any cache
435    /// 
436    /// # Example
437    /// ```ignore
438    /// let api_data = cache_manager.get_or_compute_with(
439    ///     "api_response",
440    ///     CacheStrategy::RealTime,
441    ///     || async {
442    ///         fetch_data_from_api().await
443    ///     }
444    /// ).await?;
445    /// ```
446    #[allow(dead_code)]
447    pub async fn get_or_compute_with<F, Fut>(
448        &self,
449        key: &str,
450        strategy: CacheStrategy,
451        compute_fn: F,
452    ) -> Result<serde_json::Value>
453    where
454        F: FnOnce() -> Fut + Send,
455        Fut: Future<Output = Result<serde_json::Value>> + Send,
456    {
457        self.total_requests.fetch_add(1, Ordering::Relaxed);
458        
459        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
460        if let Some(value) = self.l1_cache.get(key).await {
461            self.l1_hits.fetch_add(1, Ordering::Relaxed);
462            return Ok(value);
463        }
464        
465        // 2. L1 miss - try L2 with Cache Stampede protection
466        let key_owned = key.to_string();
467        let lock_guard = self.in_flight_requests
468            .entry(key_owned.clone())
469            .or_insert_with(|| Arc::new(Mutex::new(())))
470            .clone();
471        
472        let _guard = lock_guard.lock().await;
473        
474        // RAII cleanup guard - ensures entry is removed even on early return or panic
475        let _cleanup_guard = CleanupGuard {
476            map: &self.in_flight_requests,
477            key: key_owned,
478        };
479        
480        // 3. Double-check L1 cache after acquiring lock
481        // (Another request might have populated it while we were waiting)
482        if let Some(value) = self.l1_cache.get(key).await {
483            self.l1_hits.fetch_add(1, Ordering::Relaxed);
484            // _cleanup_guard will auto-remove entry on drop
485            return Ok(value);
486        }
487        
488        // 4. Check L2 cache with TTL
489        if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
490            self.l2_hits.fetch_add(1, Ordering::Relaxed);
491
492            // Promote to L1 using Redis TTL (or strategy TTL as fallback)
493            let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
494
495            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
496                eprintln!("⚠ïļ Failed to promote key '{}' to L1: {}", key, e);
497            } else {
498                self.promotions.fetch_add(1, Ordering::Relaxed);
499                println!("⮆ïļ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
500            }
501
502            // _cleanup_guard will auto-remove entry on drop
503            return Ok(value);
504        }
505        
506        // 5. Both L1 and L2 miss - compute fresh data
507        println!("ðŸ’ŧ Computing fresh data for key: '{}' (Cache Stampede protected)", key);
508        let fresh_data = compute_fn().await?;
509        
510        // 6. Store in both caches
511        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
512            eprintln!("⚠ïļ Failed to cache computed data for key '{}': {}", key, e);
513        }
514        
515        // 7. _cleanup_guard will auto-remove entry on drop
516
517        Ok(fresh_data)
518    }
519
520    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
521    ///
522    /// This method provides the same functionality as `get_or_compute_with()` but with
523    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
524    /// API calls, or any computation that returns structured data.
525    ///
526    /// # Type Safety
527    ///
528    /// - Returns your actual type `T` instead of `serde_json::Value`
529    /// - Compiler enforces Serialize + DeserializeOwned bounds
530    /// - No manual JSON conversion needed
531    ///
532    /// # Cache Flow
533    ///
534    /// 1. Check L1 cache → deserialize if found
535    /// 2. Check L2 cache → deserialize + promote to L1 if found
536    /// 3. Execute compute_fn → serialize → store in L1+L2
537    /// 4. Full stampede protection (only ONE request computes)
538    ///
539    /// # Arguments
540    ///
541    /// * `key` - Cache key
542    /// * `strategy` - Cache strategy for TTL
543    /// * `compute_fn` - Async function returning `Result<T>`
544    ///
545    /// # Example - Database Query
546    ///
547    /// ```no_run
548    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
549    /// # use std::sync::Arc;
550    /// # use serde::{Serialize, Deserialize};
551    /// # async fn example() -> anyhow::Result<()> {
552    /// # let l1 = Arc::new(L1Cache::new().await?);
553    /// # let l2 = Arc::new(L2Cache::new().await?);
554    /// # let cache_manager = CacheManager::new(l1, l2);
555    ///
556    /// #[derive(Serialize, Deserialize)]
557    /// struct User {
558    ///     id: i64,
559    ///     name: String,
560    /// }
561    ///
562    /// // Type-safe database caching (example - requires sqlx)
563    /// // let user: User = cache_manager.get_or_compute_typed(
564    /// //     "user:123",
565    /// //     CacheStrategy::MediumTerm,
566    /// //     || async {
567    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
568    /// //             .bind(123)
569    /// //             .fetch_one(&pool)
570    /// //             .await
571    /// //     }
572    /// // ).await?;
573    /// # Ok(())
574    /// # }
575    /// ```
576    ///
577    /// # Example - API Call
578    ///
579    /// ```no_run
580    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
581    /// # use std::sync::Arc;
582    /// # use serde::{Serialize, Deserialize};
583    /// # async fn example() -> anyhow::Result<()> {
584    /// # let l1 = Arc::new(L1Cache::new().await?);
585    /// # let l2 = Arc::new(L2Cache::new().await?);
586    /// # let cache_manager = CacheManager::new(l1, l2);
587    /// #[derive(Serialize, Deserialize)]
588    /// struct ApiResponse {
589    ///     data: String,
590    ///     timestamp: i64,
591    /// }
592    ///
593    /// // API call caching (example - requires reqwest)
594    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
595    /// //     "api:endpoint",
596    /// //     CacheStrategy::RealTime,
597    /// //     || async {
598    /// //         reqwest::get("https://api.example.com/data")
599    /// //             .await?
600    /// //             .json::<ApiResponse>()
601    /// //             .await
602    /// //     }
603    /// // ).await?;
604    /// # Ok(())
605    /// # }
606    /// ```
607    ///
608    /// # Performance
609    ///
610    /// - L1 hit: <1ms + deserialization (~10-50Ξs for small structs)
611    /// - L2 hit: 2-5ms + deserialization + L1 promotion
612    /// - Compute: Your function time + serialization + L1+L2 storage
613    /// - Stampede protection: 99.6% latency reduction under high concurrency
614    ///
615    /// # Errors
616    ///
617    /// Returns error if:
618    /// - Compute function fails
619    /// - Serialization fails (invalid type for JSON)
620    /// - Deserialization fails (cache data doesn't match type T)
621    /// - Cache operations fail (Redis connection issues)
622    pub async fn get_or_compute_typed<T, F, Fut>(
623        &self,
624        key: &str,
625        strategy: CacheStrategy,
626        compute_fn: F,
627    ) -> Result<T>
628    where
629        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
630        F: FnOnce() -> Fut + Send,
631        Fut: Future<Output = Result<T>> + Send,
632    {
633        self.total_requests.fetch_add(1, Ordering::Relaxed);
634
635        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
636        if let Some(cached_json) = self.l1_cache.get(key).await {
637            self.l1_hits.fetch_add(1, Ordering::Relaxed);
638
639            // Attempt to deserialize from JSON to type T
640            match serde_json::from_value::<T>(cached_json) {
641                Ok(typed_value) => {
642                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
643                    return Ok(typed_value);
644                }
645                Err(e) => {
646                    // Deserialization failed - cache data may be stale or corrupt
647                    eprintln!("⚠ïļ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
648                    // Fall through to recompute
649                }
650            }
651        }
652
653        // 2. L1 miss - try L2 with Cache Stampede protection
654        let key_owned = key.to_string();
655        let lock_guard = self.in_flight_requests
656            .entry(key_owned.clone())
657            .or_insert_with(|| Arc::new(Mutex::new(())))
658            .clone();
659
660        let _guard = lock_guard.lock().await;
661
662        // RAII cleanup guard - ensures entry is removed even on early return or panic
663        let _cleanup_guard = CleanupGuard {
664            map: &self.in_flight_requests,
665            key: key_owned,
666        };
667
668        // 3. Double-check L1 cache after acquiring lock
669        // (Another request might have populated it while we were waiting)
670        if let Some(cached_json) = self.l1_cache.get(key).await {
671            self.l1_hits.fetch_add(1, Ordering::Relaxed);
672            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
673                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
674                return Ok(typed_value);
675            }
676        }
677
678        // 4. Check L2 cache with TTL
679        if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
680            self.l2_hits.fetch_add(1, Ordering::Relaxed);
681
682            // Attempt to deserialize
683            match serde_json::from_value::<T>(cached_json.clone()) {
684                Ok(typed_value) => {
685                    println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
686
687                    // Promote to L1 using Redis TTL (or strategy TTL as fallback)
688                    let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
689
690                    if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
691                        eprintln!("⚠ïļ Failed to promote key '{}' to L1: {}", key, e);
692                    } else {
693                        self.promotions.fetch_add(1, Ordering::Relaxed);
694                        println!("⮆ïļ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
695                    }
696
697                    return Ok(typed_value);
698                }
699                Err(e) => {
700                    eprintln!("⚠ïļ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
701                    // Fall through to recompute
702                }
703            }
704        }
705
706        // 5. Both L1 and L2 miss (or deserialization failed) - compute fresh data
707        println!("ðŸ’ŧ Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
708        let typed_value = compute_fn().await?;
709
710        // 6. Serialize to JSON for storage
711        let json_value = serde_json::to_value(&typed_value)
712            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
713
714        // 7. Store in both L1 and L2 caches
715        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
716            eprintln!("⚠ïļ Failed to cache computed typed data for key '{}': {}", key, e);
717        } else {
718            println!("ðŸ’ū Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
719        }
720
721        // 8. _cleanup_guard will auto-remove entry on drop
722
723        Ok(typed_value)
724    }
725
726    /// Get comprehensive cache statistics
727    #[allow(dead_code)]
728    pub fn get_stats(&self) -> CacheManagerStats {
729        let total_reqs = self.total_requests.load(Ordering::Relaxed);
730        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
731        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
732        let misses = self.misses.load(Ordering::Relaxed);
733        
734        CacheManagerStats {
735            total_requests: total_reqs,
736            l1_hits,
737            l2_hits,
738            total_hits: l1_hits + l2_hits,
739            misses,
740            hit_rate: if total_reqs > 0 { 
741                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0 
742            } else { 0.0 },
743            l1_hit_rate: if total_reqs > 0 { 
744                (l1_hits as f64 / total_reqs as f64) * 100.0 
745            } else { 0.0 },
746            promotions: self.promotions.load(Ordering::Relaxed),
747            in_flight_requests: self.in_flight_requests.len(),
748        }
749    }
750    
751    // ===== Redis Streams Methods =====
752    
753    /// Publish data to Redis Stream
754    ///
755    /// # Arguments
756    /// * `stream_key` - Name of the stream (e.g., "events_stream")
757    /// * `fields` - Field-value pairs to publish
758    /// * `maxlen` - Optional max length for stream trimming
759    ///
760    /// # Returns
761    /// The entry ID generated by Redis
762    ///
763    /// # Errors
764    /// Returns error if streaming backend is not configured
765    pub async fn publish_to_stream(
766        &self,
767        stream_key: &str,
768        fields: Vec<(String, String)>,
769        maxlen: Option<usize>
770    ) -> Result<String> {
771        match &self.streaming_backend {
772            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
773            None => Err(anyhow::anyhow!("Streaming backend not configured"))
774        }
775    }
776    
777    /// Read latest entries from Redis Stream
778    ///
779    /// # Arguments
780    /// * `stream_key` - Name of the stream
781    /// * `count` - Number of latest entries to retrieve
782    ///
783    /// # Returns
784    /// Vector of (entry_id, fields) tuples (newest first)
785    ///
786    /// # Errors
787    /// Returns error if streaming backend is not configured
788    pub async fn read_stream_latest(
789        &self,
790        stream_key: &str,
791        count: usize
792    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
793        match &self.streaming_backend {
794            Some(backend) => backend.stream_read_latest(stream_key, count).await,
795            None => Err(anyhow::anyhow!("Streaming backend not configured"))
796        }
797    }
798    
799    /// Read from Redis Stream with optional blocking
800    ///
801    /// # Arguments
802    /// * `stream_key` - Name of the stream
803    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
804    /// * `count` - Max entries to retrieve
805    /// * `block_ms` - Optional blocking timeout in ms
806    ///
807    /// # Returns
808    /// Vector of (entry_id, fields) tuples
809    ///
810    /// # Errors
811    /// Returns error if streaming backend is not configured
812    pub async fn read_stream(
813        &self,
814        stream_key: &str,
815        last_id: &str,
816        count: usize,
817        block_ms: Option<usize>
818    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
819        match &self.streaming_backend {
820            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
821            None => Err(anyhow::anyhow!("Streaming backend not configured"))
822        }
823    }
824
825    // ===== Cache Invalidation Methods =====
826
827    /// Invalidate a cache key across all instances
828    ///
829    /// This removes the key from both L1 and L2 caches and broadcasts
830    /// the invalidation to all other cache instances via Redis Pub/Sub.
831    ///
832    /// # Arguments
833    /// * `key` - Cache key to invalidate
834    ///
835    /// # Example
836    /// ```rust,ignore
837    /// // Invalidate user cache after profile update
838    /// cache_manager.invalidate("user:123").await?;
839    /// ```
840    pub async fn invalidate(&self, key: &str) -> Result<()> {
841        // Remove from local L1 cache
842        self.l1_cache.remove(key).await?;
843
844        // Remove from L2 cache
845        self.l2_cache.remove(key).await?;
846
847        // Broadcast to other instances
848        if let Some(publisher) = &self.invalidation_publisher {
849            let mut pub_lock = publisher.lock().await;
850            let msg = InvalidationMessage::remove(key);
851            pub_lock.publish(&msg).await?;
852            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
853        }
854
855        println!("🗑ïļ  Invalidated '{}' across all instances", key);
856        Ok(())
857    }
858
859    /// Update cache value across all instances
860    ///
861    /// This updates the key in both L1 and L2 caches and broadcasts
862    /// the update to all other cache instances, avoiding cache misses.
863    ///
864    /// # Arguments
865    /// * `key` - Cache key to update
866    /// * `value` - New value
867    /// * `ttl` - Optional TTL (uses default if None)
868    ///
869    /// # Example
870    /// ```rust,ignore
871    /// // Update user cache with new data
872    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
873    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
874    /// ```
875    pub async fn update_cache(
876        &self,
877        key: &str,
878        value: serde_json::Value,
879        ttl: Option<Duration>,
880    ) -> Result<()> {
881        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
882
883        // Update local L1 cache
884        self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
885
886        // Update L2 cache
887        self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
888
889        // Broadcast update to other instances
890        if let Some(publisher) = &self.invalidation_publisher {
891            let mut pub_lock = publisher.lock().await;
892            let msg = InvalidationMessage::update(key, value, Some(ttl));
893            pub_lock.publish(&msg).await?;
894            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
895        }
896
897        println!("🔄 Updated '{}' across all instances", key);
898        Ok(())
899    }
900
901    /// Invalidate all keys matching a pattern
902    ///
903    /// This scans L2 cache for keys matching the pattern, removes them,
904    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
905    ///
906    /// # Arguments
907    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
908    ///
909    /// # Example
910    /// ```rust,ignore
911    /// // Invalidate all user caches
912    /// cache_manager.invalidate_pattern("user:*").await?;
913    ///
914    /// // Invalidate specific user's related caches
915    /// cache_manager.invalidate_pattern("user:123:*").await?;
916    /// ```
917    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
918        // Scan L2 for matching keys
919        let keys = if let Some(l2) = &self.l2_cache_concrete {
920            l2.scan_keys(pattern).await?
921        } else {
922            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
923        };
924
925        if keys.is_empty() {
926            println!("🔍 No keys found matching pattern '{}'", pattern);
927            return Ok(());
928        }
929
930        // Remove from L2 in bulk
931        if let Some(l2) = &self.l2_cache_concrete {
932            l2.remove_bulk(&keys).await?;
933        }
934
935        // Broadcast pattern invalidation
936        if let Some(publisher) = &self.invalidation_publisher {
937            let mut pub_lock = publisher.lock().await;
938            let msg = InvalidationMessage::remove_bulk(keys.clone());
939            pub_lock.publish(&msg).await?;
940            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
941        }
942
943        println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
944        Ok(())
945    }
946
947    /// Set value with automatic broadcast to all instances
948    ///
949    /// This is a write-through operation that updates the cache and
950    /// broadcasts the update to all other instances automatically.
951    ///
952    /// # Arguments
953    /// * `key` - Cache key
954    /// * `value` - Value to cache
955    /// * `strategy` - Cache strategy (determines TTL)
956    ///
957    /// # Example
958    /// ```rust,ignore
959    /// // Update and broadcast in one call
960    /// let data = serde_json::json!({"status": "active"});
961    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
962    /// ```
963    pub async fn set_with_broadcast(
964        &self,
965        key: &str,
966        value: serde_json::Value,
967        strategy: CacheStrategy,
968    ) -> Result<()> {
969        let ttl = strategy.to_duration();
970
971        // Set in local caches
972        self.set_with_strategy(key, value.clone(), strategy).await?;
973
974        // Broadcast update if invalidation is enabled
975        if let Some(publisher) = &self.invalidation_publisher {
976            let mut pub_lock = publisher.lock().await;
977            let msg = InvalidationMessage::update(key, value, Some(ttl));
978            pub_lock.publish(&msg).await?;
979            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
980        }
981
982        Ok(())
983    }
984
985    /// Get invalidation statistics
986    ///
987    /// Returns statistics about invalidation operations if invalidation is enabled.
988    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
989        if self.invalidation_subscriber.is_some() {
990            Some(self.invalidation_stats.snapshot())
991        } else {
992            None
993        }
994    }
995}
996
/// Point-in-time statistics reported by the cache manager's `get_stats`
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Value of the manager's total-request counter
    pub total_requests: u64,
    /// Value of the manager's L1 hit counter
    pub l1_hits: u64,
    /// Value of the manager's L2 hit counter
    pub l2_hits: u64,
    /// `l1_hits + l2_hits`
    pub total_hits: u64,
    /// Value of the manager's miss counter
    pub misses: u64,
    /// `total_hits / total_requests` as a percentage (0.0–100.0 scale);
    /// `0.0` when `total_requests` is zero
    pub hit_rate: f64,
    /// `l1_hits / total_requests` as a percentage (0.0–100.0 scale);
    /// `0.0` when `total_requests` is zero
    pub l1_hit_rate: f64,
    /// Value of the manager's promotion counter
    /// (presumably L2-to-L1 promotions; confirm against the getter paths)
    pub promotions: usize,
    /// Number of entries in the manager's in-flight request map when the
    /// snapshot was taken
    pub in_flight_requests: usize,
}