multi_tier_cache/
cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
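//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; assumes `l1`/`l2` instances are
//! constructed as in `CacheManager::new`):
//!
//! ```rust,ignore
//! let manager = CacheManager::new(l1, l2).await?;
//! manager.set_with_strategy("greeting", serde_json::json!("hello"), CacheStrategy::ShortTerm).await?;
//! let cached = manager.get("greeting").await?;
//! ```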

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use crate::backends::{L1Cache, L2Cache};
use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use super::invalidation::{
    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
};

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from DashMap even on early return or panic
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
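    ///
    /// # Example
    ///
    /// Illustrative mapping only:
    ///
    /// ```rust,ignore
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```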
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300), // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800), // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Statistics for a single cache tier
#[derive(Debug, Clone)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: Arc<AtomicU64>,
    /// Backend name for identification
    pub backend_name: String,
}

impl TierStats {
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: Arc::new(AtomicU64::new(0)),
            backend_name,
        }
    }

    /// Get current hit count
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}

/// A single cache tier in the multi-tier architecture
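///
/// # Example
///
/// Illustrative sketch (backend construction elided; `backend` is any
/// `Arc<dyn L2CacheBackend>`):
///
/// ```rust,ignore
/// // An L3 tier that promotes hits upward and keeps entries 2x longer.
/// let l3_tier = CacheTier::new(backend, 3, true, 2.0);
/// ```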
pub struct CacheTier {
    /// Cache backend for this tier
    backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 = hottest/fastest, higher = colder/slower)
    tier_level: usize,
    /// Enable automatic promotion to upper tiers on cache hit
    promotion_enabled: bool,
    /// TTL scale factor (multiplier for TTL when storing/promoting)
    ttl_scale: f64,
    /// Statistics for this tier
    stats: TierStats,
}

impl CacheTier {
    /// Create a new cache tier
    pub fn new(
        backend: Arc<dyn L2CacheBackend>,
        tier_level: usize,
        promotion_enabled: bool,
        ttl_scale: f64,
    ) -> Self {
        let backend_name = backend.name().to_string();
        Self {
            backend,
            tier_level,
            promotion_enabled,
            ttl_scale,
            stats: TierStats::new(tier_level, backend_name),
        }
    }

    /// Get value with TTL from this tier
    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        self.backend.get_with_ttl(key).await
    }

    /// Set value with TTL in this tier
    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
        self.backend.set_with_ttl(key, value, scaled_ttl).await
    }

    /// Remove value from this tier
    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    /// Record a cache hit for this tier
    fn record_hit(&self) {
        self.stats.hits.fetch_add(1, Ordering::Relaxed);
    }
}

/// Configuration for a cache tier (used in builder pattern)
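///
/// # Example
///
/// Illustrative builder-style usage:
///
/// ```rust,ignore
/// // A cold tier that promotes hits upward and keeps entries twice as long.
/// let cold = TierConfig::new(3)
///     .with_promotion(true)
///     .with_ttl_scale(2.0);
/// ```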
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    pub fn as_l1() -> Self {
        Self {
            tier_level: 1,
            promotion_enabled: false, // L1 is already top tier
            ttl_scale: 1.0,
        }
    }

    /// Configure as L2 (warm tier)
    pub fn as_l2() -> Self {
        Self {
            tier_level: 2,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L3 (cold tier) with longer TTL
    pub fn as_l3() -> Self {
        Self {
            tier_level: 3,
            promotion_enabled: true,
            ttl_scale: 2.0, // Keep data 2x longer
        }
    }

    /// Configure as L4 (archive tier) with much longer TTL
    pub fn as_l4() -> Self {
        Self {
            tier_level: 4,
            promotion_enabled: true,
            ttl_scale: 8.0, // Keep data 8x longer
        }
    }

    /// Set promotion enabled
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set TTL scale factor
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}

/// Proxy wrapper to convert L2CacheBackend to CacheBackend
/// (Rust doesn't support automatic trait upcasting for trait objects)
struct ProxyCacheBackend {
    backend: Arc<dyn L2CacheBackend>,
}

#[async_trait::async_trait]
impl CacheBackend for ProxyCacheBackend {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        self.backend.get(key).await
    }

    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        self.backend.set_with_ttl(key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    async fn health_check(&self) -> bool {
        self.backend.health_check().await
    }

    fn name(&self) -> &str {
        self.backend.name()
    }
}

/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
/// legacy L1+L2 behavior for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, this takes precedence over l1_cache/l2_cache fields
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys)
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics
    total_requests: Arc<AtomicU64>,
    l1_hits: Arc<AtomicU64>,
    l2_hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    pub async fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with custom backends...");
        println!("    L1: {}", l1_cache.name());
        println!("    L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            println!("    Streaming: enabled");
        } else {
            println!("    Streaming: disabled");
        }

        Ok(Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with Invalidation...");
        println!("    Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        println!("  ✅ Cache Manager initialized with invalidation support");

        Ok(manager)
    }

    /// Create new cache manager with multi-tier architecture (v0.5.0+)
    ///
    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
    /// Tiers are checked in order (lower tier_level = faster/hotter).
    ///
    /// # Arguments
    ///
    /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
    /// * `streaming_backend` - Optional streaming backend
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// // L1 + L2 + L3 setup
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
    ///
    /// let tiers = vec![
    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
    /// ];
    ///
    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
    /// ```
    pub async fn new_with_tiers(
        tiers: Vec<CacheTier>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!("  🎯 Initializing Multi-Tier Cache Manager...");
        println!("    Tier count: {}", tiers.len());
        for tier in &tiers {
            println!(
                "    L{}: {} (promotion={}, ttl_scale={})",
                tier.tier_level,
                tier.stats.backend_name,
                tier.promotion_enabled,
                tier.ttl_scale
            );
        }

        // Validate tiers are sorted by level
        for i in 1..tiers.len() {
            if tiers[i].tier_level <= tiers[i - 1].tier_level {
                anyhow::bail!(
                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
                    tiers[i].tier_level,
                    tiers[i - 1].tier_level
                );
            }
        }

        // For backward compatibility with legacy code paths, the l1/l2 fields are
        // populated from the first tier (as L1) and the second tier (as L2) if available
        let (l1_cache, l2_cache) = if tiers.len() >= 2 {
            (tiers[0].backend.clone(), tiers[1].backend.clone())
        } else if tiers.len() == 1 {
            // Only one tier - use it for both
            let tier = tiers[0].backend.clone();
            (tier.clone(), tier)
        } else {
            anyhow::bail!("At least one cache tier is required");
        };

        // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
        let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
            backend: l1_cache.clone(),
        });

        Ok(Self {
            tiers: Some(tiers),
            l1_cache: l1_backend,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Start the invalidation subscriber background task
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            let l1_cache = Arc::clone(&self.l1_cache);
            let l2_cache_concrete = self.l2_cache_concrete.clone();

            subscriber.start(move |msg| {
                let l1 = Arc::clone(&l1_cache);
                let _l2 = l2_cache_concrete.clone();

                async move {
                    match msg {
                        InvalidationMessage::Remove { key } => {
                            // Remove from L1
                            l1.remove(&key).await?;
                            println!("🗑️  [Invalidation] Removed '{}' from L1", key);
                        }
                        InvalidationMessage::Update { key, value, ttl_secs } => {
                            // Update L1 with new value
                            let ttl = ttl_secs
                                .map(Duration::from_secs)
                                .unwrap_or_else(|| Duration::from_secs(300));
                            l1.set_with_ttl(&key, value, ttl).await?;
                            println!("🔄 [Invalidation] Updated '{}' in L1", key);
                        }
                        InvalidationMessage::RemovePattern { pattern } => {
                            // For pattern-based invalidation, we can't easily iterate L1 cache
                            // So we just log it. The pattern invalidation is mainly for L2.
                            // L1 entries will naturally expire via TTL.
                            println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
                        }
                        InvalidationMessage::RemoveBulk { keys } => {
                            // Remove multiple keys from L1
                            for key in keys {
                                if let Err(e) = l1.remove(&key).await {
                                    eprintln!("⚠️  Failed to remove '{}' from L1: {}", key, e);
                                }
                            }
                            println!("🗑️  [Invalidation] Bulk removed keys from L1");
                        }
                    }
                    Ok(())
                }
            });

            println!("📡 Invalidation subscriber started");
        }
    }

    /// Get value from cache using multi-tier architecture (v0.5.0+)
    ///
    /// This method iterates through all configured tiers and automatically promotes
    /// to upper tiers on cache hit.
    async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
        let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some

        // Try each tier sequentially (sorted by tier_level)
        for (tier_index, tier) in tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                // Cache hit!
                tier.record_hit();

                // Promote to all upper tiers (if promotion enabled)
                if tier.promotion_enabled && tier_index > 0 {
                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                    // Promote to all tiers above this one
                    for upper_tier in tiers[..tier_index].iter().rev() {
                        if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
                            eprintln!(
                                "⚠️  Failed to promote '{}' from L{} to L{}: {}",
                                key, tier.tier_level, upper_tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            println!(
                                "⬆️  Promoted '{}' from L{} to L{} (TTL: {:?})",
                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                            );
                        }
                    }
                }

                return Ok(Some(value));
            }
        }

        // Cache miss across all tiers
        Ok(None)
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method includes built-in Cache Stampede protection on cache misses.
    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
    /// unnecessary duplicate work on external data sources.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in any tier
    /// * `Ok(None)` - Cache miss, value not found in any cache
    /// * `Err(error)` - Cache operation failed
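    ///
    /// # Example
    ///
    /// Illustrative only:
    ///
    /// ```rust,ignore
    /// if let Some(value) = cache_manager.get("user:123").await? {
    ///     println!("cache hit: {}", value);
    /// }
    /// ```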
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // NEW: Multi-tier mode (v0.5.0+)
        if self.tiers.is_some() {
            // Fast path for L1 (first tier) - no locking needed
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    // Update legacy stats for backward compatibility
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // L1 miss - use stampede protection for lower tiers
            let key_owned = key.to_string();
            let lock_guard = self.in_flight_requests
                .entry(key_owned.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone();

            let _guard = lock_guard.lock().await;
            let cleanup_guard = CleanupGuard {
                map: &self.in_flight_requests,
                key: key_owned.clone(),
            };

            // Double-check L1 after acquiring lock
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // Check remaining tiers with promotion
            let result = self.get_multi_tier(key).await?;

            if result.is_some() {
                // Hit in L2+ tier - update legacy stats
                if self.tiers.as_ref().unwrap().len() >= 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                self.misses.fetch_add(1, Ordering::Relaxed);
            }

            drop(cleanup_guard);
            return Ok(result);
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with same TTL as Redis (or default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
            }

            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard will auto-remove entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    // Get value from cache with fallback computation (enhanced backward compatibility)
    //
    // This is a convenience method that combines `get()` with optional computation.
    // If the value is not found in cache, it will execute the compute function
    // and cache the result automatically.
    //
    // # Arguments
    // * `key` - Cache key
    // * `compute_fn` - Optional function to compute value if not in cache
    // * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
    //
    // # Returns
    // * `Ok(Some(value))` - Value found in cache or computed successfully
    // * `Ok(None)` - Value not in cache and no compute function provided
    // * `Err(error)` - Cache operation or computation failed
    //
    // # Example
    // ```ignore
    // // Simple cache get (existing behavior)
    // let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    //
    // // Get with computation fallback (new enhanced behavior)
    // let api_data = cache_manager.get_with_fallback(
    //     "api_response",
    //     Some(|| async { fetch_data_from_api().await }),
    //     Some(CacheStrategy::RealTime)
    // ).await?;
    // ```

    /// Set value with specific cache strategy (all tiers)
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
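    ///
    /// # Example
    ///
    /// Illustrative only (key and payload are placeholders):
    ///
    /// ```rust,ignore
    /// cache_manager.set_with_strategy(
    ///     "weather:seoul",
    ///     serde_json::json!({"temp_c": 21}),
    ///     CacheStrategy::ShortTerm,
    /// ).await?;
    /// ```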
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Store in ALL tiers with their respective TTL scaling
            let mut success_count = 0;
            let mut last_error = None;

            for tier in tiers {
                match tier.set_with_ttl(key, value.clone(), ttl).await {
                    Ok(_) => {
                        success_count += 1;
                    }
                    Err(e) => {
                        eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
                        last_error = Some(e);
                    }
                }
            }

            if success_count > 0 {
                println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
                         key, success_count, tiers.len(), ttl);
                return Ok(());
            } else {
                return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
            }
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Promote to L1 (first tier)
                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                    if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
                        eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
                    } else {
                        self.promotions.fetch_add(1, Ordering::Relaxed);
                        println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                key, tier.tier_level, promotion_ttl);
                    }

                    // _cleanup_guard will auto-remove entry on drop
                    return Ok(value);
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                    eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                } else {
                    self.promotions.fetch_add(1, Ordering::Relaxed);
                    println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                }

                // _cleanup_guard will auto-remove entry on drop
                return Ok(value);
            }
        }

        // 5. Cache miss across all tiers - compute fresh data
        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in all cache tiers
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop

        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces Serialize + DeserializeOwned bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check lower tiers (L2+) → deserialize + promote to L1 if found
    /// 3. Execute compute_fn → serialize → store in all tiers
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: Your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type T)
    /// - Cache operations fail (Redis connection issues)
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cache data may be stale or corrupt
                    eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Attempt to deserialize
                    match serde_json::from_value::<T>(cached_json.clone()) {
                        Ok(typed_value) => {
                            println!("✅ [L{} HIT] Deserialized '{}' to type {}",
                                    tier.tier_level, key, std::any::type_name::<T>());

                            // Promote to L1 (first tier)
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
                                eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
                                         key, tier.tier_level, e);
                            } else {
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                                println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                        key, tier.tier_level, promotion_ttl);
                            }

                            return Ok(typed_value);
                        }
                        Err(e) => {
                            eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
                                     tier.tier_level, key, e);
                            // Continue to next tier
                        }
                    }
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Attempt to deserialize
                match serde_json::from_value::<T>(cached_json.clone()) {
                    Ok(typed_value) => {
                        println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);

                        // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                        let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                        if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
                            eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                        }

                        return Ok(typed_value);
                    }
                    Err(e) => {
                        eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                        // Fall through to recompute
                    }
                }
            }
        }

        // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
        println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value)
            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;

        // 7. Store in all cache tiers
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
        } else {
            println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
        }

        // 8. _cleanup_guard will auto-remove entry on drop

        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
    ///
    /// In multi-tier mode, aggregates statistics from all tiers.
    /// In legacy mode, returns L1 and L2 stats.
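    ///
    /// # Example
    ///
    /// Illustrative only:
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% over {} requests", stats.hit_rate, stats.total_requests);
    /// ```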
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    /// Get per-tier statistics (v0.5.0+)
    ///
    /// Returns statistics for each tier if multi-tier mode is enabled.
    /// Returns None if using legacy 2-tier mode.
    ///
    /// # Example
    /// ```rust,ignore
    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
    ///     for stats in tier_stats {
    ///         println!("L{}: {} hits ({})",
    ///                  stats.tier_level,
    ///                  stats.hit_count(),
    ///                  stats.backend_name);
    ///     }
    /// }
    /// ```
    pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
        self.tiers.as_ref().map(|tiers| {
            tiers.iter().map(|tier| tier.stats.clone()).collect()
        })
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
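    ///
    /// # Example
    ///
    /// Illustrative only (field names are placeholders):
    ///
    /// ```rust,ignore
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "user_signup".to_string())],
    ///     Some(10_000), // trim to roughly the newest 10k entries
    /// ).await?;
    /// ```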
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
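    ///
    /// # Example
    ///
    /// Illustrative only:
    ///
    /// ```rust,ignore
    /// // Fetch the 5 newest entries from the stream.
    /// let latest = cache_manager.read_stream_latest("events_stream", 5).await?;
    /// ```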
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
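    ///
    /// # Example
    ///
    /// Illustrative only:
    ///
    /// ```rust,ignore
    /// // Block up to 5 seconds waiting for entries newer than the last seen ID.
    /// let entries = cache_manager.read_stream("events_stream", "$", 10, Some(5000)).await?;
    /// ```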
1317    pub async fn read_stream(
1318        &self,
1319        stream_key: &str,
1320        last_id: &str,
1321        count: usize,
1322        block_ms: Option<usize>
1323    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1324        match &self.streaming_backend {
1325            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1326            None => Err(anyhow::anyhow!("Streaming backend not configured"))
1327        }
1328    }
1329
1330    // ===== Cache Invalidation Methods =====

    /// Invalidate a cache key across all instances
    ///
    /// This removes the key from all cache tiers and broadcasts
    /// the invalidation to all other cache instances via Redis Pub/Sub.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to invalidate
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate user cache after profile update
    /// cache_manager.invalidate("user:123").await?;
    /// ```
    pub async fn invalidate(&self, key: &str) -> Result<()> {
        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for tier in tiers {
                if let Err(e) = tier.remove(key).await {
                    eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.remove(key).await?;
            self.l2_cache.remove(key).await?;
        }

        // Broadcast to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove(key);
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🗑️  Invalidated '{}' across all instances", key);
        Ok(())
    }

    /// Update cache value across all instances
    ///
    /// This updates the key in all cache tiers and broadcasts
    /// the update to all other cache instances, so they serve the new value
    /// instead of taking a cache miss.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to update
    /// * `value` - New value
    /// * `ttl` - Optional TTL (uses default if None)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update user cache with new data
    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
    /// ```
    pub async fn update_cache(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Update ALL tiers with their respective TTL scaling
            for tier in tiers {
                if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
                    eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
            self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
        }

        // Broadcast update to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔄 Updated '{}' across all instances", key);
        Ok(())
    }

    /// Invalidate all keys matching a pattern
    ///
    /// This scans the L2 cache for keys matching the pattern, removes them from
    /// all tiers, and broadcasts the invalidation; L1 entries on other instances
    /// are cleared when the broadcast is received.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
    /// In multi-tier mode, this scans from L2 but removes from all tiers.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate all user caches
    /// cache_manager.invalidate_pattern("user:*").await?;
    ///
    /// // Invalidate a specific user's related caches
    /// cache_manager.invalidate_pattern("user:123:*").await?;
    /// ```
    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
        // Scan L2 for matching keys
        // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
        let keys = if let Some(l2) = &self.l2_cache_concrete {
            l2.scan_keys(pattern).await?
        } else {
            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
        };

        if keys.is_empty() {
            println!("🔍 No keys found matching pattern '{}'", pattern);
            return Ok(());
        }

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for key in &keys {
                for tier in tiers {
                    if let Err(e) = tier.remove(key).await {
                        eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
                    }
                }
            }
        } else {
            // LEGACY: 2-tier mode - Remove from L2 in bulk
            if let Some(l2) = &self.l2_cache_concrete {
                l2.remove_bulk(&keys).await?;
            }
        }

        // Broadcast pattern invalidation
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove_bulk(keys.clone());
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
        Ok(())
    }

    /// Set value with automatic broadcast to all instances
    ///
    /// This is a write-through operation that updates the cache and
    /// broadcasts the update to all other instances automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `value` - Value to cache
    /// * `strategy` - Cache strategy (determines TTL)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update and broadcast in one call
    /// let data = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
    /// ```
    pub async fn set_with_broadcast(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // Set in local caches
        self.set_with_strategy(key, value.clone(), strategy).await?;

        // Broadcast update if invalidation is enabled
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Get invalidation statistics
    ///
    /// Returns statistics about invalidation operations if invalidation is enabled.
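    ///
    /// # Example
    /// Illustrative check (the `messages_sent` field is assumed to mirror the
    /// atomic counter of the same name in `AtomicInvalidationStats`):
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     // None means invalidation is not enabled on this manager
    ///     println!("invalidation messages sent: {}", stats.messages_sent);
    /// }
    /// ```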
    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
        if self.invalidation_subscriber.is_some() {
            Some(self.invalidation_stats.snapshot())
        } else {
            None
        }
    }
}

/// Cache Manager statistics
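///
/// # Example
/// Illustrative read; the stats accessor name here is hypothetical:
/// ```rust,ignore
/// let stats = cache_manager.get_stats();
/// println!(
///     "{} hits / {} requests ({} promotions, {} in flight)",
///     stats.total_hits, stats.total_requests, stats.promotions, stats.in_flight_requests
/// );
/// ```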
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub total_hits: u64,
    pub misses: u64,
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub promotions: usize,
    pub in_flight_requests: usize,