multi_tier_cache/
cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.

use anyhow::Result;
use dashmap::DashMap;
use serde_json;
use std::future::Future;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use tracing::{debug, error, info, warn};

use super::invalidation::{
    AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
    InvalidationStats, InvalidationSubscriber,
};
use crate::backends::{L1Cache, L2Cache};
use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};

/// Type alias for the in-flight requests map
type InFlightMap = DashMap<String, Arc<Mutex<()>>>;

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from `DashMap` even on early return or panic
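///
/// A minimal usage sketch (illustrative; `in_flight` and `key` stand for the
/// caller's map and key):
///
/// ```rust,ignore
/// let _cleanup = CleanupGuard { map: &in_flight, key: key.clone() };
/// // ... lookups or compute work that may return early or panic ...
/// // `_cleanup` drops here, removing `key` from the map either way
/// ```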
struct CleanupGuard<'a> {
    map: &'a InFlightMap,
    key: String,
}

impl Drop for CleanupGuard<'_> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
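    ///
    /// # Example
    ///
    /// A quick check of the mapping (mirrors the match arms below):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// assert_eq!(CacheStrategy::RealTime.to_duration(), Duration::from_secs(10));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```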
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm | Self::Default => Duration::from_secs(300), // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600),               // 1 hour
            Self::LongTerm => Duration::from_secs(10800),                // 3 hours
            Self::Custom(duration) => *duration,
        }
    }
}

/// Statistics for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}

impl Clone for TierStats {
    fn clone(&self) -> Self {
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}

/// A single cache tier in the multi-tier architecture
pub struct CacheTier {
    /// Cache backend for this tier
    backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 = hottest/fastest, higher = colder/slower)
    tier_level: usize,
    /// Enable automatic promotion to upper tiers on cache hit
    promotion_enabled: bool,
    /// TTL scale factor (multiplier for TTL when storing/promoting)
    ttl_scale: f64,
    /// Statistics for this tier
    stats: TierStats,
}

impl CacheTier {
    /// Create a new cache tier
    pub fn new(
        backend: Arc<dyn L2CacheBackend>,
        tier_level: usize,
        promotion_enabled: bool,
        ttl_scale: f64,
    ) -> Self {
        let backend_name = backend.name().to_string();
        Self {
            backend,
            tier_level,
            promotion_enabled,
            ttl_scale,
            stats: TierStats::new(tier_level, backend_name),
        }
    }

    /// Get value with TTL from this tier
    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        self.backend.get_with_ttl(key).await
    }

    /// Set value with TTL in this tier
    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
        self.backend.set_with_ttl(key, value, scaled_ttl).await
    }

    /// Remove value from this tier
    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    /// Record a cache hit for this tier
    fn record_hit(&self) {
        self.stats.hits.fetch_add(1, Ordering::Relaxed);
    }
}

/// Configuration for a cache tier (used in builder pattern)
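///
/// A minimal sketch of the builder chain (values are illustrative):
///
/// ```rust,ignore
/// // A cold tier kept twice as long as the base TTL, with promotion enabled
/// let cfg = TierConfig::new(3)
///     .with_promotion(true)
///     .with_ttl_scale(2.0);
/// // Equivalent to the preset:
/// let cfg = TierConfig::as_l3();
/// ```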
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    #[must_use]
    pub fn as_l1() -> Self {
        Self {
            tier_level: 1,
            promotion_enabled: false, // L1 is already top tier
            ttl_scale: 1.0,
        }
    }

    /// Configure as L2 (warm tier)
    #[must_use]
    pub fn as_l2() -> Self {
        Self {
            tier_level: 2,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L3 (cold tier) with longer TTL
    #[must_use]
    pub fn as_l3() -> Self {
        Self {
            tier_level: 3,
            promotion_enabled: true,
            ttl_scale: 2.0, // Keep data 2x longer
        }
    }

    /// Configure as L4 (archive tier) with much longer TTL
    #[must_use]
    pub fn as_l4() -> Self {
        Self {
            tier_level: 4,
            promotion_enabled: true,
            ttl_scale: 8.0, // Keep data 8x longer
        }
    }

    /// Set promotion enabled
    #[must_use]
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set TTL scale factor
    #[must_use]
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    #[must_use]
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}

/// Proxy wrapper to convert `L2CacheBackend` to `CacheBackend`
/// (Rust doesn't support automatic trait upcasting for trait objects)
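///
/// A minimal sketch of the wrapping step (illustrative; `l2` stands for any
/// `Arc<dyn L2CacheBackend>`):
///
/// ```rust,ignore
/// let as_base: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend { backend: l2 });
/// ```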
struct ProxyCacheBackend {
    backend: Arc<dyn L2CacheBackend>,
}

#[async_trait::async_trait]
impl CacheBackend for ProxyCacheBackend {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        self.backend.get(key).await
    }

    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        self.backend.set_with_ttl(key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        self.backend.remove(key).await
    }

    async fn health_check(&self) -> bool {
        self.backend.health_check().await
    }

    fn name(&self) -> &'static str {
        self.backend.name()
    }
}

/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, it falls back to
/// legacy L1+L2 behavior for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, this takes precedence over the `l1_cache`/`l2_cache` fields
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation `scan_keys`)
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend (defaults to L2 if it implements `StreamingBackend`)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics (`AtomicU64` is already thread-safe, no Arc needed)
    total_requests: AtomicU64,
    l1_hits: AtomicU64,
    l2_hits: AtomicU64,
    misses: AtomicU64,
    promotions: AtomicUsize,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing the `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing the `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None)?;
    /// ```
    /// # Errors
    ///
    /// Currently infallible; the `Result` return type is kept for future compatibility.
    pub fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with custom backends...");
        debug!("  L1: {}", l1_cache.name());
        debug!("  L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            debug!("  Streaming: enabled");
        } else {
            debug!("  Streaming: disabled");
        }

        Ok(Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// # Errors
    ///
    /// Returns an error if Redis connection fails.
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        debug!("Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    /// # Errors
    ///
    /// Returns an error if Redis connection fails or invalidation setup fails.
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with Invalidation...");
        debug!("  Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            tiers: None, // Legacy mode: use l1_cache/l2_cache fields
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        info!("Cache Manager initialized with invalidation support");

        Ok(manager)
    }

    /// Create new cache manager with multi-tier architecture (v0.5.0+)
    ///
    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
    /// Tiers are checked in order (lower `tier_level` = faster/hotter).
    ///
    /// # Arguments
    ///
    /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
    /// * `streaming_backend` - Optional streaming backend
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// // L1 + L2 + L3 setup
    /// let l1 = Arc::new(L1Cache::new()?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
    ///
    /// let tiers = vec![
    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
    /// ];
    ///
    /// let manager = CacheManager::new_with_tiers(tiers, None)?;
503    /// ```
504    /// # Errors
505    ///
506    /// Returns an error if tiers are not sorted by level or if no tiers are provided.
507    pub fn new_with_tiers(
508        tiers: Vec<CacheTier>,
509        streaming_backend: Option<Arc<dyn StreamingBackend>>,
510    ) -> Result<Self> {
511        debug!("Initializing Multi-Tier Cache Manager...");
512        debug!("  Tier count: {}", tiers.len());
513        for tier in &tiers {
514            debug!(
515                "  L{}: {} (promotion={}, ttl_scale={})",
516                tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
517            );
518        }
519
520        // Validate tiers are sorted by level
521        for i in 1..tiers.len() {
522            if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1)) {
523                if current.tier_level <= prev.tier_level {
524                    anyhow::bail!(
525                        "Tiers must be sorted by tier_level ascending (found L{} after L{})",
526                        current.tier_level,
527                        prev.tier_level
528                    );
529                }
530            }
531        }
532
533        // For backward compatibility with legacy code, we need dummy l1/l2 caches
534        // Use first tier as l1, second tier as l2 if available
535        let (l1_cache, l2_cache) = if tiers.len() >= 2 {
536            if let (Some(t0), Some(t1)) = (tiers.first(), tiers.get(1)) {
537                (t0.backend.clone(), t1.backend.clone())
538            } else {
539                // Should be unreachable due to len check
540                anyhow::bail!("Failed to access tiers 0 and 1");
541            }
542        } else if tiers.len() == 1 {
543            // Only one tier - use it for both
544            if let Some(t0) = tiers.first() {
545                let tier = t0.backend.clone();
546                (tier.clone(), tier)
547            } else {
548                anyhow::bail!("Failed to access tier 0");
549            }
550        } else {
551            anyhow::bail!("At least one cache tier is required");
552        };
553
554        // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
555        let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
556            backend: l1_cache.clone(),
557        });
558
559        Ok(Self {
560            tiers: Some(tiers),
561            l1_cache: l1_backend,
562            l2_cache,
563            l2_cache_concrete: None,
564            streaming_backend,
565            total_requests: AtomicU64::new(0),
566            l1_hits: AtomicU64::new(0),
567            l2_hits: AtomicU64::new(0),
568            misses: AtomicU64::new(0),
569            promotions: AtomicUsize::new(0),
570            in_flight_requests: Arc::new(DashMap::new()),
571            invalidation_publisher: None,
572            invalidation_subscriber: None,
573            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
574        })
575    }
576
577    /// Start the invalidation subscriber background task
578    fn start_invalidation_subscriber(&self) {
579        if let Some(subscriber) = &self.invalidation_subscriber {
580            let l1_cache = Arc::clone(&self.l1_cache);
581            let l2_cache_concrete = self.l2_cache_concrete.clone();
582
583            subscriber.start(move |msg| {
584                let l1 = Arc::clone(&l1_cache);
585                let _l2 = l2_cache_concrete.clone();
586
587                async move {
588                    match msg {
589                        InvalidationMessage::Remove { key } => {
590                            // Remove from L1
591                            l1.remove(&key).await?;
592                            debug!("Invalidation: Removed '{}' from L1", key);
593                        }
594                        InvalidationMessage::Update {
595                            key,
596                            value,
597                            ttl_secs,
598                        } => {
599                            // Update L1 with new value
600                            let ttl = ttl_secs
601                                .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
602                            l1.set_with_ttl(&key, value, ttl).await?;
603                            debug!("Invalidation: Updated '{}' in L1", key);
604                        }
605                        InvalidationMessage::RemovePattern { pattern } => {
606                            // Remove matching keys from L1
607                            if let Err(e) = l1.remove_pattern(&pattern).await {
608                                warn!("Failed to remove pattern '{}' from L1: {}", pattern, e);
609                            }
610                            debug!("Invalidation: Pattern '{}' removed from L1", pattern);
611                        }
612                        InvalidationMessage::RemoveBulk { keys } => {
613                            // Remove multiple keys from L1
614                            for key in keys {
615                                if let Err(e) = l1.remove(&key).await {
616                                    warn!("Failed to remove '{}' from L1: {}", key, e);
617                                }
618                            }
619                            debug!("Invalidation: Bulk removed keys from L1");
620                        }
621                    }
622                    Ok(())
623                }
624            });
625
626            info!("Invalidation subscriber started");
627        }
628    }
629
630    /// Get value from cache using multi-tier architecture (v0.5.0+)
631    ///
632    /// This method iterates through all configured tiers and automatically promotes
633    /// to upper tiers on cache hit.
634    async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
635        let Some(tiers) = self.tiers.as_ref() else {
636            panic!("Tiers must be initialized in multi-tier mode")
637        }; // Safe: only called when tiers is Some
638
639        // Try each tier sequentially (sorted by tier_level)
640        for (tier_index, tier) in tiers.iter().enumerate() {
641            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
642                // Cache hit!
643                tier.record_hit();
644
645                // Promote to all upper tiers (if promotion enabled)
646                if tier.promotion_enabled && tier_index > 0 {
647                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
648
649                    // Promote to all tiers above this one
650                    for upper_tier in tiers.iter().take(tier_index).rev() {
651                        if let Err(e) = upper_tier
652                            .set_with_ttl(key, value.clone(), promotion_ttl)
653                            .await
654                        {
655                            warn!(
656                                "Failed to promote '{}' from L{} to L{}: {}",
657                                key, tier.tier_level, upper_tier.tier_level, e
658                            );
659                        } else {
660                            self.promotions.fetch_add(1, Ordering::Relaxed);
661                            debug!(
662                                "Promoted '{}' from L{} to L{} (TTL: {:?})",
663                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
664                            );
665                        }
666                    }
667                }
668
669                return Ok(Some(value));
670            }
671        }
672
673        // Cache miss across all tiers
674        Ok(None)
675    }
676
677    /// Get value from cache (L1 first, then L2 fallback with promotion)
678    ///
679    /// This method now includes built-in Cache Stampede protection when cache misses occur.
680    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
681    /// unnecessary duplicate work on external data sources.
682    ///
683    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
684    ///
685    /// # Arguments
686    /// * `key` - Cache key to retrieve
687    ///
688    /// # Returns
689    /// * `Ok(Some(value))` - Cache hit, value found in any tier
690    /// * `Ok(None)` - Cache miss, value not found in any cache
691    /// * `Err(error)` - Cache operation failed
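    ///
    /// # Example
    ///
    /// A minimal sketch (the key name is illustrative):
    ///
    /// ```rust,ignore
    /// if let Some(value) = cache_manager.get("user:123").await? {
    ///     println!("cache hit: {value}");
    /// }
    /// ```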
    /// # Errors
    ///
    /// Returns an error if cache operation fails.
    ///
    /// # Panics
    ///
    /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // NEW: Multi-tier mode (v0.5.0+)
        if self.tiers.is_some() {
            // Fast path for L1 (first tier) - no locking needed
            if let Some(tier1) = self
                .tiers
                .as_ref()
                .unwrap_or_else(|| panic!("Tiers initialized"))
                .first()
            {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    // Update legacy stats for backward compatibility
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // L1 miss - use stampede protection for lower tiers
            let key_owned = key.to_string();
            let lock_guard = self
                .in_flight_requests
                .entry(key_owned.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone();

            let _guard = lock_guard.lock().await;
            let cleanup_guard = CleanupGuard {
                map: &self.in_flight_requests,
                key: key_owned.clone(),
            };

            // Double-check L1 after acquiring lock
            if let Some(tier1) = self
                .tiers
                .as_ref()
                .unwrap_or_else(|| panic!("Tiers initialized"))
                .first()
            {
                if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
                    tier1.record_hit();
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Some(value));
                }
            }

            // Check remaining tiers with promotion
            let result = self.get_multi_tier(key).await?;

            if result.is_some() {
                // Hit in L2+ tier - update legacy stats
                if self
                    .tiers
                    .as_ref()
                    .unwrap_or_else(|| panic!("Tiers initialized"))
                    .len()
                    >= 2
                {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                self.misses.fetch_add(1, Ordering::Relaxed);
            }

            drop(cleanup_guard);
            return Ok(result);
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self
            .in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with same TTL as Redis (or default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if self
                .l1_cache
                .set_with_ttl(key, value.clone(), promotion_ttl)
                .await
                .is_err()
            {
                // L1 promotion failed, but we still have the data
                warn!("Failed to promote key '{}' to L1 cache", key);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                debug!(
                    "Promoted '{}' from L2 to L1 with TTL {:?} (via get)",
                    key, promotion_ttl
                );
            }

            // cleanup_guard will auto-remove entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard will auto-remove entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    /// Set value with specific cache strategy (all tiers)
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
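    ///
    /// # Example
    ///
    /// A minimal sketch (key and payload are illustrative):
    ///
    /// ```rust,ignore
    /// cache_manager
    ///     .set_with_strategy("config:app", serde_json::json!({"theme": "dark"}), CacheStrategy::LongTerm)
    ///     .await?;
    /// ```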
    /// # Errors
    ///
    /// Returns an error if cache set operation fails.
    pub async fn set_with_strategy(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Store in ALL tiers with their respective TTL scaling
            let mut success_count = 0;
            let mut last_error = None;

            for tier in tiers {
                match tier.set_with_ttl(key, value.clone(), ttl).await {
                    Ok(()) => {
                        success_count += 1;
                    }
                    Err(e) => {
                        error!(
                            "L{} cache set failed for key '{}': {}",
                            tier.tier_level, key, e
                        );
                        last_error = Some(e);
                    }
                }
            }

            if success_count > 0 {
                debug!(
                    "[Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
                    key,
                    success_count,
                    tiers.len(),
                    ttl
                );
                return Ok(());
            }
            return Err(
                last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'"))
            );
        }

        // LEGACY: 2-tier mode (L1 + L2)
        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(()), Ok(())) => {
                // Both succeeded
                debug!("[L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(()), Err(_)) => {
                // L1 succeeded, L2 failed
                warn!("L2 cache set failed for key '{}', continuing with L1", key);
                debug!("[L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(())) => {
                // L1 failed, L2 succeeded
                warn!("L1 cache set failed for key '{}', continuing with L2", key);
                debug!("[L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!(
                    "Both L1 and L2 cache set failed for key '{key}': {e1}"
                ))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    /// # Errors
    ///
    /// Returns an error if compute function fails or cache operations fail.
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self
            .in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());

                    // Promote to L1 (first tier)
                    if let Some(l1_tier) = tiers.first() {
                        if let Err(e) = l1_tier
                            .set_with_ttl(key, value.clone(), promotion_ttl)
                            .await
                        {
                            warn!(
                                "Failed to promote '{}' from L{} to L1: {}",
                                key, tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!(
                                "Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                key, tier.tier_level, promotion_ttl
                            );
                        }
                    }

                    // _cleanup_guard will auto-remove entry on drop
                    return Ok(value);
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                if let Err(e) = self
                    .l1_cache
                    .set_with_ttl(key, value.clone(), promotion_ttl)
                    .await
                {
                    warn!("Failed to promote key '{}' to L1: {}", key, e);
                } else {
                    self.promotions.fetch_add(1, Ordering::Relaxed);
                    debug!(
                        "Promoted '{}' from L2 to L1 with TTL {:?}",
                        key, promotion_ttl
                    );
                }

                // _cleanup_guard will auto-remove entry on drop
                return Ok(value);
            }
        }

        // 5. Cache miss across all tiers - compute fresh data
        debug!(
            "Computing fresh data for key: '{}' (Cache Stampede protected)",
            key
        );
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self
            .set_with_strategy(key, fresh_data.clone(), strategy)
            .await
        {
            warn!("Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop

        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces `Serialize` + `DeserializeOwned` bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute `compute_fn` → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: Your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type T)
    /// - Cache operations fail (Redis connection issues)
    #[allow(clippy::too_many_lines)]
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    debug!(
                        "[L1 HIT] Deserialized '{}' to type {}",
                        key,
                        std::any::type_name::<T>()
                    );
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cache data may be stale or corrupt
                    warn!(
                        "L1 cache deserialization failed for key '{}': {}. Will recompute.",
                        key, e
                    );
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self
            .in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                debug!("[L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
        if let Some(tiers) = &self.tiers {
            // Check tiers starting from index 1 (skip L1 since already checked)
            for tier in tiers.iter().skip(1) {
                if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
                    tier.record_hit();

                    // Attempt to deserialize
                    match serde_json::from_value::<T>(cached_json.clone()) {
                        Ok(typed_value) => {
                            debug!(
                                "[L{} HIT] Deserialized '{}' to type {}",
                                tier.tier_level,
                                key,
                                std::any::type_name::<T>()
                            );

                            // Promote to L1 (first tier)
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            // Bind as `l1_tier` so `tier` (the source tier) is not
                            // shadowed in the log messages below
                            if let Some(l1_tier) = tiers.first() {
                                if let Err(e) =
                                    l1_tier.set_with_ttl(key, cached_json, promotion_ttl).await
                                {
                                    warn!(
                                        "Failed to promote '{}' from L{} to L1: {}",
                                        key, tier.tier_level, e
                                    );
                                } else {
                                    self.promotions.fetch_add(1, Ordering::Relaxed);
                                    debug!(
                                        "Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
                                        key, tier.tier_level, promotion_ttl
                                    );
                                }
                            }

                            return Ok(typed_value);
                        }
                        Err(e) => {
                            warn!(
                                "L{} cache deserialization failed for key '{}': {}. Trying next tier.",
                                tier.tier_level, key, e
                            );
                            // Continue to next tier
                        }
                    }
                }
            }
        } else {
            // LEGACY: Check L2 cache with TTL
            if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);

                // Attempt to deserialize
                match serde_json::from_value::<T>(cached_json.clone()) {
                    Ok(typed_value) => {
                        debug!("[L2 HIT] Deserialized '{}' from Redis", key);

                        // Promote to L1 using Redis TTL (or strategy TTL as fallback)
                        let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                        if let Err(e) = self
                            .l1_cache
                            .set_with_ttl(key, cached_json, promotion_ttl)
                            .await
                        {
                            warn!("Failed to promote key '{}' to L1: {}", key, e);
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!(
                                "Promoted '{}' from L2 to L1 with TTL {:?}",
                                key, promotion_ttl
                            );
                        }

                        return Ok(typed_value);
                    }
                    Err(e) => {
                        warn!(
                            "L2 cache deserialization failed for key '{}': {}. Will recompute.",
                            key, e
                        );
                        // Fall through to recompute
                    }
                }
            }
        }

        // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
        debug!(
            "Computing fresh typed data for key: '{}' (Cache Stampede protected)",
            key
        );
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value).map_err(|e| {
            anyhow::anyhow!(
                "Failed to serialize type {} for caching: {}",
                std::any::type_name::<T>(),
                e
            )
        })?;

        // 7. Store in both L1 and L2 caches
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            warn!(
                "Failed to cache computed typed data for key '{}': {}",
                key, e
            );
        } else {
            debug!(
                "Cached typed value for '{}' (type: {})",
                key,
                std::any::type_name::<T>()
            );
        }

        // 8. _cleanup_guard will auto-remove entry on drop

        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
    ///
    /// In multi-tier mode, aggregates statistics from all tiers.
    /// In legacy mode, returns L1 and L2 stats.
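    ///
    /// # Example
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% over {} requests", stats.hit_rate, stats.total_requests);
    /// ```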
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
                }
            } else {
                0.0
            },
            l1_hit_rate: if total_reqs > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    (l1_hits as f64 / total_reqs as f64) * 100.0
                }
            } else {
                0.0
            },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    /// Get per-tier statistics (v0.5.0+)
    ///
    /// Returns statistics for each tier if multi-tier mode is enabled.
    /// Returns None if using legacy 2-tier mode.
    ///
    /// # Example
    /// ```rust,ignore
    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
    ///     for stats in tier_stats {
    ///         println!("L{}: {} hits ({})",
    ///                  stats.tier_level,
    ///                  stats.hit_count(),
    ///                  stats.backend_name);
    ///     }
    /// }
    /// ```
    pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
        self.tiers
            .as_ref()
            .map(|tiers| tiers.iter().map(|tier| tier.stats.clone()).collect())
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
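    ///
    /// # Example
    /// A minimal sketch (stream name, fields, and maxlen are illustrative):
    /// ```rust,ignore
    /// let entry_id = cache_manager
    ///     .publish_to_stream("events_stream", vec![("event".into(), "login".into())], Some(10_000))
    ///     .await?;
    /// ```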
1424    ///
1425    /// # Errors
1426    /// Returns error if streaming backend is not configured
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (`entry_id`, fields) tuples (newest first)
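    ///
    /// # Example
    /// A minimal sketch, assuming a streaming backend is configured:
    /// ```rust,no_run
    /// # use multi_tier_cache::CacheManager;
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // Fetch the 10 most recent entries (newest first)
    /// for (entry_id, fields) in cache_manager.read_stream_latest("events_stream", 10).await? {
    ///     println!("{entry_id}: {fields:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```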
    ///
    /// # Errors
    /// Returns an error if the streaming backend is not configured
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (`entry_id`, fields) tuples
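    ///
    /// # Example
    /// A minimal sketch that waits up to 5 seconds for new entries:
    /// ```rust,no_run
    /// # use multi_tier_cache::CacheManager;
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // "$" skips history and returns only entries added after this call
    /// let entries = cache_manager
    ///     .read_stream("events_stream", "$", 100, Some(5000))
    ///     .await?;
    /// for (entry_id, fields) in entries {
    ///     println!("{entry_id}: {fields:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```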
    ///
    /// # Errors
    /// Returns an error if the streaming backend is not configured
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => {
                backend
                    .stream_read(stream_key, last_id, count, block_ms)
                    .await
            }
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    // ===== Cache Invalidation Methods =====

    /// Invalidate a cache key across all instances
    ///
    /// This removes the key from all cache tiers and broadcasts
    /// the invalidation to all other cache instances via Redis Pub/Sub.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to invalidate
    ///
    /// # Example
    /// ```rust,no_run
    /// # use multi_tier_cache::CacheManager;
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // Invalidate user cache after profile update
    /// cache_manager.invalidate("user:123").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    /// Returns an error if invalidation fails.
    pub async fn invalidate(&self, key: &str) -> Result<()> {
        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for tier in tiers {
                if let Err(e) = tier.remove(key).await {
                    warn!(
                        "Failed to remove '{}' from L{}: {}",
                        key, tier.tier_level, e
                    );
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.remove(key).await?;
            self.l2_cache.remove(key).await?;
        }

        // Broadcast to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove(key);
            pub_lock.publish(&msg).await?;
            self.invalidation_stats
                .messages_sent
                .fetch_add(1, Ordering::Relaxed);
        }

        debug!("Invalidated '{}' across all instances", key);
        Ok(())
    }

    /// Update cache value across all instances
    ///
    /// This updates the key in all cache tiers and broadcasts
    /// the update to all other cache instances, avoiding cache misses.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to update
    /// * `value` - New value
    /// * `ttl` - Optional TTL (uses default if `None`)
    ///
    /// # Example
    /// ```rust,no_run
    /// # use multi_tier_cache::CacheManager;
    /// # use std::time::Duration;
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // Update user cache with new data
    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    /// Returns an error if cache update fails.
    pub async fn update_cache(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Update ALL tiers with their respective TTL scaling
            for tier in tiers {
                if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
                    warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
                }
            }
        } else {
            // LEGACY: 2-tier mode
            self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
            self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
        }

        // Broadcast update to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats
                .messages_sent
                .fetch_add(1, Ordering::Relaxed);
        }

        debug!("Updated '{}' across all instances", key);
        Ok(())
    }

    /// Invalidate all keys matching a pattern
    ///
    /// This scans the L2 cache for keys matching the pattern, removes them locally,
    /// and broadcasts the invalidation so other instances can clear their own tiers.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
    /// In multi-tier mode, this scans from L2 but removes from all tiers.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Example
    /// ```rust,no_run
    /// # use multi_tier_cache::CacheManager;
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // Invalidate all user caches
    /// cache_manager.invalidate_pattern("user:*").await?;
    ///
    /// // Invalidate specific user's related caches
    /// cache_manager.invalidate_pattern("user:123:*").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    /// Returns an error if invalidation fails.
    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
        // Scan L2 for matching keys
        // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
        let keys = if let Some(l2) = &self.l2_cache_concrete {
            l2.scan_keys(pattern).await?
        } else {
            return Err(anyhow::anyhow!(
                "Pattern invalidation requires concrete L2Cache instance"
            ));
        };

        if keys.is_empty() {
            debug!("No keys found matching pattern '{}'", pattern);
            return Ok(());
        }

        // NEW: Multi-tier mode (v0.5.0+)
        if let Some(tiers) = &self.tiers {
            // Remove from ALL tiers
            for key in &keys {
                for tier in tiers {
                    if let Err(e) = tier.remove(key).await {
                        warn!(
                            "Failed to remove '{}' from L{}: {}",
                            key, tier.tier_level, e
                        );
                    }
                }
            }
        } else {
            // LEGACY: 2-tier mode - Remove from L2 in bulk
            if let Some(l2) = &self.l2_cache_concrete {
                l2.remove_bulk(&keys).await?;
            }
        }

        // Broadcast pattern invalidation
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove_bulk(keys.clone());
            pub_lock.publish(&msg).await?;
            self.invalidation_stats
                .messages_sent
                .fetch_add(1, Ordering::Relaxed);
        }

        debug!(
            "Invalidated {} keys matching pattern '{}'",
            keys.len(),
            pattern
        );
        Ok(())
    }

    /// Set value with automatic broadcast to all instances
    ///
    /// This is a write-through operation that updates the cache and
    /// broadcasts the update to all other instances automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `value` - Value to cache
    /// * `strategy` - Cache strategy (determines TTL)
    ///
    /// # Example
    /// ```rust,no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy};
    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
    /// // Update and broadcast in one call
    /// let data = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    /// Returns an error if cache set or broadcast fails.
    pub async fn set_with_broadcast(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // Set in local caches
        self.set_with_strategy(key, value.clone(), strategy).await?;

        // Broadcast update if invalidation is enabled
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats
                .messages_sent
                .fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Get invalidation statistics
    ///
    /// Returns statistics about invalidation operations if invalidation is enabled.
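    ///
    /// # Example
    /// A minimal sketch (assuming invalidation is enabled and `InvalidationStats`
    /// implements `Debug`):
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     println!("invalidation stats: {stats:?}");
    /// }
    /// ```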
    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
        if self.invalidation_subscriber.is_some() {
            Some(self.invalidation_stats.snapshot())
        } else {
            None
        }
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Total number of get requests served
    pub total_requests: u64,
    /// Number of hits served from L1
    pub l1_hits: u64,
    /// Number of hits served from L2
    pub l2_hits: u64,
    /// Sum of L1 and L2 hits
    pub total_hits: u64,
    /// Number of requests that missed every tier
    pub misses: u64,
    /// Overall hit rate as a percentage (0.0-100.0)
    pub hit_rate: f64,
    /// L1 hit rate as a percentage (0.0-100.0)
    pub l1_hit_rate: f64,
    /// Number of entries promoted to a hotter tier
    pub promotions: usize,
    /// Number of in-flight (de-duplicated) requests currently tracked
    pub in_flight_requests: usize,
}