multi_tier_cache/
cache_manager.rs

1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use super::l1_cache::L1Cache;
15use super::l2_cache::L2Cache;
16use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
17use super::invalidation::{
18    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
19    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
20};
21
22/// RAII cleanup guard for in-flight request tracking
23/// Ensures that entries are removed from DashMap even on early return or panic
24struct CleanupGuard<'a> {
25    map: &'a DashMap<String, Arc<Mutex<()>>>,
26    key: String,
27}
28
29impl<'a> Drop for CleanupGuard<'a> {
30    fn drop(&mut self) {
31        self.map.remove(&self.key);
32    }
33}
34
35/// Cache strategies for different data types
36#[derive(Debug, Clone)]
37#[allow(dead_code)]
38pub enum CacheStrategy {
39    /// Real-time data - 10 seconds TTL
40    RealTime,
41    /// Short-term data - 5 minutes TTL  
42    ShortTerm,
43    /// Medium-term data - 1 hour TTL
44    MediumTerm,
45    /// Long-term data - 3 hours TTL
46    LongTerm,
47    /// Custom TTL
48    Custom(Duration),
49    /// Default strategy (5 minutes)
50    Default,
51}
52
53impl CacheStrategy {
54    /// Convert strategy to duration
55    pub fn to_duration(&self) -> Duration {
56        match self {
57            Self::RealTime => Duration::from_secs(10),
58            Self::ShortTerm => Duration::from_secs(300), // 5 minutes
59            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
60            Self::LongTerm => Duration::from_secs(10800), // 3 hours
61            Self::Custom(duration) => *duration,
62            Self::Default => Duration::from_secs(300),
63        }
64    }
65}
66
67/// Statistics for a single cache tier
68#[derive(Debug, Clone)]
69pub struct TierStats {
70    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
71    pub tier_level: usize,
72    /// Number of cache hits at this tier
73    pub hits: Arc<AtomicU64>,
74    /// Backend name for identification
75    pub backend_name: String,
76}
77
78impl TierStats {
79    fn new(tier_level: usize, backend_name: String) -> Self {
80        Self {
81            tier_level,
82            hits: Arc::new(AtomicU64::new(0)),
83            backend_name,
84        }
85    }
86
87    /// Get current hit count
88    pub fn hit_count(&self) -> u64 {
89        self.hits.load(Ordering::Relaxed)
90    }
91}
92
93/// A single cache tier in the multi-tier architecture
94pub struct CacheTier {
95    /// Cache backend for this tier
96    backend: Arc<dyn L2CacheBackend>,
97    /// Tier level (1 = hottest/fastest, higher = colder/slower)
98    tier_level: usize,
99    /// Enable automatic promotion to upper tiers on cache hit
100    promotion_enabled: bool,
101    /// TTL scale factor (multiplier for TTL when storing/promoting)
102    ttl_scale: f64,
103    /// Statistics for this tier
104    stats: TierStats,
105}
106
107impl CacheTier {
108    /// Create a new cache tier
109    pub fn new(
110        backend: Arc<dyn L2CacheBackend>,
111        tier_level: usize,
112        promotion_enabled: bool,
113        ttl_scale: f64,
114    ) -> Self {
115        let backend_name = backend.name().to_string();
116        Self {
117            backend,
118            tier_level,
119            promotion_enabled,
120            ttl_scale,
121            stats: TierStats::new(tier_level, backend_name),
122        }
123    }
124
125    /// Get value with TTL from this tier
126    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
127        self.backend.get_with_ttl(key).await
128    }
129
130    /// Set value with TTL in this tier
131    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
132        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
133        self.backend.set_with_ttl(key, value, scaled_ttl).await
134    }
135
136    /// Remove value from this tier
137    async fn remove(&self, key: &str) -> Result<()> {
138        self.backend.remove(key).await
139    }
140
141    /// Record a cache hit for this tier
142    fn record_hit(&self) {
143        self.stats.hits.fetch_add(1, Ordering::Relaxed);
144    }
145}
146
147/// Configuration for a cache tier (used in builder pattern)
148#[derive(Debug, Clone)]
149pub struct TierConfig {
150    /// Tier level (1, 2, 3, 4...)
151    pub tier_level: usize,
152    /// Enable promotion to upper tiers on hit
153    pub promotion_enabled: bool,
154    /// TTL scale factor (1.0 = same as base TTL)
155    pub ttl_scale: f64,
156}
157
158impl TierConfig {
159    /// Create new tier configuration
160    pub fn new(tier_level: usize) -> Self {
161        Self {
162            tier_level,
163            promotion_enabled: true,
164            ttl_scale: 1.0,
165        }
166    }
167
168    /// Configure as L1 (hot tier)
169    pub fn as_l1() -> Self {
170        Self {
171            tier_level: 1,
172            promotion_enabled: false, // L1 is already top tier
173            ttl_scale: 1.0,
174        }
175    }
176
177    /// Configure as L2 (warm tier)
178    pub fn as_l2() -> Self {
179        Self {
180            tier_level: 2,
181            promotion_enabled: true,
182            ttl_scale: 1.0,
183        }
184    }
185
186    /// Configure as L3 (cold tier) with longer TTL
187    pub fn as_l3() -> Self {
188        Self {
189            tier_level: 3,
190            promotion_enabled: true,
191            ttl_scale: 2.0, // Keep data 2x longer
192        }
193    }
194
195    /// Configure as L4 (archive tier) with much longer TTL
196    pub fn as_l4() -> Self {
197        Self {
198            tier_level: 4,
199            promotion_enabled: true,
200            ttl_scale: 8.0, // Keep data 8x longer
201        }
202    }
203
204    /// Set promotion enabled
205    pub fn with_promotion(mut self, enabled: bool) -> Self {
206        self.promotion_enabled = enabled;
207        self
208    }
209
210    /// Set TTL scale factor
211    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
212        self.ttl_scale = scale;
213        self
214    }
215
216    /// Set tier level
217    pub fn with_level(mut self, level: usize) -> Self {
218        self.tier_level = level;
219        self
220    }
221}
222
223/// Proxy wrapper to convert L2CacheBackend to CacheBackend
224/// (Rust doesn't support automatic trait upcasting for trait objects)
225struct ProxyCacheBackend {
226    backend: Arc<dyn L2CacheBackend>,
227}
228
229#[async_trait::async_trait]
230impl CacheBackend for ProxyCacheBackend {
231    async fn get(&self, key: &str) -> Option<serde_json::Value> {
232        self.backend.get(key).await
233    }
234
235    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
236        self.backend.set_with_ttl(key, value, ttl).await
237    }
238
239    async fn remove(&self, key: &str) -> Result<()> {
240        self.backend.remove(key).await
241    }
242
243    async fn health_check(&self) -> bool {
244        self.backend.health_check().await
245    }
246
247    fn name(&self) -> &str {
248        self.backend.name()
249    }
250}
251
252/// Cache Manager - Unified operations across multiple cache tiers
253///
254/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
255/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
256/// legacy L1+L2 behavior for backward compatibility.
257pub struct CacheManager {
258    /// Dynamic multi-tier cache architecture (v0.5.0+)
259    /// If Some, this takes precedence over l1_cache/l2_cache fields
260    tiers: Option<Vec<CacheTier>>,
261
262    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
263    // Maintained for backward compatibility
264    /// L1 Cache (trait object for pluggable backends)
265    l1_cache: Arc<dyn CacheBackend>,
266    /// L2 Cache (trait object for pluggable backends)
267    l2_cache: Arc<dyn L2CacheBackend>,
268    /// L2 Cache concrete instance (for invalidation scan_keys)
269    l2_cache_concrete: Option<Arc<L2Cache>>,
270
271    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
272    streaming_backend: Option<Arc<dyn StreamingBackend>>,
273    /// Statistics
274    total_requests: Arc<AtomicU64>,
275    l1_hits: Arc<AtomicU64>,
276    l2_hits: Arc<AtomicU64>,
277    misses: Arc<AtomicU64>,
278    promotions: Arc<AtomicUsize>,
279    /// In-flight requests to prevent Cache Stampede on L2/compute operations
280    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
281    /// Invalidation publisher (for broadcasting invalidation messages)
282    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
283    /// Invalidation subscriber (for receiving invalidation messages)
284    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
285    /// Invalidation statistics
286    invalidation_stats: Arc<AtomicInvalidationStats>,
287}
288
289impl CacheManager {
290    /// Create new cache manager with trait objects (pluggable backends)
291    ///
292    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
293    ///
294    /// # Arguments
295    ///
296    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
297    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
298    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
299    ///
300    /// # Example
301    ///
302    /// ```rust,ignore
303    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
304    /// use std::sync::Arc;
305    ///
306    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
307    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
308    ///
309    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
310    /// ```
311    pub async fn new_with_backends(
312        l1_cache: Arc<dyn CacheBackend>,
313        l2_cache: Arc<dyn L2CacheBackend>,
314        streaming_backend: Option<Arc<dyn StreamingBackend>>,
315    ) -> Result<Self> {
316        println!("  🎯 Initializing Cache Manager with custom backends...");
317        println!("    L1: {}", l1_cache.name());
318        println!("    L2: {}", l2_cache.name());
319        if streaming_backend.is_some() {
320            println!("    Streaming: enabled");
321        } else {
322            println!("    Streaming: disabled");
323        }
324
325        Ok(Self {
326            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
327            l1_cache,
328            l2_cache,
329            l2_cache_concrete: None,
330            streaming_backend,
331            total_requests: Arc::new(AtomicU64::new(0)),
332            l1_hits: Arc::new(AtomicU64::new(0)),
333            l2_hits: Arc::new(AtomicU64::new(0)),
334            misses: Arc::new(AtomicU64::new(0)),
335            promotions: Arc::new(AtomicUsize::new(0)),
336            in_flight_requests: Arc::new(DashMap::new()),
337            invalidation_publisher: None,
338            invalidation_subscriber: None,
339            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
340        })
341    }
342
343    /// Create new cache manager with default backends (backward compatible)
344    ///
345    /// This is the legacy constructor maintained for backward compatibility.
346    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
347    ///
348    /// # Arguments
349    ///
350    /// * `l1_cache` - Moka L1 cache instance
351    /// * `l2_cache` - Redis L2 cache instance
352    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
353        println!("  🎯 Initializing Cache Manager...");
354
355        // Convert concrete types to trait objects
356        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
357        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
358
359        // Create RedisStreams backend for streaming functionality
360        let redis_url = std::env::var("REDIS_URL")
361            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
362        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
363        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
364
365        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
366    }
367
368    /// Create new cache manager with invalidation support
369    ///
370    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
371    ///
372    /// # Arguments
373    ///
374    /// * `l1_cache` - Moka L1 cache instance
375    /// * `l2_cache` - Redis L2 cache instance
376    /// * `redis_url` - Redis connection URL for Pub/Sub
377    /// * `config` - Invalidation configuration
378    ///
379    /// # Example
380    ///
381    /// ```rust,ignore
382    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
383    ///
384    /// let config = InvalidationConfig {
385    ///     channel: "my_app:cache:invalidate".to_string(),
386    ///     ..Default::default()
387    /// };
388    ///
389    /// let manager = CacheManager::new_with_invalidation(
390    ///     l1, l2, "redis://localhost", config
391    /// ).await?;
392    /// ```
393    pub async fn new_with_invalidation(
394        l1_cache: Arc<L1Cache>,
395        l2_cache: Arc<L2Cache>,
396        redis_url: &str,
397        config: InvalidationConfig,
398    ) -> Result<Self> {
399        println!("  🎯 Initializing Cache Manager with Invalidation...");
400        println!("    Pub/Sub channel: {}", config.channel);
401
402        // Convert concrete types to trait objects
403        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
404        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
405
406        // Create RedisStreams backend for streaming functionality
407        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
408        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
409
410        // Create publisher
411        let client = redis::Client::open(redis_url)?;
412        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
413        let publisher = InvalidationPublisher::new(conn_manager, config.clone());
414
415        // Create subscriber
416        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
417        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
418
419        let manager = Self {
420            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
421            l1_cache: l1_backend,
422            l2_cache: l2_backend,
423            l2_cache_concrete: Some(l2_cache),
424            streaming_backend: Some(streaming_backend),
425            total_requests: Arc::new(AtomicU64::new(0)),
426            l1_hits: Arc::new(AtomicU64::new(0)),
427            l2_hits: Arc::new(AtomicU64::new(0)),
428            misses: Arc::new(AtomicU64::new(0)),
429            promotions: Arc::new(AtomicUsize::new(0)),
430            in_flight_requests: Arc::new(DashMap::new()),
431            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
432            invalidation_subscriber: Some(Arc::new(subscriber)),
433            invalidation_stats,
434        };
435
436        // Start subscriber with handler
437        manager.start_invalidation_subscriber();
438
439        println!("  ✅ Cache Manager initialized with invalidation support");
440
441        Ok(manager)
442    }
443
444    /// Create new cache manager with multi-tier architecture (v0.5.0+)
445    ///
446    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
447    /// Tiers are checked in order (lower tier_level = faster/hotter).
448    ///
449    /// # Arguments
450    ///
451    /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
452    /// * `streaming_backend` - Optional streaming backend
453    ///
454    /// # Example
455    ///
456    /// ```rust,ignore
457    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
458    /// use std::sync::Arc;
459    ///
460    /// // L1 + L2 + L3 setup
461    /// let l1 = Arc::new(L1Cache::new().await?);
462    /// let l2 = Arc::new(L2Cache::new().await?);
463    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
464    ///
465    /// let tiers = vec![
466    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
467    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
468    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
469    /// ];
470    ///
471    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
472    /// ```
473    pub async fn new_with_tiers(
474        tiers: Vec<CacheTier>,
475        streaming_backend: Option<Arc<dyn StreamingBackend>>,
476    ) -> Result<Self> {
477        println!("  🎯 Initializing Multi-Tier Cache Manager...");
478        println!("    Tier count: {}", tiers.len());
479        for tier in &tiers {
480            println!(
481                "    L{}: {} (promotion={}, ttl_scale={})",
482                tier.tier_level,
483                tier.stats.backend_name,
484                tier.promotion_enabled,
485                tier.ttl_scale
486            );
487        }
488
489        // Validate tiers are sorted by level
490        for i in 1..tiers.len() {
491            if tiers[i].tier_level <= tiers[i - 1].tier_level {
492                anyhow::bail!(
493                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
494                    tiers[i].tier_level,
495                    tiers[i - 1].tier_level
496                );
497            }
498        }
499
500        // For backward compatibility with legacy code, we need dummy l1/l2 caches
501        // Use first tier as l1, second tier as l2 if available
502        let (l1_cache, l2_cache) = if tiers.len() >= 2 {
503            (tiers[0].backend.clone(), tiers[1].backend.clone())
504        } else if tiers.len() == 1 {
505            // Only one tier - use it for both
506            let tier = tiers[0].backend.clone();
507            (tier.clone(), tier)
508        } else {
509            anyhow::bail!("At least one cache tier is required");
510        };
511
512        // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
513        let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
514            backend: l1_cache.clone(),
515        });
516
517        Ok(Self {
518            tiers: Some(tiers),
519            l1_cache: l1_backend,
520            l2_cache,
521            l2_cache_concrete: None,
522            streaming_backend,
523            total_requests: Arc::new(AtomicU64::new(0)),
524            l1_hits: Arc::new(AtomicU64::new(0)),
525            l2_hits: Arc::new(AtomicU64::new(0)),
526            misses: Arc::new(AtomicU64::new(0)),
527            promotions: Arc::new(AtomicUsize::new(0)),
528            in_flight_requests: Arc::new(DashMap::new()),
529            invalidation_publisher: None,
530            invalidation_subscriber: None,
531            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
532        })
533    }
534
535    /// Start the invalidation subscriber background task
536    fn start_invalidation_subscriber(&self) {
537        if let Some(subscriber) = &self.invalidation_subscriber {
538            let l1_cache = Arc::clone(&self.l1_cache);
539            let l2_cache_concrete = self.l2_cache_concrete.clone();
540
541            subscriber.start(move |msg| {
542                let l1 = Arc::clone(&l1_cache);
543                let _l2 = l2_cache_concrete.clone();
544
545                async move {
546                    match msg {
547                        InvalidationMessage::Remove { key } => {
548                            // Remove from L1
549                            l1.remove(&key).await?;
550                            println!("🗑️  [Invalidation] Removed '{}' from L1", key);
551                        }
552                        InvalidationMessage::Update { key, value, ttl_secs } => {
553                            // Update L1 with new value
554                            let ttl = ttl_secs
555                                .map(Duration::from_secs)
556                                .unwrap_or_else(|| Duration::from_secs(300));
557                            l1.set_with_ttl(&key, value, ttl).await?;
558                            println!("🔄 [Invalidation] Updated '{}' in L1", key);
559                        }
560                        InvalidationMessage::RemovePattern { pattern } => {
561                            // For pattern-based invalidation, we can't easily iterate L1 cache
562                            // So we just log it. The pattern invalidation is mainly for L2.
563                            // L1 entries will naturally expire via TTL.
564                            println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
565                        }
566                        InvalidationMessage::RemoveBulk { keys } => {
567                            // Remove multiple keys from L1
568                            for key in keys {
569                                if let Err(e) = l1.remove(&key).await {
570                                    eprintln!("⚠️  Failed to remove '{}' from L1: {}", key, e);
571                                }
572                            }
573                            println!("🗑️  [Invalidation] Bulk removed keys from L1");
574                        }
575                    }
576                    Ok(())
577                }
578            });
579
580            println!("📡 Invalidation subscriber started");
581        }
582    }
583
584    /// Get value from cache using multi-tier architecture (v0.5.0+)
585    ///
586    /// This method iterates through all configured tiers and automatically promotes
587    /// to upper tiers on cache hit.
588    async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
589        let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some
590
591        // Try each tier sequentially (sorted by tier_level)
592        for (tier_index, tier) in tiers.iter().enumerate() {
593            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
594                // Cache hit!
595                tier.record_hit();
596
597                // Promote to all upper tiers (if promotion enabled)
598                if tier.promotion_enabled && tier_index > 0 {
599                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
600
601                    // Promote to all tiers above this one
602                    for upper_tier in tiers[..tier_index].iter().rev() {
603                        if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
604                            eprintln!(
605                                "⚠️  Failed to promote '{}' from L{} to L{}: {}",
606                                key, tier.tier_level, upper_tier.tier_level, e
607                            );
608                        } else {
609                            self.promotions.fetch_add(1, Ordering::Relaxed);
610                            println!(
611                                "⬆️  Promoted '{}' from L{} to L{} (TTL: {:?})",
612                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
613                            );
614                        }
615                    }
616                }
617
618                return Ok(Some(value));
619            }
620        }
621
622        // Cache miss across all tiers
623        Ok(None)
624    }
625
626    /// Get value from cache (L1 first, then L2 fallback with promotion)
627    ///
628    /// This method now includes built-in Cache Stampede protection when cache misses occur.
629    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
630    /// unnecessary duplicate work on external data sources.
631    ///
632    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
633    ///
634    /// # Arguments
635    /// * `key` - Cache key to retrieve
636    ///
637    /// # Returns
638    /// * `Ok(Some(value))` - Cache hit, value found in any tier
639    /// * `Ok(None)` - Cache miss, value not found in any cache
640    /// * `Err(error)` - Cache operation failed
641    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
642        self.total_requests.fetch_add(1, Ordering::Relaxed);
643
644        // NEW: Multi-tier mode (v0.5.0+)
645        if self.tiers.is_some() {
646            // Fast path for L1 (first tier) - no locking needed
647            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
648                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
649                    tier1.record_hit();
650                    // Update legacy stats for backward compatibility
651                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
652                    return Ok(Some(value));
653                }
654            }
655
656            // L1 miss - use stampede protection for lower tiers
657            let key_owned = key.to_string();
658            let lock_guard = self.in_flight_requests
659                .entry(key_owned.clone())
660                .or_insert_with(|| Arc::new(Mutex::new(())))
661                .clone();
662
663            let _guard = lock_guard.lock().await;
664            let cleanup_guard = CleanupGuard {
665                map: &self.in_flight_requests,
666                key: key_owned.clone(),
667            };
668
669            // Double-check L1 after acquiring lock
670            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
671                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
672                    tier1.record_hit();
673                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
674                    return Ok(Some(value));
675                }
676            }
677
678            // Check remaining tiers with promotion
679            let result = self.get_multi_tier(key).await?;
680
681            if result.is_some() {
682                // Hit in L2+ tier - update legacy stats
683                if self.tiers.as_ref().unwrap().len() >= 2 {
684                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
685                }
686            } else {
687                self.misses.fetch_add(1, Ordering::Relaxed);
688            }
689
690            drop(cleanup_guard);
691            return Ok(result);
692        }
693
694        // LEGACY: 2-tier mode (L1 + L2)
695        // Fast path: Try L1 first (no locking needed)
696        if let Some(value) = self.l1_cache.get(key).await {
697            self.l1_hits.fetch_add(1, Ordering::Relaxed);
698            return Ok(Some(value));
699        }
700        
701        // L1 miss - implement Cache Stampede protection for L2 lookup
702        let key_owned = key.to_string();
703        let lock_guard = self.in_flight_requests
704            .entry(key_owned.clone())
705            .or_insert_with(|| Arc::new(Mutex::new(())))
706            .clone();
707        
708        let _guard = lock_guard.lock().await;
709        
710        // RAII cleanup guard - ensures entry is removed even on early return or panic
711        let cleanup_guard = CleanupGuard {
712            map: &self.in_flight_requests,
713            key: key_owned.clone(),
714        };
715        
716        // Double-check L1 cache after acquiring lock
717        // (Another concurrent request might have populated it while we were waiting)
718        if let Some(value) = self.l1_cache.get(key).await {
719            self.l1_hits.fetch_add(1, Ordering::Relaxed);
720            // cleanup_guard will auto-remove entry on drop
721            return Ok(Some(value));
722        }
723        
724        // Check L2 cache with TTL information
725        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
726            self.l2_hits.fetch_add(1, Ordering::Relaxed);
727
728            // Promote to L1 with same TTL as Redis (or default if no TTL)
729            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
730
731            if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
732                // L1 promotion failed, but we still have the data
733                eprintln!("⚠️ Failed to promote key '{}' to L1 cache", key);
734            } else {
735                self.promotions.fetch_add(1, Ordering::Relaxed);
736                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
737            }
738
739            // cleanup_guard will auto-remove entry on drop
740            return Ok(Some(value));
741        }
742        
743        // Both L1 and L2 miss
744        self.misses.fetch_add(1, Ordering::Relaxed);
745        
746        // cleanup_guard will auto-remove entry on drop
747        drop(cleanup_guard);
748        
749        Ok(None)
750    }
751    
752    /// Get value from cache with fallback computation (enhanced backward compatibility)
753    /// 
754    /// This is a convenience method that combines `get()` with optional computation.
755    /// If the value is not found in cache, it will execute the compute function
756    /// and cache the result automatically.
757    /// 
758    /// # Arguments
759    /// * `key` - Cache key
760    /// * `compute_fn` - Optional function to compute value if not in cache
761    /// * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
762    /// 
763    /// # Returns
764    /// * `Ok(Some(value))` - Value found in cache or computed successfully
765    /// * `Ok(None)` - Value not in cache and no compute function provided
766    /// * `Err(error)` - Cache operation or computation failed
767    /// 
768    /// # Example
769    /// ```ignore
770    /// // Simple cache get (existing behavior)
771    /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
772    ///
773    /// // Get with computation fallback (new enhanced behavior)
774    /// let api_data = cache_manager.get_with_fallback(
775    ///     "api_response",
776    ///     Some(|| async { fetch_data_from_api().await }),
777    ///     Some(CacheStrategy::RealTime)
778    /// ).await?;
779    /// ```
780    
781    /// Set value with specific cache strategy (all tiers)
782    ///
783    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
784    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
785    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
786        let ttl = strategy.to_duration();
787
788        // NEW: Multi-tier mode (v0.5.0+)
789        if let Some(tiers) = &self.tiers {
790            // Store in ALL tiers with their respective TTL scaling
791            let mut success_count = 0;
792            let mut last_error = None;
793
794            for tier in tiers {
795                match tier.set_with_ttl(key, value.clone(), ttl).await {
796                    Ok(_) => {
797                        success_count += 1;
798                    }
799                    Err(e) => {
800                        eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
801                        last_error = Some(e);
802                    }
803                }
804            }
805
806            if success_count > 0 {
807                println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
808                         key, success_count, tiers.len(), ttl);
809                return Ok(());
810            } else {
811                return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
812            }
813        }
814
815        // LEGACY: 2-tier mode (L1 + L2)
816        // Store in both L1 and L2
817        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
818        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
819
820        // Return success if at least one cache succeeded
821        match (l1_result, l2_result) {
822            (Ok(_), Ok(_)) => {
823                // Both succeeded
824                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
825                Ok(())
826            }
827            (Ok(_), Err(_)) => {
828                // L1 succeeded, L2 failed
829                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
830                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
831                Ok(())
832            }
833            (Err(_), Ok(_)) => {
834                // L1 failed, L2 succeeded
835                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
836                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
837                Ok(())
838            }
839            (Err(e1), Err(_e2)) => {
840                // Both failed
841                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
842            }
843        }
844    }
845    
846    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
847    /// 
848    /// This method provides comprehensive Cache Stampede protection:
849    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
850    /// 2. Check L2 cache with mutex-based coalescing
851    /// 3. Compute fresh data with protection against concurrent computations
852    /// 
853    /// # Arguments
854    /// * `key` - Cache key
855    /// * `strategy` - Cache strategy for TTL and storage behavior
856    /// * `compute_fn` - Async function to compute the value if not in any cache
857    /// 
858    /// # Example
859    /// ```ignore
860    /// let api_data = cache_manager.get_or_compute_with(
861    ///     "api_response",
862    ///     CacheStrategy::RealTime,
863    ///     || async {
864    ///         fetch_data_from_api().await
865    ///     }
866    /// ).await?;
867    /// ```
868    #[allow(dead_code)]
869    pub async fn get_or_compute_with<F, Fut>(
870        &self,
871        key: &str,
872        strategy: CacheStrategy,
873        compute_fn: F,
874    ) -> Result<serde_json::Value>
875    where
876        F: FnOnce() -> Fut + Send,
877        Fut: Future<Output = Result<serde_json::Value>> + Send,
878    {
879        self.total_requests.fetch_add(1, Ordering::Relaxed);
880        
881        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
882        if let Some(value) = self.l1_cache.get(key).await {
883            self.l1_hits.fetch_add(1, Ordering::Relaxed);
884            return Ok(value);
885        }
886        
887        // 2. L1 miss - try L2 with Cache Stampede protection
888        let key_owned = key.to_string();
889        let lock_guard = self.in_flight_requests
890            .entry(key_owned.clone())
891            .or_insert_with(|| Arc::new(Mutex::new(())))
892            .clone();
893        
894        let _guard = lock_guard.lock().await;
895        
896        // RAII cleanup guard - ensures entry is removed even on early return or panic
897        let _cleanup_guard = CleanupGuard {
898            map: &self.in_flight_requests,
899            key: key_owned,
900        };
901        
902        // 3. Double-check L1 cache after acquiring lock
903        // (Another request might have populated it while we were waiting)
904        if let Some(value) = self.l1_cache.get(key).await {
905            self.l1_hits.fetch_add(1, Ordering::Relaxed);
906            // _cleanup_guard will auto-remove entry on drop
907            return Ok(value);
908        }
909
910        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
911        if let Some(tiers) = &self.tiers {
912            // Check tiers starting from index 1 (skip L1 since already checked)
913            for tier in tiers.iter().skip(1) {
914                if let Some((value, ttl)) = tier.get_with_ttl(key).await {
915                    tier.record_hit();
916
917                    // Promote to L1 (first tier)
918                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
919                    if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
920                        eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
921                    } else {
922                        self.promotions.fetch_add(1, Ordering::Relaxed);
923                        println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
924                                key, tier.tier_level, promotion_ttl);
925                    }
926
927                    // _cleanup_guard will auto-remove entry on drop
928                    return Ok(value);
929                }
930            }
931        } else {
932            // LEGACY: Check L2 cache with TTL
933            if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
934                self.l2_hits.fetch_add(1, Ordering::Relaxed);
935
936                // Promote to L1 using Redis TTL (or strategy TTL as fallback)
937                let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
938
939                if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
940                    eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
941                } else {
942                    self.promotions.fetch_add(1, Ordering::Relaxed);
943                    println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
944                }
945
946                // _cleanup_guard will auto-remove entry on drop
947                return Ok(value);
948            }
949        }
950
951        // 5. Cache miss across all tiers - compute fresh data
952        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
953        let fresh_data = compute_fn().await?;
954        
955        // 6. Store in both caches
956        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
957            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
958        }
959        
960        // 7. _cleanup_guard will auto-remove entry on drop
961
962        Ok(fresh_data)
963    }
964
965    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
966    ///
967    /// This method provides the same functionality as `get_or_compute_with()` but with
968    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
969    /// API calls, or any computation that returns structured data.
970    ///
971    /// # Type Safety
972    ///
973    /// - Returns your actual type `T` instead of `serde_json::Value`
974    /// - Compiler enforces Serialize + DeserializeOwned bounds
975    /// - No manual JSON conversion needed
976    ///
977    /// # Cache Flow
978    ///
979    /// 1. Check L1 cache → deserialize if found
980    /// 2. Check L2 cache → deserialize + promote to L1 if found
981    /// 3. Execute compute_fn → serialize → store in L1+L2
982    /// 4. Full stampede protection (only ONE request computes)
983    ///
984    /// # Arguments
985    ///
986    /// * `key` - Cache key
987    /// * `strategy` - Cache strategy for TTL
988    /// * `compute_fn` - Async function returning `Result<T>`
989    ///
990    /// # Example - Database Query
991    ///
992    /// ```no_run
993    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
994    /// # use std::sync::Arc;
995    /// # use serde::{Serialize, Deserialize};
996    /// # async fn example() -> anyhow::Result<()> {
997    /// # let l1 = Arc::new(L1Cache::new().await?);
998    /// # let l2 = Arc::new(L2Cache::new().await?);
999    /// # let cache_manager = CacheManager::new(l1, l2);
1000    ///
1001    /// #[derive(Serialize, Deserialize)]
1002    /// struct User {
1003    ///     id: i64,
1004    ///     name: String,
1005    /// }
1006    ///
1007    /// // Type-safe database caching (example - requires sqlx)
1008    /// // let user: User = cache_manager.get_or_compute_typed(
1009    /// //     "user:123",
1010    /// //     CacheStrategy::MediumTerm,
1011    /// //     || async {
1012    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1013    /// //             .bind(123)
1014    /// //             .fetch_one(&pool)
1015    /// //             .await
1016    /// //     }
1017    /// // ).await?;
1018    /// # Ok(())
1019    /// # }
1020    /// ```
1021    ///
1022    /// # Example - API Call
1023    ///
1024    /// ```no_run
1025    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1026    /// # use std::sync::Arc;
1027    /// # use serde::{Serialize, Deserialize};
1028    /// # async fn example() -> anyhow::Result<()> {
1029    /// # let l1 = Arc::new(L1Cache::new().await?);
1030    /// # let l2 = Arc::new(L2Cache::new().await?);
1031    /// # let cache_manager = CacheManager::new(l1, l2);
1032    /// #[derive(Serialize, Deserialize)]
1033    /// struct ApiResponse {
1034    ///     data: String,
1035    ///     timestamp: i64,
1036    /// }
1037    ///
1038    /// // API call caching (example - requires reqwest)
1039    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1040    /// //     "api:endpoint",
1041    /// //     CacheStrategy::RealTime,
1042    /// //     || async {
1043    /// //         reqwest::get("https://api.example.com/data")
1044    /// //             .await?
1045    /// //             .json::<ApiResponse>()
1046    /// //             .await
1047    /// //     }
1048    /// // ).await?;
1049    /// # Ok(())
1050    /// # }
1051    /// ```
1052    ///
1053    /// # Performance
1054    ///
1055    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1056    /// - L2 hit: 2-5ms + deserialization + L1 promotion
1057    /// - Compute: Your function time + serialization + L1+L2 storage
1058    /// - Stampede protection: 99.6% latency reduction under high concurrency
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns error if:
1063    /// - Compute function fails
1064    /// - Serialization fails (invalid type for JSON)
1065    /// - Deserialization fails (cache data doesn't match type T)
1066    /// - Cache operations fail (Redis connection issues)
1067    pub async fn get_or_compute_typed<T, F, Fut>(
1068        &self,
1069        key: &str,
1070        strategy: CacheStrategy,
1071        compute_fn: F,
1072    ) -> Result<T>
1073    where
1074        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1075        F: FnOnce() -> Fut + Send,
1076        Fut: Future<Output = Result<T>> + Send,
1077    {
1078        self.total_requests.fetch_add(1, Ordering::Relaxed);
1079
1080        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1081        if let Some(cached_json) = self.l1_cache.get(key).await {
1082            self.l1_hits.fetch_add(1, Ordering::Relaxed);
1083
1084            // Attempt to deserialize from JSON to type T
1085            match serde_json::from_value::<T>(cached_json) {
1086                Ok(typed_value) => {
1087                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
1088                    return Ok(typed_value);
1089                }
1090                Err(e) => {
1091                    // Deserialization failed - cache data may be stale or corrupt
1092                    eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1093                    // Fall through to recompute
1094                }
1095            }
1096        }
1097
1098        // 2. L1 miss - try L2 with Cache Stampede protection
1099        let key_owned = key.to_string();
1100        let lock_guard = self.in_flight_requests
1101            .entry(key_owned.clone())
1102            .or_insert_with(|| Arc::new(Mutex::new(())))
1103            .clone();
1104
1105        let _guard = lock_guard.lock().await;
1106
1107        // RAII cleanup guard - ensures entry is removed even on early return or panic
1108        let _cleanup_guard = CleanupGuard {
1109            map: &self.in_flight_requests,
1110            key: key_owned,
1111        };
1112
1113        // 3. Double-check L1 cache after acquiring lock
1114        // (Another request might have populated it while we were waiting)
1115        if let Some(cached_json) = self.l1_cache.get(key).await {
1116            self.l1_hits.fetch_add(1, Ordering::Relaxed);
1117            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1118                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
1119                return Ok(typed_value);
1120            }
1121        }
1122
1123        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1124        if let Some(tiers) = &self.tiers {
1125            // Check tiers starting from index 1 (skip L1 since already checked)
1126            for tier in tiers.iter().skip(1) {
1127                if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1128                    tier.record_hit();
1129
1130                    // Attempt to deserialize
1131                    match serde_json::from_value::<T>(cached_json.clone()) {
1132                        Ok(typed_value) => {
1133                            println!("✅ [L{} HIT] Deserialized '{}' to type {}",
1134                                    tier.tier_level, key, std::any::type_name::<T>());
1135
1136                            // Promote to L1 (first tier)
1137                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1138                            if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
1139                                eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
1140                                         key, tier.tier_level, e);
1141                            } else {
1142                                self.promotions.fetch_add(1, Ordering::Relaxed);
1143                                println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1144                                        key, tier.tier_level, promotion_ttl);
1145                            }
1146
1147                            return Ok(typed_value);
1148                        }
1149                        Err(e) => {
1150                            eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1151                                     tier.tier_level, key, e);
1152                            // Continue to next tier
1153                        }
1154                    }
1155                }
1156            }
1157        } else {
1158            // LEGACY: Check L2 cache with TTL
1159            if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1160                self.l2_hits.fetch_add(1, Ordering::Relaxed);
1161
1162                // Attempt to deserialize
1163                match serde_json::from_value::<T>(cached_json.clone()) {
1164                    Ok(typed_value) => {
1165                        println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
1166
1167                        // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1168                        let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1169
1170                        if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
1171                            eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
1172                        } else {
1173                            self.promotions.fetch_add(1, Ordering::Relaxed);
1174                            println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
1175                        }
1176
1177                        return Ok(typed_value);
1178                    }
1179                    Err(e) => {
1180                        eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1181                        // Fall through to recompute
1182                    }
1183                }
1184            }
1185        }
1186
1187        // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1188        println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
1189        let typed_value = compute_fn().await?;
1190
1191        // 6. Serialize to JSON for storage
1192        let json_value = serde_json::to_value(&typed_value)
1193            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
1194
1195        // 7. Store in both L1 and L2 caches
1196        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1197            eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
1198        } else {
1199            println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
1200        }
1201
1202        // 8. _cleanup_guard will auto-remove entry on drop
1203
1204        Ok(typed_value)
1205    }
1206
1207    /// Get comprehensive cache statistics
1208    ///
1209    /// In multi-tier mode, aggregates statistics from all tiers.
1210    /// In legacy mode, returns L1 and L2 stats.
1211    #[allow(dead_code)]
1212    pub fn get_stats(&self) -> CacheManagerStats {
1213        let total_reqs = self.total_requests.load(Ordering::Relaxed);
1214        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1215        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1216        let misses = self.misses.load(Ordering::Relaxed);
1217
1218        CacheManagerStats {
1219            total_requests: total_reqs,
1220            l1_hits,
1221            l2_hits,
1222            total_hits: l1_hits + l2_hits,
1223            misses,
1224            hit_rate: if total_reqs > 0 {
1225                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1226            } else { 0.0 },
1227            l1_hit_rate: if total_reqs > 0 {
1228                (l1_hits as f64 / total_reqs as f64) * 100.0
1229            } else { 0.0 },
1230            promotions: self.promotions.load(Ordering::Relaxed),
1231            in_flight_requests: self.in_flight_requests.len(),
1232        }
1233    }
1234
1235    /// Get per-tier statistics (v0.5.0+)
1236    ///
1237    /// Returns statistics for each tier if multi-tier mode is enabled.
1238    /// Returns None if using legacy 2-tier mode.
1239    ///
1240    /// # Example
1241    /// ```rust,ignore
1242    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1243    ///     for stats in tier_stats {
1244    ///         println!("L{}: {} hits ({})",
1245    ///                  stats.tier_level,
1246    ///                  stats.hit_count(),
1247    ///                  stats.backend_name);
1248    ///     }
1249    /// }
1250    /// ```
1251    pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1252        self.tiers.as_ref().map(|tiers| {
1253            tiers.iter().map(|tier| tier.stats.clone()).collect()
1254        })
1255    }
1256    
1257    // ===== Redis Streams Methods =====
1258    
1259    /// Publish data to Redis Stream
1260    ///
1261    /// # Arguments
1262    /// * `stream_key` - Name of the stream (e.g., "events_stream")
1263    /// * `fields` - Field-value pairs to publish
1264    /// * `maxlen` - Optional max length for stream trimming
1265    ///
1266    /// # Returns
1267    /// The entry ID generated by Redis
1268    ///
1269    /// # Errors
1270    /// Returns error if streaming backend is not configured
1271    pub async fn publish_to_stream(
1272        &self,
1273        stream_key: &str,
1274        fields: Vec<(String, String)>,
1275        maxlen: Option<usize>
1276    ) -> Result<String> {
1277        match &self.streaming_backend {
1278            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1279            None => Err(anyhow::anyhow!("Streaming backend not configured"))
1280        }
1281    }
1282    
1283    /// Read latest entries from Redis Stream
1284    ///
1285    /// # Arguments
1286    /// * `stream_key` - Name of the stream
1287    /// * `count` - Number of latest entries to retrieve
1288    ///
1289    /// # Returns
1290    /// Vector of (entry_id, fields) tuples (newest first)
1291    ///
1292    /// # Errors
1293    /// Returns error if streaming backend is not configured
1294    pub async fn read_stream_latest(
1295        &self,
1296        stream_key: &str,
1297        count: usize
1298    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1299        match &self.streaming_backend {
1300            Some(backend) => backend.stream_read_latest(stream_key, count).await,
1301            None => Err(anyhow::anyhow!("Streaming backend not configured"))
1302        }
1303    }
1304    
1305    /// Read from Redis Stream with optional blocking
1306    ///
1307    /// # Arguments
1308    /// * `stream_key` - Name of the stream
1309    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1310    /// * `count` - Max entries to retrieve
1311    /// * `block_ms` - Optional blocking timeout in ms
1312    ///
1313    /// # Returns
1314    /// Vector of (entry_id, fields) tuples
1315    ///
1316    /// # Errors
1317    /// Returns error if streaming backend is not configured
1318    pub async fn read_stream(
1319        &self,
1320        stream_key: &str,
1321        last_id: &str,
1322        count: usize,
1323        block_ms: Option<usize>
1324    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1325        match &self.streaming_backend {
1326            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1327            None => Err(anyhow::anyhow!("Streaming backend not configured"))
1328        }
1329    }
1330
1331    // ===== Cache Invalidation Methods =====
1332
1333    /// Invalidate a cache key across all instances
1334    ///
1335    /// This removes the key from all cache tiers and broadcasts
1336    /// the invalidation to all other cache instances via Redis Pub/Sub.
1337    ///
1338    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1339    ///
1340    /// # Arguments
1341    /// * `key` - Cache key to invalidate
1342    ///
1343    /// # Example
1344    /// ```rust,ignore
1345    /// // Invalidate user cache after profile update
1346    /// cache_manager.invalidate("user:123").await?;
1347    /// ```
1348    pub async fn invalidate(&self, key: &str) -> Result<()> {
1349        // NEW: Multi-tier mode (v0.5.0+)
1350        if let Some(tiers) = &self.tiers {
1351            // Remove from ALL tiers
1352            for tier in tiers {
1353                if let Err(e) = tier.remove(key).await {
1354                    eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1355                }
1356            }
1357        } else {
1358            // LEGACY: 2-tier mode
1359            self.l1_cache.remove(key).await?;
1360            self.l2_cache.remove(key).await?;
1361        }
1362
1363        // Broadcast to other instances
1364        if let Some(publisher) = &self.invalidation_publisher {
1365            let mut pub_lock = publisher.lock().await;
1366            let msg = InvalidationMessage::remove(key);
1367            pub_lock.publish(&msg).await?;
1368            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1369        }
1370
1371        println!("🗑️  Invalidated '{}' across all instances", key);
1372        Ok(())
1373    }
1374
1375    /// Update cache value across all instances
1376    ///
1377    /// This updates the key in all cache tiers and broadcasts
1378    /// the update to all other cache instances, avoiding cache misses.
1379    ///
1380    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1381    ///
1382    /// # Arguments
1383    /// * `key` - Cache key to update
1384    /// * `value` - New value
1385    /// * `ttl` - Optional TTL (uses default if None)
1386    ///
1387    /// # Example
1388    /// ```rust,ignore
1389    /// // Update user cache with new data
1390    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1391    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1392    /// ```
1393    pub async fn update_cache(
1394        &self,
1395        key: &str,
1396        value: serde_json::Value,
1397        ttl: Option<Duration>,
1398    ) -> Result<()> {
1399        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1400
1401        // NEW: Multi-tier mode (v0.5.0+)
1402        if let Some(tiers) = &self.tiers {
1403            // Update ALL tiers with their respective TTL scaling
1404            for tier in tiers {
1405                if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1406                    eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1407                }
1408            }
1409        } else {
1410            // LEGACY: 2-tier mode
1411            self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1412            self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1413        }
1414
1415        // Broadcast update to other instances
1416        if let Some(publisher) = &self.invalidation_publisher {
1417            let mut pub_lock = publisher.lock().await;
1418            let msg = InvalidationMessage::update(key, value, Some(ttl));
1419            pub_lock.publish(&msg).await?;
1420            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1421        }
1422
1423        println!("🔄 Updated '{}' across all instances", key);
1424        Ok(())
1425    }
1426
1427    /// Invalidate all keys matching a pattern
1428    ///
1429    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1430    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1431    ///
1432    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1433    ///
1434    /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1435    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1436    ///
1437    /// # Arguments
1438    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1439    ///
1440    /// # Example
1441    /// ```rust,ignore
1442    /// // Invalidate all user caches
1443    /// cache_manager.invalidate_pattern("user:*").await?;
1444    ///
1445    /// // Invalidate specific user's related caches
1446    /// cache_manager.invalidate_pattern("user:123:*").await?;
1447    /// ```
1448    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1449        // Scan L2 for matching keys
1450        // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1451        let keys = if let Some(l2) = &self.l2_cache_concrete {
1452            l2.scan_keys(pattern).await?
1453        } else {
1454            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1455        };
1456
1457        if keys.is_empty() {
1458            println!("🔍 No keys found matching pattern '{}'", pattern);
1459            return Ok(());
1460        }
1461
1462        // NEW: Multi-tier mode (v0.5.0+)
1463        if let Some(tiers) = &self.tiers {
1464            // Remove from ALL tiers
1465            for key in &keys {
1466                for tier in tiers {
1467                    if let Err(e) = tier.remove(key).await {
1468                        eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1469                    }
1470                }
1471            }
1472        } else {
1473            // LEGACY: 2-tier mode - Remove from L2 in bulk
1474            if let Some(l2) = &self.l2_cache_concrete {
1475                l2.remove_bulk(&keys).await?;
1476            }
1477        }
1478
1479        // Broadcast pattern invalidation
1480        if let Some(publisher) = &self.invalidation_publisher {
1481            let mut pub_lock = publisher.lock().await;
1482            let msg = InvalidationMessage::remove_bulk(keys.clone());
1483            pub_lock.publish(&msg).await?;
1484            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1485        }
1486
1487        println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1488        Ok(())
1489    }
1490
1491    /// Set value with automatic broadcast to all instances
1492    ///
1493    /// This is a write-through operation that updates the cache and
1494    /// broadcasts the update to all other instances automatically.
1495    ///
1496    /// # Arguments
1497    /// * `key` - Cache key
1498    /// * `value` - Value to cache
1499    /// * `strategy` - Cache strategy (determines TTL)
1500    ///
1501    /// # Example
1502    /// ```rust,ignore
1503    /// // Update and broadcast in one call
1504    /// let data = serde_json::json!({"status": "active"});
1505    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1506    /// ```
1507    pub async fn set_with_broadcast(
1508        &self,
1509        key: &str,
1510        value: serde_json::Value,
1511        strategy: CacheStrategy,
1512    ) -> Result<()> {
1513        let ttl = strategy.to_duration();
1514
1515        // Set in local caches
1516        self.set_with_strategy(key, value.clone(), strategy).await?;
1517
1518        // Broadcast update if invalidation is enabled
1519        if let Some(publisher) = &self.invalidation_publisher {
1520            let mut pub_lock = publisher.lock().await;
1521            let msg = InvalidationMessage::update(key, value, Some(ttl));
1522            pub_lock.publish(&msg).await?;
1523            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1524        }
1525
1526        Ok(())
1527    }
1528
1529    /// Get invalidation statistics
1530    ///
1531    /// Returns statistics about invalidation operations if invalidation is enabled.
1532    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1533        if self.invalidation_subscriber.is_some() {
1534            Some(self.invalidation_stats.snapshot())
1535        } else {
1536            None
1537        }
1538    }
1539}
1540
1541/// Cache Manager statistics
1542#[allow(dead_code)]
1543#[derive(Debug, Clone)]
1544pub struct CacheManagerStats {
1545    pub total_requests: u64,
1546    pub l1_hits: u64,
1547    pub l2_hits: u64,
1548    pub total_hits: u64,
1549    pub misses: u64,
1550    pub hit_rate: f64,
1551    pub l1_hit_rate: f64,
1552    pub promotions: usize,
1553    pub in_flight_requests: usize,
1554}