multi_tier_cache/
cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use tracing::{info, warn, error, debug};

use crate::backends::{L1Cache, L2Cache};
use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use super::invalidation::{
    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
};

/// Type alias for the in-flight requests map
type InFlightMap = DashMap<String, Arc<Mutex<()>>>;

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from DashMap even on early return or panic
struct CleanupGuard<'a> {
    map: &'a InFlightMap,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
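    ///
    /// # Example
    ///
    /// A minimal sketch of the strategy-to-TTL mapping:
    ///
    /// ```rust,ignore
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```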
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300), // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800), // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Statistics for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}

impl Clone for TierStats {
    fn clone(&self) -> Self {
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}

/// A single cache tier in the multi-tier architecture
pub struct CacheTier {
    /// Cache backend for this tier
    backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 = hottest/fastest, higher = colder/slower)
    tier_level: usize,
    /// Enable automatic promotion to upper tiers on cache hit
    promotion_enabled: bool,
    /// TTL scale factor (multiplier for TTL when storing/promoting)
    ttl_scale: f64,
    /// Statistics for this tier
    stats: TierStats,
}

impl CacheTier {
    /// Create a new cache tier
    pub fn new(
        backend: Arc<dyn L2CacheBackend>,
        tier_level: usize,
        promotion_enabled: bool,
        ttl_scale: f64,
    ) -> Self {
        let backend_name = backend.name().to_string();
        Self {
            backend,
            tier_level,
            promotion_enabled,
            ttl_scale,
            stats: TierStats::new(tier_level, backend_name),
        }
    }

    /// Get value with TTL from this tier
    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        self.backend.get_with_ttl(key).await
    }

    /// Set value with TTL in this tier
    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
        self.backend.set_with_ttl(key, value, scaled_ttl).await
    }

    /// Remove value from this tier
    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    /// Record a cache hit for this tier
    fn record_hit(&self) {
        self.stats.hits.fetch_add(1, Ordering::Relaxed);
    }
}

/// Configuration for a cache tier (used in builder pattern)
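///
/// # Example
///
/// A minimal builder-style sketch (the tier numbering follows the conventions above):
///
/// ```rust,ignore
/// // A level-3 tier that keeps entries 4x longer and never promotes upward
/// let cfg = TierConfig::new(3)
///     .with_promotion(false)
///     .with_ttl_scale(4.0);
/// ```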
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    pub fn as_l1() -> Self {
        Self {
            tier_level: 1,
            promotion_enabled: false, // L1 is already top tier
            ttl_scale: 1.0,
        }
    }

    /// Configure as L2 (warm tier)
    pub fn as_l2() -> Self {
        Self {
            tier_level: 2,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L3 (cold tier) with longer TTL
    pub fn as_l3() -> Self {
        Self {
            tier_level: 3,
            promotion_enabled: true,
            ttl_scale: 2.0, // Keep data 2x longer
        }
    }

    /// Configure as L4 (archive tier) with much longer TTL
    pub fn as_l4() -> Self {
        Self {
            tier_level: 4,
            promotion_enabled: true,
            ttl_scale: 8.0, // Keep data 8x longer
        }
    }

    /// Set promotion enabled
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set TTL scale factor
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}

/// Proxy wrapper to convert L2CacheBackend to CacheBackend
/// (Rust doesn't support automatic trait upcasting for trait objects)
struct ProxyCacheBackend {
    backend: Arc<dyn L2CacheBackend>,
}

#[async_trait::async_trait]
impl CacheBackend for ProxyCacheBackend {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        self.backend.get(key).await
    }

    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        self.backend.set_with_ttl(key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    async fn health_check(&self) -> bool {
        self.backend.health_check().await
    }

    fn name(&self) -> &str {
        self.backend.name()
    }
}

/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
/// legacy L1+L2 behavior for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, this takes precedence over l1_cache/l2_cache fields
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys)
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics (AtomicU64 is already thread-safe, no Arc needed)
    total_requests: AtomicU64,
    l1_hits: AtomicU64,
    l2_hits: AtomicU64,
    misses: AtomicU64,
    promotions: AtomicUsize,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    pub async fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with custom backends...");
        debug!("  L1: {}", l1_cache.name());
        debug!("  L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            debug!("  Streaming: enabled");
        } else {
            debug!("  Streaming: disabled");
        }

        Ok(Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
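    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a reachable Redis via `REDIS_URL` or localhost):
    ///
    /// ```rust,ignore
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let manager = CacheManager::new(l1, l2).await?;
    /// ```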
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        debug!("Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with Invalidation...");
        debug!("  Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        info!("Cache Manager initialized with invalidation support");

        Ok(manager)
    }

    /// Create new cache manager with multi-tier architecture (v0.5.0+)
    ///
    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
    /// Tiers are checked in order (lower tier_level = faster/hotter).
    ///
    /// # Arguments
    ///
    /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
    /// * `streaming_backend` - Optional streaming backend
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// // L1 + L2 + L3 setup
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
    ///
    /// let tiers = vec![
    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
    /// ];
    ///
    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
    /// ```
    pub async fn new_with_tiers(
        tiers: Vec<CacheTier>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        debug!("Initializing Multi-Tier Cache Manager...");
        debug!("  Tier count: {}", tiers.len());
        for tier in &tiers {
            debug!(
                "  L{}: {} (promotion={}, ttl_scale={})",
                tier.tier_level,
                tier.stats.backend_name,
                tier.promotion_enabled,
                tier.ttl_scale
            );
        }

        // Validate tiers are sorted by level
        for i in 1..tiers.len() {
            if tiers[i].tier_level <= tiers[i - 1].tier_level {
                anyhow::bail!(
                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
                    tiers[i].tier_level,
                    tiers[i - 1].tier_level
                );
            }
        }

        // For backward compatibility with legacy code, we need dummy l1/l2 caches
        // Use first tier as l1, second tier as l2 if available
        let (l1_cache, l2_cache) = if tiers.len() >= 2 {
            (tiers[0].backend.clone(), tiers[1].backend.clone())
        } else if tiers.len() == 1 {
            // Only one tier - use it for both
            let tier = tiers[0].backend.clone();
            (tier.clone(), tier)
        } else {
            anyhow::bail!("At least one cache tier is required");
        };

        // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
        let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
            backend: l1_cache.clone(),
        });

        Ok(Self {
            tiers: Some(tiers),
            l1_cache: l1_backend,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Start the invalidation subscriber background task
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            let l1_cache = Arc::clone(&self.l1_cache);
            let l2_cache_concrete = self.l2_cache_concrete.clone();

            subscriber.start(move |msg| {
                let l1 = Arc::clone(&l1_cache);
                let _l2 = l2_cache_concrete.clone();

                async move {
                    match msg {
                        InvalidationMessage::Remove { key } => {
                            // Remove from L1
                            l1.remove(&key).await?;
                            debug!("Invalidation: Removed '{}' from L1", key);
                        }
                        InvalidationMessage::Update { key, value, ttl_secs } => {
                            // Update L1 with new value
                            let ttl = ttl_secs
                                .map(Duration::from_secs)
                                .unwrap_or_else(|| Duration::from_secs(300));
                            l1.set_with_ttl(&key, value, ttl).await?;
                            debug!("Invalidation: Updated '{}' in L1", key);
                        }
                        InvalidationMessage::RemovePattern { pattern } => {
                            // For pattern-based invalidation, we can't easily iterate L1 cache
                            // So we just log it. The pattern invalidation is mainly for L2.
                            // L1 entries will naturally expire via TTL.
                            debug!("Invalidation: Pattern '{}' invalidated (L1 will expire naturally)", pattern);
                        }
                        InvalidationMessage::RemoveBulk { keys } => {
                            // Remove multiple keys from L1
                            for key in keys {
                                if let Err(e) = l1.remove(&key).await {
                                    warn!("Failed to remove '{}' from L1: {}", key, e);
                                }
                            }
                            debug!("Invalidation: Bulk removed keys from L1");
                        }
                    }
                    Ok(())
                }
            });

            info!("Invalidation subscriber started");
        }
    }

    /// Get value from cache using multi-tier architecture (v0.5.0+)
    ///
    /// This method iterates through all configured tiers and automatically promotes
    /// to upper tiers on cache hit.
    async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
        let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some

        // Try each tier sequentially (sorted by tier_level)
        for (tier_index, tier) in tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                // Cache hit!
                tier.record_hit();

                // Promote to all upper tiers (if promotion enabled)
                if tier.promotion_enabled && tier_index > 0 {
                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                    // Promote to all tiers above this one
                    for upper_tier in tiers[..tier_index].iter().rev() {
                        if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
                            warn!(
                                "Failed to promote '{}' from L{} to L{}: {}",
                                key, tier.tier_level, upper_tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!(
                                "Promoted '{}' from L{} to L{} (TTL: {:?})",
                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                            );
                        }
                    }
                }

                return Ok(Some(value));
            }
        }

        // Cache miss across all tiers
        Ok(None)
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method includes built-in Cache Stampede protection on cache misses:
    /// multiple concurrent requests for the same missing key are coalesced to prevent
    /// unnecessary duplicate work on external data sources.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in some tier
    /// * `Ok(None)` - Cache miss, value not found in any cache
    /// * `Err(error)` - Cache operation failed
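    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust,ignore
    /// if let Some(value) = cache_manager.get("user:123").await? {
    ///     println!("cache hit: {}", value);
    /// }
    /// ```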
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // NEW: Multi-tier mode (v0.5.0+)
        if self.tiers.is_some() {
            // Fast path for L1 (first tier) - no locking needed
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    // Update legacy stats for backward compatibility
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // L1 miss - use stampede protection for lower tiers
            let key_owned = key.to_string();
            let lock_guard = self.in_flight_requests
                .entry(key_owned.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone();

            let _guard = lock_guard.lock().await;
            let cleanup_guard = CleanupGuard {
                map: &self.in_flight_requests,
                key: key_owned.clone(),
            };

            // Double-check L1 after acquiring lock
            if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // Check remaining tiers with promotion
            let result = self.get_multi_tier(key).await?;

            if result.is_some() {
                // Hit in L2+ tier - update legacy stats
                if self.tiers.as_ref().unwrap().len() >= 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                self.misses.fetch_add(1, Ordering::Relaxed);
            }

            drop(cleanup_guard);
            return Ok(result);
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with same TTL as Redis (or default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await.is_err() {
                // L1 promotion failed, but we still have the data
                warn!("Failed to promote key '{}' to L1 cache", key);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                debug!("Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
            }

            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard will auto-remove entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    /// Set value with specific cache strategy (all tiers)
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
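    ///
    /// # Example
    ///
    /// A minimal usage sketch (key and payload are illustrative):
    ///
    /// ```rust,ignore
    /// let payload = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_strategy("user:123", payload, CacheStrategy::ShortTerm).await?;
    /// ```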
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Store in ALL tiers with their respective TTL scaling
            let mut success_count = 0;
            let mut last_error = None;

            for tier in tiers {
                match tier.set_with_ttl(key, value.clone(), ttl).await {
                    Ok(_) => {
                        success_count += 1;
                    }
                    Err(e) => {
                        error!("L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
                        last_error = Some(e);
                    }
                }
            }

            if success_count > 0 {
                debug!("[Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
                         key, success_count, tiers.len(), ttl);
                return Ok(());
            } else {
                return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
            }
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                debug!("[L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                warn!("L2 cache set failed for key '{}', continuing with L1", key);
                debug!("[L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                warn!("L1 cache set failed for key '{}', continuing with L2", key);
                debug!("[L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Promote to L1 (first tier)
                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                    if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
                        warn!("Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
                    } else {
                        self.promotions.fetch_add(1, Ordering::Relaxed);
                        debug!("Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                key, tier.tier_level, promotion_ttl);
                    }

                    // _cleanup_guard will auto-remove entry on drop
                    return Ok(value);
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                    warn!("Failed to promote key '{}' to L1: {}", key, e);
                } else {
                    self.promotions.fetch_add(1, Ordering::Relaxed);
                    debug!("Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                }

                // _cleanup_guard will auto-remove entry on drop
                return Ok(value);
            }
        }

        // 5. Cache miss across all tiers - compute fresh data
        debug!("Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            warn!("Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop

        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces Serialize + DeserializeOwned bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute compute_fn → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: Your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type T)
    /// - Cache operations fail (Redis connection issues)
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    debug!("[L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cache data may be stale or corrupt
                    warn!("L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                debug!("[L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Attempt to deserialize
                    match serde_json::from_value::<T>(cached_json.clone()) {
                        Ok(typed_value) => {
                            debug!("[L{} HIT] Deserialized '{}' to type {}",
                                    tier.tier_level, key, std::any::type_name::<T>());

                            // Promote to L1 (first tier)
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
                                warn!("Failed to promote '{}' from L{} to L1: {}",
                                         key, tier.tier_level, e);
                            } else {
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                                debug!("Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                        key, tier.tier_level, promotion_ttl);
                            }

                            return Ok(typed_value);
                        }
                        Err(e) => {
                            warn!("L{} cache deserialization failed for key '{}': {}. Trying next tier.",
                                     tier.tier_level, key, e);
                            // Continue to next tier
                        }
                    }
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Attempt to deserialize
                match serde_json::from_value::<T>(cached_json.clone()) {
                    Ok(typed_value) => {
                        debug!("[L2 HIT] Deserialized '{}' from Redis", key);

                        // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                        let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                        if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
                            warn!("Failed to promote key '{}' to L1: {}", key, e);
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!("Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                        }

                        return Ok(typed_value);
                    }
                    Err(e) => {
                        warn!("L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                        // Fall through to recompute
                    }
                }
            }
        }

        // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
        debug!("Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value)
            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;

        // 7. Store in both L1 and L2 caches
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            warn!("Failed to cache computed typed data for key '{}': {}", key, e);
        } else {
            debug!("Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
        }

        // 8. _cleanup_guard will auto-remove entry on drop

        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
    ///
    /// In multi-tier mode, aggregates statistics from all tiers.
    /// In legacy mode, returns L1 and L2 stats.
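    ///
    /// # Example
    ///
    /// A minimal sketch of reading the aggregate counters:
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% ({} requests)", stats.hit_rate, stats.total_requests);
    /// ```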
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    /// Get per-tier statistics (v0.5.0+)
    ///
    /// Returns statistics for each tier if multi-tier mode is enabled.
    /// Returns None if using legacy 2-tier mode.
    ///
    /// # Example
    /// ```rust,ignore
    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
    ///     for stats in tier_stats {
    ///         println!("L{}: {} hits ({})",
    ///                  stats.tier_level,
    ///                  stats.hit_count(),
    ///                  stats.backend_name);
    ///     }
    /// }
    /// ```
    pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
        self.tiers.as_ref().map(|tiers| {
            tiers.iter().map(|tier| tier.stats.clone()).collect()
        })
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
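    ///
    /// # Example
    ///
    /// A minimal usage sketch (stream name and fields are illustrative):
    ///
    /// ```rust,ignore
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "user_login".to_string())],
    ///     Some(10_000), // trim the stream to roughly 10k entries
    /// ).await?;
    /// ```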
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
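    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust,ignore
    /// let latest = cache_manager.read_stream_latest("events_stream", 5).await?;
    /// for (entry_id, fields) in latest {
    ///     println!("{entry_id}: {fields:?}");
    /// }
    /// ```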
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
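    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust,ignore
    /// // Block for up to 5 seconds waiting for entries newer than the last seen ID
    /// let entries = cache_manager.read_stream("events_stream", "$", 10, Some(5000)).await?;
    /// ```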
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    // ===== Cache Invalidation Methods =====

    /// Invalidate a cache key across all instances
    ///
    /// This removes the key from all cache tiers and broadcasts
    /// the invalidation to all other cache instances via Redis Pub/Sub.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to invalidate
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate user cache after profile update
    /// cache_manager.invalidate("user:123").await?;
    /// ```
    pub async fn invalidate(&self, key: &str) -> Result<()> {
        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for tier in tiers {
                if let Err(e) = tier.remove(key).await {
                    warn!("Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.remove(key).await?;
            self.l2_cache.remove(key).await?;
        }

        // Broadcast to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove(key);
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        debug!("Invalidated '{}' across all instances", key);
        Ok(())
    }

    /// Update cache value across all instances
    ///
    /// This updates the key in all cache tiers and broadcasts
    /// the update to all other cache instances, avoiding cache misses.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to update
    /// * `value` - New value
    /// * `ttl` - Optional TTL (uses default if None)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update user cache with new data
    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
    /// ```
    pub async fn update_cache(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Update ALL tiers with their respective TTL scaling
            for tier in tiers {
                if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
                    warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
            self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
        }

        // Broadcast update to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        debug!("Updated '{}' across all instances", key);
        Ok(())
    }

    /// Invalidate all keys matching a pattern
    ///
    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
    /// In multi-tier mode, this scans from L2 but removes from all tiers.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate all user caches
    /// cache_manager.invalidate_pattern("user:*").await?;
    ///
    /// // Invalidate specific user's related caches
    /// cache_manager.invalidate_pattern("user:123:*").await?;
    /// ```
    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
        // Scan L2 for matching keys
        // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
        let keys = if let Some(l2) = &self.l2_cache_concrete {
            l2.scan_keys(pattern).await?
        } else {
            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
        };

        if keys.is_empty() {
            debug!("No keys found matching pattern '{}'", pattern);
            return Ok(());
        }

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for key in &keys {
                for tier in tiers {
                    if let Err(e) = tier.remove(key).await {
                        warn!("Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
                    }
                }
            }
        } else {
            // LEGACY: 2-tier mode - Remove from L2 in bulk
            if let Some(l2) = &self.l2_cache_concrete {
                l2.remove_bulk(&keys).await?;
            }
        }

        // Broadcast pattern invalidation
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove_bulk(keys.clone());
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        debug!("Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
        Ok(())
    }

    /// Set value with automatic broadcast to all instances
    ///
    /// This is a write-through operation that updates the cache and
    /// broadcasts the update to all other instances automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `value` - Value to cache
    /// * `strategy` - Cache strategy (determines TTL)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update and broadcast in one call
    /// let data = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
    /// ```
    pub async fn set_with_broadcast(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // Set in local caches
        self.set_with_strategy(key, value.clone(), strategy).await?;

        // Broadcast update if invalidation is enabled
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Get invalidation statistics
    ///
    /// Returns statistics about invalidation operations if invalidation is enabled.
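    ///
    /// # Example
    ///
    /// A minimal sketch; the snapshot is printed via `Debug` since its exact fields
    /// depend on `InvalidationStats`:
    ///
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     println!("invalidation stats: {stats:?}");
    /// }
    /// ```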
    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
        if self.invalidation_subscriber.is_some() {
            Some(self.invalidation_stats.snapshot())
        } else {
            None
        }
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub total_hits: u64,
    pub misses: u64,
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub promotions: usize,
    pub in_flight_requests: usize,
}