multi_tier_cache/
cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across cache tiers (L1 Moka, L2 Redis, and optional lower tiers)
//! with intelligent fallback and promotion.

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use crate::backends::{L1Cache, L2Cache};
use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use super::invalidation::{
    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
};

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from DashMap even on early return or panic
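///
/// A minimal usage sketch (hypothetical key, mirroring how the getters below use it):
///
/// ```rust,ignore
/// let _cleanup = CleanupGuard {
///     map: &self.in_flight_requests,
///     key: "user:123".to_string(),
/// };
/// // ...await the L2 lookup or compute step...
/// // the entry is removed from the map when `_cleanup` drops, even on `?` or panic
/// ```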
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
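    ///
    /// A minimal sketch of the mapping (durations as documented on the variants):
    ///
    /// ```rust,ignore
    /// assert_eq!(CacheStrategy::RealTime.to_duration(), Duration::from_secs(10));
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```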
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300), // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800), // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Statistics for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}

impl Clone for TierStats {
    fn clone(&self) -> Self {
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}

/// A single cache tier in the multi-tier architecture
pub struct CacheTier {
    /// Cache backend for this tier
    backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 = hottest/fastest, higher = colder/slower)
    tier_level: usize,
    /// Enable automatic promotion to upper tiers on cache hit
    promotion_enabled: bool,
    /// TTL scale factor (multiplier for TTL when storing/promoting)
    ttl_scale: f64,
    /// Statistics for this tier
    stats: TierStats,
}

impl CacheTier {
    /// Create a new cache tier
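    ///
    /// A small sketch (assumes `backend` is some `Arc<dyn L2CacheBackend>` you already built):
    ///
    /// ```rust,ignore
    /// // Cold tier: level 3, promotes upward on hit, keeps entries twice as long
    /// let tier = CacheTier::new(backend, 3, true, 2.0);
    /// ```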
    pub fn new(
        backend: Arc<dyn L2CacheBackend>,
        tier_level: usize,
        promotion_enabled: bool,
        ttl_scale: f64,
    ) -> Self {
        let backend_name = backend.name().to_string();
        Self {
            backend,
            tier_level,
            promotion_enabled,
            ttl_scale,
            stats: TierStats::new(tier_level, backend_name),
        }
    }

    /// Get value with TTL from this tier
    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        self.backend.get_with_ttl(key).await
    }

    /// Set value with TTL in this tier
    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
        self.backend.set_with_ttl(key, value, scaled_ttl).await
    }

    /// Remove value from this tier
    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    /// Record a cache hit for this tier
    fn record_hit(&self) {
        self.stats.hits.fetch_add(1, Ordering::Relaxed);
    }
}

/// Configuration for a cache tier (used in builder pattern)
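///
/// A minimal builder sketch using the presets defined below:
///
/// ```rust,ignore
/// // Start from the L3 preset, but keep entries 4x longer than the base TTL
/// let config = TierConfig::as_l3().with_ttl_scale(4.0);
/// assert_eq!(config.tier_level, 3);
/// ```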
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    pub fn as_l1() -> Self {
        Self {
            tier_level: 1,
            promotion_enabled: false, // L1 is already top tier
            ttl_scale: 1.0,
        }
    }

    /// Configure as L2 (warm tier)
    pub fn as_l2() -> Self {
        Self {
            tier_level: 2,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L3 (cold tier) with longer TTL
    pub fn as_l3() -> Self {
        Self {
            tier_level: 3,
            promotion_enabled: true,
            ttl_scale: 2.0, // Keep data 2x longer
        }
    }

    /// Configure as L4 (archive tier) with much longer TTL
    pub fn as_l4() -> Self {
        Self {
            tier_level: 4,
            promotion_enabled: true,
            ttl_scale: 8.0, // Keep data 8x longer
        }
    }

    /// Set promotion enabled
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set TTL scale factor
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}

/// Proxy wrapper to convert L2CacheBackend to CacheBackend
/// (Rust doesn't support automatic trait upcasting for trait objects)
struct ProxyCacheBackend {
    backend: Arc<dyn L2CacheBackend>,
}

#[async_trait::async_trait]
impl CacheBackend for ProxyCacheBackend {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        self.backend.get(key).await
    }

    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        self.backend.set_with_ttl(key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    async fn health_check(&self) -> bool {
        self.backend.health_check().await
    }

    fn name(&self) -> &str {
        self.backend.name()
    }
}

/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
/// legacy L1+L2 behavior for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, this takes precedence over l1_cache/l2_cache fields
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys)
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics (AtomicU64 is already thread-safe, no Arc needed)
    total_requests: AtomicU64,
    l1_hits: AtomicU64,
    l2_hits: AtomicU64,
    misses: AtomicU64,
    promotions: AtomicUsize,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    pub async fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with custom backends...");
        println!("    L1: {}", l1_cache.name());
        println!("    L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            println!("    Streaming: enabled");
        } else {
            println!("    Streaming: disabled");
        }

        Ok(Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
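    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// // Assumes REDIS_URL (or the localhost default) points at a reachable Redis
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let manager = CacheManager::new(l1, l2).await?;
    /// ```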
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with Invalidation...");
        println!("    Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        println!("  ✅ Cache Manager initialized with invalidation support");

        Ok(manager)
    }

    /// Create new cache manager with multi-tier architecture (v0.5.0+)
    ///
    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
    /// Tiers are checked in order (lower tier_level = faster/hotter).
    ///
    /// # Arguments
    ///
    /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
    /// * `streaming_backend` - Optional streaming backend
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// // L1 + L2 + L3 setup
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
    ///
    /// let tiers = vec![
    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
    /// ];
    ///
    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
    /// ```
    pub async fn new_with_tiers(
        tiers: Vec<CacheTier>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!("  🎯 Initializing Multi-Tier Cache Manager...");
        println!("    Tier count: {}", tiers.len());
        for tier in &tiers {
            println!(
                "    L{}: {} (promotion={}, ttl_scale={})",
                tier.tier_level,
                tier.stats.backend_name,
                tier.promotion_enabled,
                tier.ttl_scale
            );
        }

        // Validate tiers are sorted by level
        for i in 1..tiers.len() {
            if tiers[i].tier_level <= tiers[i - 1].tier_level {
                anyhow::bail!(
                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
                    tiers[i].tier_level,
                    tiers[i - 1].tier_level
                );
            }
        }

        // For backward compatibility with legacy code, we need dummy l1/l2 caches
        // Use first tier as l1, second tier as l2 if available
        let (l1_cache, l2_cache) = if tiers.len() >= 2 {
            (tiers[0].backend.clone(), tiers[1].backend.clone())
        } else if tiers.len() == 1 {
            // Only one tier - use it for both
            let tier = tiers[0].backend.clone();
            (tier.clone(), tier)
        } else {
            anyhow::bail!("At least one cache tier is required");
        };

        // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
        let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
            backend: l1_cache.clone(),
        });

        Ok(Self {
            tiers: Some(tiers),
            l1_cache: l1_backend,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Start the invalidation subscriber background task
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            let l1_cache = Arc::clone(&self.l1_cache);
            let l2_cache_concrete = self.l2_cache_concrete.clone();

            subscriber.start(move |msg| {
                let l1 = Arc::clone(&l1_cache);
                let _l2 = l2_cache_concrete.clone();

                async move {
                    match msg {
                        InvalidationMessage::Remove { key } => {
                            // Remove from L1
                            l1.remove(&key).await?;
                            println!("🗑️  [Invalidation] Removed '{}' from L1", key);
                        }
                        InvalidationMessage::Update { key, value, ttl_secs } => {
                            // Update L1 with new value
                            let ttl = ttl_secs
                                .map(Duration::from_secs)
                                .unwrap_or_else(|| Duration::from_secs(300));
                            l1.set_with_ttl(&key, value, ttl).await?;
                            println!("🔄 [Invalidation] Updated '{}' in L1", key);
                        }
                        InvalidationMessage::RemovePattern { pattern } => {
                            // For pattern-based invalidation, we can't easily iterate L1 cache
                            // So we just log it. The pattern invalidation is mainly for L2.
                            // L1 entries will naturally expire via TTL.
                            println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
                        }
                        InvalidationMessage::RemoveBulk { keys } => {
                            // Remove multiple keys from L1
                            for key in keys {
                                if let Err(e) = l1.remove(&key).await {
                                    eprintln!("⚠️  Failed to remove '{}' from L1: {}", key, e);
                                }
                            }
                            println!("🗑️  [Invalidation] Bulk removed keys from L1");
                        }
                    }
                    Ok(())
                }
            });

            println!("📡 Invalidation subscriber started");
        }
    }

    /// Get value from cache using multi-tier architecture (v0.5.0+)
    ///
    /// This method iterates through all configured tiers and automatically promotes
    /// to upper tiers on cache hit.
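    ///
    /// For example, with an L1+L2+L3 setup, an L3 hit copies the value into L2 and then
    /// into L1, reusing the remaining TTL reported by L3 (or the default strategy TTL when
    /// none is available), scaled by each upper tier's `ttl_scale`.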
    async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
        let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some

        // Try each tier sequentially (sorted by tier_level)
        for (tier_index, tier) in tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                // Cache hit!
                tier.record_hit();

                // Promote to all upper tiers (if promotion enabled)
                if tier.promotion_enabled && tier_index > 0 {
                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                    // Promote to all tiers above this one
                    for upper_tier in tiers[..tier_index].iter().rev() {
                        if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
                            eprintln!(
                                "⚠️  Failed to promote '{}' from L{} to L{}: {}",
                                key, tier.tier_level, upper_tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            println!(
                                "⬆️  Promoted '{}' from L{} to L{} (TTL: {:?})",
                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                            );
                        }
                    }
                }

                return Ok(Some(value));
            }
        }

        // Cache miss across all tiers
        Ok(None)
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method now includes built-in Cache Stampede protection when cache misses occur.
    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
    /// unnecessary duplicate work on external data sources.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in any tier
    /// * `Ok(None)` - Cache miss, value not found in any cache
    /// * `Err(error)` - Cache operation failed
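    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Hypothetical key; returns Ok(None) when no tier holds it
    /// if let Some(value) = cache_manager.get("user:123").await? {
    ///     println!("cache hit: {}", value);
    /// }
    /// ```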
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // NEW: Multi-tier mode (v0.5.0+)
        if self.tiers.is_some() {
            // Fast path for L1 (first tier) - no locking needed
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    // Update legacy stats for backward compatibility
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // L1 miss - use stampede protection for lower tiers
            let key_owned = key.to_string();
            let lock_guard = self.in_flight_requests
                .entry(key_owned.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone();

            let _guard = lock_guard.lock().await;
            let cleanup_guard = CleanupGuard {
                map: &self.in_flight_requests,
                key: key_owned.clone(),
            };

            // Double-check L1 after acquiring lock
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // Check remaining tiers with promotion
            let result = self.get_multi_tier(key).await?;

            if result.is_some() {
                // Hit in L2+ tier - update legacy stats
                if self.tiers.as_ref().unwrap().len() >= 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                self.misses.fetch_add(1, Ordering::Relaxed);
            }

            drop(cleanup_guard);
            return Ok(result);
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with same TTL as Redis (or default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
            }

            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard will auto-remove entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    // Documentation for a `get_with_fallback()` convenience wrapper. The method itself is
    // not defined in this module, so this is kept as a plain comment rather than a doc
    // comment that would attach to `set_with_strategy()` below.
    //
    // Get value from cache with fallback computation (enhanced backward compatibility)
    //
    // This is a convenience method that combines `get()` with optional computation.
    // If the value is not found in cache, it will execute the compute function
    // and cache the result automatically.
    //
    // # Arguments
    // * `key` - Cache key
    // * `compute_fn` - Optional function to compute value if not in cache
    // * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
    //
    // # Returns
    // * `Ok(Some(value))` - Value found in cache or computed successfully
    // * `Ok(None)` - Value not in cache and no compute function provided
    // * `Err(error)` - Cache operation or computation failed
    //
    // # Example
    // ```ignore
    // // Simple cache get (existing behavior)
    // let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    //
    // // Get with computation fallback (new enhanced behavior)
    // let api_data = cache_manager.get_with_fallback(
    //     "api_response",
    //     Some(|| async { fetch_data_from_api().await }),
    //     Some(CacheStrategy::RealTime)
    // ).await?;
    // ```

    /// Set value with specific cache strategy (all tiers)
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
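    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Hypothetical key/value; stored in every tier with a 1-hour base TTL
    /// let value = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.set_with_strategy("user:123", value, CacheStrategy::MediumTerm).await?;
    /// ```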
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Store in ALL tiers with their respective TTL scaling
            let mut success_count = 0;
            let mut last_error = None;

            for tier in tiers {
                match tier.set_with_ttl(key, value.clone(), ttl).await {
                    Ok(_) => {
                        success_count += 1;
                    }
                    Err(e) => {
                        eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
                        last_error = Some(e);
                    }
                }
            }

            if success_count > 0 {
                println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
                         key, success_count, tiers.len(), ttl);
                return Ok(());
            } else {
                return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
            }
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Promote to L1 (first tier)
                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                    if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
                        eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
                    } else {
                        self.promotions.fetch_add(1, Ordering::Relaxed);
                        println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                key, tier.tier_level, promotion_ttl);
                    }

                    // _cleanup_guard will auto-remove entry on drop
                    return Ok(value);
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                    eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                } else {
                    self.promotions.fetch_add(1, Ordering::Relaxed);
                    println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                }

                // _cleanup_guard will auto-remove entry on drop
                return Ok(value);
            }
        }

        // 5. Cache miss across all tiers - compute fresh data
        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop

        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces Serialize + DeserializeOwned bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute compute_fn → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: Your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type T)
    /// - Cache operations fail (Redis connection issues)
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cache data may be stale or corrupt
                    eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Attempt to deserialize
                    match serde_json::from_value::<T>(cached_json.clone()) {
                        Ok(typed_value) => {
                            println!("✅ [L{} HIT] Deserialized '{}' to type {}",
                                    tier.tier_level, key, std::any::type_name::<T>());

                            // Promote to L1 (first tier)
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
                                eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
                                         key, tier.tier_level, e);
                            } else {
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                                println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                        key, tier.tier_level, promotion_ttl);
                            }

                            return Ok(typed_value);
                        }
                        Err(e) => {
                            eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
                                     tier.tier_level, key, e);
                            // Continue to next tier
                        }
                    }
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Attempt to deserialize
                match serde_json::from_value::<T>(cached_json.clone()) {
                    Ok(typed_value) => {
                        println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);

                        // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                        let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                        if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
                            eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                        }

                        return Ok(typed_value);
                    }
                    Err(e) => {
                        eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                        // Fall through to recompute
                    }
                }
            }
        }

        // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
        println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value)
            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;

        // 7. Store in both L1 and L2 caches
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
        } else {
            println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
        }

        // 8. _cleanup_guard will auto-remove entry on drop

        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
    ///
    /// In multi-tier mode, aggregates statistics from all tiers.
    /// In legacy mode, returns L1 and L2 stats.
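    ///
    /// A minimal sketch using the fields populated below:
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% ({} of {} requests)",
    ///          stats.hit_rate, stats.total_hits, stats.total_requests);
    /// ```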
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    /// Get per-tier statistics (v0.5.0+)
    ///
    /// Returns statistics for each tier if multi-tier mode is enabled.
    /// Returns None if using legacy 2-tier mode.
    ///
    /// # Example
    /// ```rust,ignore
    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
    ///     for stats in tier_stats {
    ///         println!("L{}: {} hits ({})",
    ///                  stats.tier_level,
    ///                  stats.hit_count(),
    ///                  stats.backend_name);
    ///     }
    /// }
    /// ```
    pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
        self.tiers.as_ref().map(|tiers| {
            tiers.iter().map(|tier| tier.stats.clone()).collect()
        })
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
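    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Hypothetical stream and fields; trims the stream to roughly 10_000 entries
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "user_signup".to_string())],
    ///     Some(10_000),
    /// ).await?;
    /// ```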
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
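    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Hypothetical stream; block up to 5s waiting for entries newer than "$"
    /// let entries = cache_manager.read_stream("events_stream", "$", 10, Some(5000)).await?;
    /// for (id, fields) in entries {
    ///     println!("{id}: {fields:?}");
    /// }
    /// ```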
1327    pub async fn read_stream(
1328        &self,
1329        stream_key: &str,
1330        last_id: &str,
1331        count: usize,
1332        block_ms: Option<usize>
1333    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1334        match &self.streaming_backend {
1335            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1336            None => Err(anyhow::anyhow!("Streaming backend not configured"))
1337        }
1338    }
1339
1340    // ===== Cache Invalidation Methods =====
1341
1342    /// Invalidate a cache key across all instances
1343    ///
1344    /// This removes the key from all cache tiers and broadcasts
1345    /// the invalidation to all other cache instances via Redis Pub/Sub.
1346    ///
1347    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1348    ///
1349    /// # Arguments
1350    /// * `key` - Cache key to invalidate
1351    ///
1352    /// # Example
1353    /// ```rust,ignore
1354    /// // Invalidate user cache after profile update
1355    /// cache_manager.invalidate("user:123").await?;
1356    /// ```
1357    pub async fn invalidate(&self, key: &str) -> Result<()> {
1358        // NEW: Multi-tier mode (v0.5.0+)
1359        if let Some(tiers) = &self.tiers {
1360            // Remove from ALL tiers
1361            for tier in tiers {
1362                if let Err(e) = tier.remove(key).await {
1363                    eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1364                }
1365            }
1366        } else {
1367            // LEGACY: 2-tier mode
1368            self.l1_cache.remove(key).await?;
1369            self.l2_cache.remove(key).await?;
1370        }
1371
1372        // Broadcast to other instances
1373        if let Some(publisher) = &self.invalidation_publisher {
1374            let mut pub_lock = publisher.lock().await;
1375            let msg = InvalidationMessage::remove(key);
1376            pub_lock.publish(&msg).await?;
1377            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1378        }
1379
1380        println!("🗑️  Invalidated '{}' across all instances", key);
1381        Ok(())
1382    }
1383
1384    /// Update cache value across all instances
1385    ///
1386    /// This updates the key in all cache tiers and broadcasts
1387    /// the update to all other cache instances, avoiding cache misses.
1388    ///
1389    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1390    ///
1391    /// # Arguments
1392    /// * `key` - Cache key to update
1393    /// * `value` - New value
1394    /// * `ttl` - Optional TTL (uses default if None)
1395    ///
1396    /// # Example
1397    /// ```rust,ignore
1398    /// // Update user cache with new data
1399    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1400    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1401    /// ```
1402    pub async fn update_cache(
1403        &self,
1404        key: &str,
1405        value: serde_json::Value,
1406        ttl: Option<Duration>,
1407    ) -> Result<()> {
1408        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1409
1410        // NEW: Multi-tier mode (v0.5.0+)
1411        if let Some(tiers) = &self.tiers {
1412            // Update ALL tiers with their respective TTL scaling
1413            for tier in tiers {
1414                if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1415                    eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1416                }
1417            }
1418        } else {
1419            // LEGACY: 2-tier mode
1420            self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1421            self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1422        }
1423
1424        // Broadcast update to other instances
1425        if let Some(publisher) = &self.invalidation_publisher {
1426            let mut pub_lock = publisher.lock().await;
1427            let msg = InvalidationMessage::update(key, value, Some(ttl));
1428            pub_lock.publish(&msg).await?;
1429            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1430        }
1431
1432        println!("🔄 Updated '{}' across all instances", key);
1433        Ok(())
1434    }
1435
1436    /// Invalidate all keys matching a pattern
1437    ///
1438    /// This scans the L2 cache for matching keys, removes them from the local tiers
1439    /// (L2 only in legacy mode), and broadcasts the invalidation so L1 caches are cleared via Pub/Sub.
1440    ///
1441    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1442    ///
1443    /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1444    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1445    ///
1446    /// # Arguments
1447    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1448    ///
1449    /// # Example
1450    /// ```rust,ignore
1451    /// // Invalidate all user caches
1452    /// cache_manager.invalidate_pattern("user:*").await?;
1453    ///
1454    /// // Invalidate specific user's related caches
1455    /// cache_manager.invalidate_pattern("user:123:*").await?;
1456    /// ```
1457    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1458        // Scan L2 for matching keys
1459        // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1460        let keys = if let Some(l2) = &self.l2_cache_concrete {
1461            l2.scan_keys(pattern).await?
1462        } else {
1463            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1464        };
1465
1466        if keys.is_empty() {
1467            println!("🔍 No keys found matching pattern '{}'", pattern);
1468            return Ok(());
1469        }
1470
1471        // NEW: Multi-tier mode (v0.5.0+)
1472        if let Some(tiers) = &self.tiers {
1473            // Remove from ALL tiers
1474            for key in &keys {
1475                for tier in tiers {
1476                    if let Err(e) = tier.remove(key).await {
1477                        eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1478                    }
1479                }
1480            }
1481        } else {
1482            // LEGACY: 2-tier mode - Remove from L2 in bulk
1483            if let Some(l2) = &self.l2_cache_concrete {
1484                l2.remove_bulk(&keys).await?;
1485            }
1486        }
1487
1488        // Broadcast pattern invalidation
1489        if let Some(publisher) = &self.invalidation_publisher {
1490            let mut pub_lock = publisher.lock().await;
1491            let msg = InvalidationMessage::remove_bulk(keys.clone());
1492            pub_lock.publish(&msg).await?;
1493            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1494        }
1495
1496        println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1497        Ok(())
1498    }
1499
1500    /// Set value with automatic broadcast to all instances
1501    ///
1502    /// This is a write-through operation that updates the cache and
1503    /// broadcasts the update to all other instances automatically.
1504    ///
1505    /// # Arguments
1506    /// * `key` - Cache key
1507    /// * `value` - Value to cache
1508    /// * `strategy` - Cache strategy (determines TTL)
1509    ///
1510    /// # Example
1511    /// ```rust,ignore
1512    /// // Update and broadcast in one call
1513    /// let data = serde_json::json!({"status": "active"});
1514    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1515    /// ```
1516    pub async fn set_with_broadcast(
1517        &self,
1518        key: &str,
1519        value: serde_json::Value,
1520        strategy: CacheStrategy,
1521    ) -> Result<()> {
1522        let ttl = strategy.to_duration();
1523
1524        // Set in local caches
1525        self.set_with_strategy(key, value.clone(), strategy).await?;
1526
1527        // Broadcast update if invalidation is enabled
1528        if let Some(publisher) = &self.invalidation_publisher {
1529            let mut pub_lock = publisher.lock().await;
1530            let msg = InvalidationMessage::update(key, value, Some(ttl));
1531            pub_lock.publish(&msg).await?;
1532            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1533        }
1534
1535        Ok(())
1536    }
1537
1538    /// Get invalidation statistics
1539    ///
1540    /// Returns statistics about invalidation operations, or `None` if invalidation is not enabled.
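    ///
    /// # Example
    /// A minimal sketch; assumes `InvalidationStats` implements `Debug`:
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     // Inspect Pub/Sub invalidation activity for this instance
    ///     println!("{stats:?}");
    /// }
    /// ```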
1541    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1542        if self.invalidation_subscriber.is_some() {
1543            Some(self.invalidation_stats.snapshot())
1544        } else {
1545            None
1546        }
1547    }
1548}
1549
1550/// Cache Manager statistics
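///
/// # Example
/// A minimal sketch; `stats()` is an illustrative accessor name, not necessarily the real one:
/// ```rust,ignore
/// let stats = cache_manager.stats().await;
/// println!(
///     "requests: {}, hits: {}, misses: {}",
///     stats.total_requests, stats.total_hits, stats.misses
/// );
/// ```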
1551#[allow(dead_code)]
1552#[derive(Debug, Clone)]
1553pub struct CacheManagerStats {
1554    pub total_requests: u64,
1555    pub l1_hits: u64,
1556    pub l2_hits: u64,
1557    pub total_hits: u64,
1558    pub misses: u64,
1559    pub hit_rate: f64,
1560    pub l1_hit_rate: f64,
1561    pub promotions: usize,
1562    pub in_flight_requests: usize,
1563}