Skip to main content

multi_tier_cache/
cache_manager.rs

1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use anyhow::Result;
6use dashmap::DashMap;
7use serde_json;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, broadcast};
13
14use tracing::{debug, error, info, warn};
15use rand::Rng;
16
17use crate::invalidation::{
18    AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
19    InvalidationStats, InvalidationSubscriber,
20};
21use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
22use crate::{L1Cache, L2Cache};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25
/// Type alias for the in-flight requests map
///
/// Keyed by cache key. Each entry stores the broadcast sender for one active key computation;
/// the channel carries either the resulting bytes or a shared (`Arc`) error.
type InFlightMap = DashMap<String, broadcast::Sender<Result<Bytes, Arc<anyhow::Error>>>>;
29
/// TTL presets for different kinds of cached data
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Caller-supplied TTL
    Custom(Duration),
    /// Default strategy (5 minutes, same as `ShortTerm`)
    Default,
}

impl CacheStrategy {
    /// Resolve the strategy to its concrete TTL
    ///
    /// Presets map to fixed durations; `Custom` returns the wrapped duration unchanged.
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        const MINUTE_SECS: u64 = 60;
        const HOUR_SECS: u64 = 60 * MINUTE_SECS;

        let preset_secs = match self {
            // A custom TTL is already a `Duration`; hand it back as-is.
            Self::Custom(ttl) => return *ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * MINUTE_SECS,
            Self::MediumTerm => HOUR_SECS,
            Self::LongTerm => 3 * HOUR_SECS,
        };

        Duration::from_secs(preset_secs)
    }
}
61
/// Hit counter and identification data for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Position of the tier in the hierarchy (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Running total of cache hits served by this tier
    pub hits: AtomicU64,
    /// Name reported by the backend, used for identification
    pub backend_name: String,
}

// `AtomicU64` is not `Clone`, so a clone copies the counter's current value into a fresh atomic.
// The clone and the original count independently afterwards.
impl Clone for TierStats {
    fn clone(&self) -> Self {
        let snapshot = self.hit_count();
        Self {
            hits: AtomicU64::new(snapshot),
            backend_name: self.backend_name.clone(),
            tier_level: self.tier_level,
        }
    }
}

impl TierStats {
    /// Create stats for a tier, starting with zero recorded hits
    fn new(tier_level: usize, backend_name: String) -> Self {
        let hits = AtomicU64::new(0);
        Self {
            tier_level,
            hits,
            backend_name,
        }
    }

    /// Read the number of hits recorded so far
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
97
/// A single cache tier in the multi-tier architecture
///
/// Bundles a backend with its position in the hierarchy, its promotion settings, its TTL
/// scaling factor and its hit statistics.
#[derive(Clone)]
pub struct CacheTier {
    /// The backend for this tier
    pub backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 for fastest, increases for slower/cheaper tiers)
    pub tier_level: usize,
    /// Whether a hit in THIS tier is promoted (copied) into the tiers above it.
    /// Has no effect on the first tier, which has nothing above it.
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability; values of 0 or 1 always promote
    pub promotion_frequency: usize,
    /// TTL multiplier applied when writing to this tier (e.g., L2 might store for 2x L1 TTL)
    pub ttl_scale: f64,
    /// Statistics for this tier
    pub stats: TierStats,
}
114
115impl CacheTier {
116    /// Create a new cache tier
117    pub fn new(
118        backend: Arc<dyn L2CacheBackend>,
119        tier_level: usize,
120        promotion_enabled: bool,
121        promotion_frequency: usize,
122        ttl_scale: f64,
123    ) -> Self {
124        let backend_name = backend.name().to_string();
125        Self {
126            backend,
127            tier_level,
128            promotion_enabled,
129            promotion_frequency,
130            ttl_scale,
131            stats: TierStats::new(tier_level, backend_name),
132        }
133    }
134
135    /// Get value with TTL from this tier
136    async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
137        self.backend.get_with_ttl(key).await
138    }
139
140    /// Set value with TTL in this tier
141    async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> Result<()> {
142        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
143        self.backend.set_with_ttl(key, value, scaled_ttl).await
144    }
145
146    /// Remove value from this tier
147    async fn remove(&self, key: &str) -> Result<()> {
148        self.backend.remove(key).await
149    }
150
151    /// Record a cache hit for this tier
152    fn record_hit(&self) {
153        self.stats.hits.fetch_add(1, Ordering::Relaxed);
154    }
155}
156
/// Builder-style settings for one cache tier
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability (default 10)
    pub promotion_frequency: usize,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Start a configuration for `tier_level` with the defaults:
    /// promotion on, promotion frequency 10, TTL scale 1.0
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            promotion_enabled: true,
            promotion_frequency: 10,
            ttl_scale: 1.0,
            tier_level,
        }
    }

    /// Preset for L1 (hot tier)
    #[must_use]
    pub fn as_l1() -> Self {
        // L1 is already the top tier, so promotion is off; the frequency is then irrelevant
        // and is pinned to 1.
        Self::new(1)
            .with_promotion(false)
            .with_promotion_frequency(1)
    }

    /// Preset for L2 (warm tier)
    #[must_use]
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Preset for L3 (cold tier): data is kept 2x longer
    #[must_use]
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Preset for L4 (archive tier): data is kept 8x longer
    #[must_use]
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Turn promotion on or off
    #[must_use]
    pub fn with_promotion(self, enabled: bool) -> Self {
        Self {
            promotion_enabled: enabled,
            ..self
        }
    }

    /// Override the promotion frequency (N)
    #[must_use]
    pub fn with_promotion_frequency(self, n: usize) -> Self {
        Self {
            promotion_frequency: n,
            ..self
        }
    }

    /// Override the TTL scale factor
    #[must_use]
    pub fn with_ttl_scale(self, scale: f64) -> Self {
        Self {
            ttl_scale: scale,
            ..self
        }
    }

    /// Override the tier level
    #[must_use]
    pub fn with_level(self, level: usize) -> Self {
        Self {
            tier_level: level,
            ..self
        }
    }
}
254
/// Coordinates cache operations across an ordered list of tiers
///
/// Holds the tiers themselves, request/hit/miss/promotion counters, the map used to coalesce
/// concurrent requests for the same key, and the optional streaming and invalidation
/// components.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...)
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics: number of lookups started through this manager
    total_requests: AtomicU64,
    /// Statistics: hits served by the first tier (legacy counter)
    l1_hits: AtomicU64,
    /// Statistics: hits attributed to L2 (legacy counter)
    l2_hits: AtomicU64,
    /// Statistics: lookups that found nothing in any tier
    misses: AtomicU64,
    /// Statistics: successful promotion writes into upper tiers
    promotions: AtomicUsize,
    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
276
277impl CacheManager {
278    /// Create new cache manager with trait objects (pluggable backends)
279    ///
280    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
281    ///
282    /// # Arguments
283    ///
284    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
285    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
286    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
287    ///
288    /// # Example
289    ///
290    /// ```rust,ignore
291    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
292    /// use std::sync::Arc;
293    ///
294    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
295    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
296    ///
297    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
298    /// ```
299    /// # Errors
300    ///
301    /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
302    pub fn new_with_backends(
303        l1_cache: Arc<dyn CacheBackend>,
304        l2_cache: Arc<dyn L2CacheBackend>,
305        streaming_backend: Option<Arc<dyn StreamingBackend>>,
306    ) -> Result<Self> {
307        debug!("Initializing Cache Manager with L1+L2 backends...");
308
309        let tiers = vec![
310            CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
311            CacheTier::new(l2_cache, 2, true, 10, 1.0),
312        ];
313
314        Ok(Self {
315            tiers,
316            streaming_backend,
317            total_requests: AtomicU64::new(0),
318            l1_hits: AtomicU64::new(0),
319            l2_hits: AtomicU64::new(0),
320            misses: AtomicU64::new(0),
321            promotions: AtomicUsize::new(0),
322            in_flight_requests: Arc::new(DashMap::new()),
323            invalidation_publisher: None,
324            invalidation_subscriber: None,
325            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
326        })
327    }
328
329    /// Create new cache manager with default backends (backward compatible)
330    ///
331    /// This is the legacy constructor maintained for backward compatibility.
332    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
333    ///
334    /// # Arguments
335    ///
336    /// * `l1_cache` - Moka L1 cache instance
337    /// * `l2_cache` - Redis L2 cache instance
338    /// # Errors
339    ///
340    /// Returns an error if Redis connection fails.
341    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
342        debug!("Initializing Cache Manager...");
343
344        // Convert concrete types to trait objects
345        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
346        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
347
348        // Create RedisStreams backend for streaming functionality
349        let redis_url =
350            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
351        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
352        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
353
354        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
355    }
356
357    /// Create new cache manager with invalidation support
358    ///
359    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
360    ///
361    /// # Arguments
362    ///
363    /// * `l1_cache` - Moka L1 cache instance
364    /// * `l2_cache` - Redis L2 cache instance
365    /// * `redis_url` - Redis connection URL for Pub/Sub
366    /// * `config` - Invalidation configuration
367    ///
368    /// # Example
369    ///
370    /// ```rust,ignore
371    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
372    ///
373    /// let config = InvalidationConfig {
374    ///     channel: "my_app:cache:invalidate".to_string(),
375    ///     ..Default::default()
376    /// };
377    ///
378    /// let manager = CacheManager::new_with_invalidation(
379    ///     l1, l2, "redis://localhost", config
380    /// ).await?;
381    /// ```
382    /// # Errors
383    ///
384    /// Returns an error if Redis connection fails or invalidation setup fails.
385    pub async fn new_with_invalidation(
386        l1_cache: Arc<L1Cache>,
387        l2_cache: Arc<L2Cache>,
388        redis_url: &str,
389        config: InvalidationConfig,
390    ) -> Result<Self> {
391        debug!("Initializing Cache Manager with Invalidation...");
392        debug!("  Pub/Sub channel: {}", config.channel);
393
394        // Convert concrete types to trait objects
395        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
396        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
397
398        // Create RedisStreams backend for streaming functionality
399        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
400        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
401
402        // Create publisher
403        let client = redis::Client::open(redis_url)?;
404        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
405        let publisher = InvalidationPublisher::new(conn_manager, config.clone());
406
407        // Create subscriber
408        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
409        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
410
411        let tiers = vec![
412            CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
413            CacheTier::new(l2_backend, 2, true, 10, 1.0),
414        ];
415
416        let manager = Self {
417            tiers,
418            streaming_backend: Some(streaming_backend),
419            total_requests: AtomicU64::new(0),
420            l1_hits: AtomicU64::new(0),
421            l2_hits: AtomicU64::new(0),
422            misses: AtomicU64::new(0),
423            promotions: AtomicUsize::new(0),
424            in_flight_requests: Arc::new(DashMap::new()),
425            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
426            invalidation_subscriber: Some(Arc::new(subscriber)),
427            invalidation_stats,
428        };
429
430        // Start subscriber with handler
431        manager.start_invalidation_subscriber();
432
433        info!("Cache Manager initialized with invalidation support");
434
435        Ok(manager)
436    }
437
438    /// Create new cache manager with multi-tier architecture (v0.5.0+)
439    ///
440    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
441    /// Tiers are checked in order (lower `tier_level` = faster/hotter).
442    ///
443    /// # Arguments
444    ///
445    /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
446    /// * `streaming_backend` - Optional streaming backend
447    ///
448    /// # Example
449    ///
450    /// ```rust,ignore
451    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
452    /// use std::sync::Arc;
453    ///
454    /// // L1 + L2 + L3 setup
455    /// let l1 = Arc::new(L1Cache::new()?);
456    /// let l2 = Arc::new(L2Cache::new().await?);
457    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
458    ///
459    /// let tiers = vec![
460    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
461    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
462    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
463    /// ];
464    ///
465    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
466    /// ```
467    /// # Errors
468    ///
469    /// Returns an error if tiers are not sorted by level or if no tiers are provided.
470    pub fn new_with_tiers(
471        tiers: Vec<CacheTier>,
472        streaming_backend: Option<Arc<dyn StreamingBackend>>,
473    ) -> Result<Self> {
474        debug!("Initializing Multi-Tier Cache Manager...");
475        debug!("  Tier count: {}", tiers.len());
476        for tier in &tiers {
477            debug!(
478                "  L{}: {} (promotion={}, ttl_scale={})",
479                tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
480            );
481        }
482
483        // Validate tiers are sorted by level
484        for i in 1..tiers.len() {
485            if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
486                && current.tier_level <= prev.tier_level
487            {
488                anyhow::bail!(
489                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
490                    current.tier_level,
491                    prev.tier_level
492                );
493            }
494        }
495
496        Ok(Self {
497            tiers,
498            streaming_backend,
499            total_requests: AtomicU64::new(0),
500            l1_hits: AtomicU64::new(0),
501            l2_hits: AtomicU64::new(0),
502            misses: AtomicU64::new(0),
503            promotions: AtomicUsize::new(0),
504            in_flight_requests: Arc::new(DashMap::new()),
505            invalidation_publisher: None,
506            invalidation_subscriber: None,
507            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
508        })
509    }
510
511    /// Start the invalidation subscriber background task
512    fn start_invalidation_subscriber(&self) {
513        if let Some(subscriber) = &self.invalidation_subscriber {
514            let tiers = self.tiers.clone();
515
516            subscriber.start(move |msg| {
517                let tiers = tiers.clone();
518                async move {
519                    for tier in &tiers {
520                        match &msg {
521                            InvalidationMessage::Remove { key } => {
522                                if let Err(e) = tier.backend.remove(key).await {
523                                    warn!(
524                                        "Failed to remove '{}' from L{}: {}",
525                                        key, tier.tier_level, e
526                                    );
527                                }
528                            }
529                            InvalidationMessage::Update {
530                                key,
531                                value,
532                                ttl_secs,
533                            } => {
534                                let ttl = ttl_secs
535                                    .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
536                                if let Err(e) =
537                                    tier.backend.set_with_ttl(key, value.clone(), ttl).await
538                                {
539                                    warn!(
540                                        "Failed to update '{}' in L{}: {}",
541                                        key, tier.tier_level, e
542                                    );
543                                }
544                            }
545                            InvalidationMessage::RemovePattern { pattern } => {
546                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
547                                    warn!(
548                                        "Failed to remove pattern '{}' from L{}: {}",
549                                        pattern, tier.tier_level, e
550                                    );
551                                }
552                            }
553                            InvalidationMessage::RemoveBulk { keys } => {
554                                for key in keys {
555                                    if let Err(e) = tier.backend.remove(key).await {
556                                        warn!(
557                                            "Failed to remove '{}' from L{}: {}",
558                                            key, tier.tier_level, e
559                                        );
560                                    }
561                                }
562                            }
563                        }
564                    }
565                    Ok(())
566                }
567            });
568
569            info!("Invalidation subscriber started across all tiers");
570        }
571    }
572
573    /// Get value from cache using multi-tier architecture (v0.5.0+)
574    ///
575    /// This method iterates through all configured tiers and automatically promotes
576    /// to upper tiers on cache hit.
577    async fn get_multi_tier(&self, key: &str) -> Result<Option<Bytes>> {
578        // Try each tier sequentially (sorted by tier_level)
579        for (tier_index, tier) in self.tiers.iter().enumerate() {
580            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
581                // Cache hit!
582                tier.record_hit();
583
584                // Promote to all upper tiers (if promotion enabled)
585                if tier.promotion_enabled && tier_index > 0 {
586                    // Probabilistic Promotion Check
587                    let should_promote = if tier.promotion_frequency <= 1 {
588                        true
589                    } else {
590                        rand::thread_rng().gen_ratio(1, tier.promotion_frequency as u32)
591                    };
592
593                    if should_promote {
594                        let promotion_ttl =
595                            ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
596
597                        // Promote to all tiers above this one
598                        for upper_tier in self.tiers.iter().take(tier_index).rev() {
599                            if let Err(e) = upper_tier
600                                .set_with_ttl(key, value.clone(), promotion_ttl)
601                                .await
602                            {
603                                warn!(
604                                    "Failed to promote '{}' from L{} to L{}: {}",
605                                    key, tier.tier_level, upper_tier.tier_level, e
606                                );
607                            } else {
608                                self.promotions.fetch_add(1, Ordering::Relaxed);
609                                debug!(
610                                    "Promoted '{}' from L{} to L{} (TTL: {:?})",
611                                    key, tier.tier_level, upper_tier.tier_level, promotion_ttl
612                                );
613                            }
614                        }
615                    } else {
616                        debug!(
617                            "Probabilistic skip promotion for '{}' from L{} (N={})",
618                            key, tier.tier_level, tier.promotion_frequency
619                        );
620                    }
621                }
622
623                return Ok(Some(value));
624            }
625        }
626
627        // Cache miss across all tiers
628        Ok(None)
629    }
630
631    /// Get value from cache (L1 first, then L2 fallback with promotion)
632    ///
633    /// This method now includes built-in Cache Stampede protection when cache misses occur.
634    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
635    /// unnecessary duplicate work on external data sources.
636    ///
637    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
638    ///
639    /// # Arguments
640    /// * `key` - Cache key to retrieve
641    ///
642    /// # Returns
643    /// * `Ok(Some(value))` - Cache hit, value found in any tier
644    /// * `Ok(None)` - Cache miss, value not found in any cache
645    /// * `Err(error)` - Cache operation failed
646    /// # Errors
647    ///
648    /// Returns an error if cache operation fails.
649    ///
650    /// # Panics
651    ///
652    /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
653    pub async fn get(&self, key: &str) -> Result<Option<Bytes>> {
654        self.total_requests.fetch_add(1, Ordering::Relaxed);
655
656        // Fast path for L1 (first tier) - no locking needed
657        if let Some(tier1) = self.tiers.first()
658            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
659        {
660            tier1.record_hit();
661            // Update legacy stats for backward compatibility
662            self.l1_hits.fetch_add(1, Ordering::Relaxed);
663            return Ok(Some(value));
664        }
665
666        // L1 miss - use stampede protection for lower tiers
667        let key_owned = key.to_string();
668        let lock_guard = self
669            .in_flight_requests
670            .entry(key_owned.clone())
671            .or_insert_with(|| broadcast::Sender::new(1))
672            .clone();
673
674        let mut rx = lock_guard.subscribe();
675
676        // If there are other receivers, someone else is computing, wait for it
677        if lock_guard.receiver_count() > 1 {
678            match rx.recv().await {
679                Ok(Ok(value)) => return Ok(Some(value)),
680                Ok(Err(e)) => {
681                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
682                }
683                Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
684            }
685        }
686
687        // Double-check L1 after acquiring lock (or if we are the first to compute)
688        if let Some(tier1) = self.tiers.first()
689            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
690        {
691            tier1.record_hit();
692            self.l1_hits.fetch_add(1, Ordering::Relaxed);
693            let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
694            return Ok(Some(value));
695        }
696
697        // Check remaining tiers with promotion
698        let result = self.get_multi_tier(key).await?;
699
700        if let Some(val) = result.clone() {
701            // Hit in L2+ tier - update legacy stats
702            if self.tiers.len() >= 2 {
703                self.l2_hits.fetch_add(1, Ordering::Relaxed);
704            }
705
706            // Notify any waiting subscribers
707            let _ = lock_guard.send(Ok(val.clone()));
708
709            // Remove the in-flight entry after computation/retrieval
710            self.in_flight_requests.remove(key);
711
712            Ok(Some(val))
713        } else {
714            self.misses.fetch_add(1, Ordering::Relaxed);
715
716            // Remove the in-flight entry after computation/retrieval
717            self.in_flight_requests.remove(key);
718
719            Ok(None)
720        }
721    }
722
723    /// Set value with specific cache strategy (all tiers)
724    ///
725    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
726    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
727    /// # Errors
728    ///
729    /// Returns an error if cache set operation fails.
730    pub async fn set_with_strategy(
731        &self,
732        key: &str,
733        value: Bytes,
734        strategy: CacheStrategy,
735    ) -> Result<()> {
736        let ttl = strategy.to_duration();
737
738        let mut success_count = 0;
739        let mut last_error = None;
740
741        for tier in &self.tiers {
742            match tier.set_with_ttl(key, value.clone(), ttl).await {
743                Ok(()) => {
744                    success_count += 1;
745                }
746                Err(e) => {
747                    error!(
748                        "L{} cache set failed for key '{}': {}",
749                        tier.tier_level, key, e
750                    );
751                    last_error = Some(e);
752                }
753            }
754        }
755
756        if success_count > 0 {
757            debug!(
758                "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
759                key,
760                success_count,
761                self.tiers.len(),
762                ttl
763            );
764            return Ok(());
765        }
766
767        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'")))
768    }
769
770    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
771    ///
772    /// This method provides comprehensive Cache Stampede protection:
773    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
774    /// 2. Check L2 cache with mutex-based coalescing
775    /// 3. Compute fresh data with protection against concurrent computations
776    ///
777    /// # Arguments
778    /// * `key` - Cache key
779    /// * `strategy` - Cache strategy for TTL and storage behavior
780    /// * `compute_fn` - Async function to compute the value if not in any cache
781    ///
782    /// # Example
783    /// ```ignore
784    /// let api_data = cache_manager.get_or_compute_with(
785    ///     "api_response",
786    ///     CacheStrategy::RealTime,
787    ///     || async {
788    ///         fetch_data_from_api().await
789    ///     }
790    /// ).await?;
791    /// ```
792    #[allow(dead_code)]
793    /// # Errors
794    ///
795    /// Returns an error if compute function fails or cache operations fail.
796    pub async fn get_or_compute_with<F, Fut>(
797        &self,
798        key: &str,
799        strategy: CacheStrategy,
800        compute_fn: F,
801    ) -> Result<Bytes>
802    where
803        F: FnOnce() -> Fut + Send,
804        Fut: Future<Output = Result<Bytes>> + Send,
805    {
806        self.total_requests.fetch_add(1, Ordering::Relaxed);
807
808        // 1. Try tiers sequentially first
809        for (idx, tier) in self.tiers.iter().enumerate() {
810            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
811                tier.record_hit();
812                if tier.tier_level == 1 {
813                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
814                } else if tier.tier_level == 2 {
815                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
816                }
817
818                // Promotion to L1 if hit was in a lower tier
819                if idx > 0 && tier.promotion_enabled {
820                    // Probabilistic Promotion Check
821                    let should_promote = if tier.promotion_frequency <= 1 {
822                        true
823                    } else {
824                        rand::thread_rng().gen_ratio(1, tier.promotion_frequency as u32)
825                    };
826
827                    if should_promote {
828                        let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
829                        if let Some(l1_tier) = self.tiers.first() {
830                            let _ = l1_tier
831                                .set_with_ttl(key, value.clone(), promotion_ttl)
832                                .await;
833                        }
834                    }
835                }
836
837                return Ok(value);
838            }
839        }
840
841        // 2. Cache miss across all tiers - use stampede protection
842        let (tx, mut rx) = match self.in_flight_requests.entry(key.to_string()) {
843            dashmap::mapref::entry::Entry::Occupied(entry) => {
844                let tx = entry.get().clone();
845                (tx.clone(), tx.subscribe())
846            }
847            dashmap::mapref::entry::Entry::Vacant(entry) => {
848                let (tx, _) = broadcast::channel(1);
849                entry.insert(tx.clone());
850                (tx.clone(), tx.subscribe())
851            }
852        };
853
854        if tx.receiver_count() > 1 {
855            // Someone else is computing, wait for it
856            match rx.recv().await {
857                Ok(Ok(value)) => return Ok(value),
858                Ok(Err(e)) => {
859                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
860                }
861                Err(_) => {} // Fall through to re-compute
862            }
863        }
864
865        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
866        for tier in &self.tiers {
867            if let Some((value, _)) = tier.get_with_ttl(key).await {
868                let _ = tx.send(Ok(value.clone()));
869                return Ok(value);
870            }
871        }
872
873        // 4. Miss - compute fresh data
874        debug!(
875            "Computing fresh data for key: '{}' (Stampede protected)",
876            key
877        );
878
879        let result = compute_fn().await;
880
881        // Remove from in_flight BEFORE broadcasting
882        self.in_flight_requests.remove(key);
883
884        match &result {
885            Ok(value) => {
886                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
887                let _ = tx.send(Ok(value.clone()));
888            }
889            Err(e) => {
890                let _ = tx.send(Err(Arc::new(anyhow::anyhow!("{e}"))));
891            }
892        }
893
894        result
895    }
896
897    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
898    ///
899    /// This method provides the same functionality as `get_or_compute_with()` but with
900    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
901    /// API calls, or any computation that returns structured data.
902    ///
903    /// # Type Safety
904    ///
905    /// - Returns your actual type `T` instead of `serde_json::Value`
906    /// - Compiler enforces Serialize + `DeserializeOwned` bounds
907    /// - No manual JSON conversion needed
908    ///
909    /// # Cache Flow
910    ///
911    /// 1. Check L1 cache → deserialize if found
912    /// 2. Check L2 cache → deserialize + promote to L1 if found
913    /// 3. Execute `compute_fn` → serialize → store in L1+L2
914    /// 4. Full stampede protection (only ONE request computes)
915    ///
916    /// # Arguments
917    ///
918    /// * `key` - Cache key
919    /// * `strategy` - Cache strategy for TTL
920    /// * `compute_fn` - Async function returning `Result<T>`
921    ///
922    /// # Example - Database Query
923    ///
924    /// ```no_run
925    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
926    /// # use std::sync::Arc;
927    /// # use serde::{Serialize, Deserialize};
928    /// # async fn example() -> anyhow::Result<()> {
929    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
930    /// # let l2 = Arc::new(L2Cache::new().await?);
931    /// # let cache_manager = CacheManager::new(l1, l2);
932    ///
933    /// #[derive(Serialize, Deserialize)]
934    /// struct User {
935    ///     id: i64,
936    ///     name: String,
937    /// }
938    ///
939    /// // Type-safe database caching (example - requires sqlx)
940    /// // let user: User = cache_manager.get_or_compute_typed(
941    /// //     "user:123",
942    /// //     CacheStrategy::MediumTerm,
943    /// //     || async {
944    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
945    /// //             .bind(123)
946    /// //             .fetch_one(&pool)
947    /// //             .await
948    /// //     }
949    /// // ).await?;
950    /// # Ok(())
951    /// # }
952    /// ```
953    ///
954    /// # Example - API Call
955    ///
956    /// ```no_run
957    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
958    /// # use std::sync::Arc;
959    /// # use serde::{Serialize, Deserialize};
960    /// # async fn example() -> anyhow::Result<()> {
961    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
962    /// # let l2 = Arc::new(L2Cache::new().await?);
963    /// # let cache_manager = CacheManager::new(l1, l2);
964    /// #[derive(Serialize, Deserialize)]
965    /// struct ApiResponse {
966    ///     data: String,
967    ///     timestamp: i64,
968    /// }
969    ///
970    /// // API call caching (example - requires reqwest)
971    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
972    /// //     "api:endpoint",
973    /// //     CacheStrategy::RealTime,
974    /// //     || async {
975    /// //         reqwest::get("https://api.example.com/data")
976    /// //             .await?
977    /// //             .json::<ApiResponse>()
978    /// //             .await
979    /// //     }
980    /// // ).await?;
981    /// # Ok(())
982    /// # }
983    /// ```
984    ///
985    /// # Performance
986    ///
987    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
988    /// - L2 hit: 2-5ms + deserialization + L1 promotion
989    /// - Compute: Your function time + serialization + L1+L2 storage
990    /// - Stampede protection: 99.6% latency reduction under high concurrency
991    ///
992    /// # Errors
993    ///
994    /// Returns error if:
995    /// - Compute function fails
996    /// - Serialization fails (invalid type for JSON)
997    /// - Deserialization fails (cache data doesn't match type T)
998    /// - Cache operations fail (Redis connection issues)
999    #[allow(clippy::too_many_lines)]
1000    pub async fn get_or_compute_typed<T, F, Fut>(
1001        &self,
1002        key: &str,
1003        strategy: CacheStrategy,
1004        compute_fn: F,
1005    ) -> Result<T>
1006    where
1007        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1008        F: FnOnce() -> Fut + Send,
1009        Fut: Future<Output = Result<T>> + Send,
1010    {
1011        self.total_requests.fetch_add(1, Ordering::Relaxed);
1012
1013        // 1. Try L1 first with zero-cost typed access (if backend supports it)
1014        // Note: For now we still use bytes path in CacheManager to keep it generic,
1015        // but backends like Moka use their internal typed cache.
1016        // We might want a dedicated Tier trait method for typed access later.
1017
1018        for (idx, tier) in self.tiers.iter().enumerate() {
1019            if let Some((bytes, ttl)) = tier.get_with_ttl(key).await {
1020                tier.record_hit();
1021                if tier.tier_level == 1 {
1022                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
1023                } else if tier.tier_level == 2 {
1024                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
1025                }
1026
1027                match serde_json::from_slice::<T>(&bytes) {
1028                    Ok(value) => {
1029                        // Promotion
1030                        if idx > 0 && tier.promotion_enabled {
1031                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1032                            if let Some(l1_tier) = self.tiers.first() {
1033                                let _ = l1_tier.set_with_ttl(key, bytes, promotion_ttl).await;
1034                                self.promotions.fetch_add(1, Ordering::Relaxed);
1035                            }
1036                        }
1037                        return Ok(value);
1038                    }
1039                    Err(e) => {
1040                        warn!("Deserialization failed for key '{}': {}", key, e);
1041                        // Fall through to next tier or compute
1042                    }
1043                }
1044            }
1045        }
1046
1047        // 2. Compute with stampede protection (reuse get_or_compute_with logic for Bytes)
1048        let compute_fn_wrapped = || async {
1049            let val = compute_fn().await?;
1050            Ok(Bytes::from(serde_json::to_vec(&val)?))
1051        };
1052
1053        let bytes = self
1054            .get_or_compute_with(key, strategy, compute_fn_wrapped)
1055            .await?;
1056
1057        match serde_json::from_slice::<T>(&bytes) {
1058            Ok(val) => Ok(val),
1059            Err(e) => Err(anyhow::anyhow!(
1060                "Coalesced result deserialization failed: {e}"
1061            )),
1062        }
1063    }
1064
1065    /// Get comprehensive cache statistics
1066    ///
1067    /// In multi-tier mode, aggregates statistics from all tiers.
1068    /// In legacy mode, returns L1 and L2 stats.
1069    #[allow(dead_code)]
1070    pub fn get_stats(&self) -> CacheManagerStats {
1071        let total_reqs = self.total_requests.load(Ordering::Relaxed);
1072        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1073        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1074        let misses = self.misses.load(Ordering::Relaxed);
1075
1076        CacheManagerStats {
1077            total_requests: total_reqs,
1078            l1_hits,
1079            l2_hits,
1080            total_hits: l1_hits + l2_hits,
1081            misses,
1082            hit_rate: if total_reqs > 0 {
1083                #[allow(clippy::cast_precision_loss)]
1084                {
1085                    ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1086                }
1087            } else {
1088                0.0
1089            },
1090            l1_hit_rate: if total_reqs > 0 {
1091                #[allow(clippy::cast_precision_loss)]
1092                {
1093                    (l1_hits as f64 / total_reqs as f64) * 100.0
1094                }
1095            } else {
1096                0.0
1097            },
1098            promotions: self.promotions.load(Ordering::Relaxed),
1099            in_flight_requests: self.in_flight_requests.len(),
1100        }
1101    }
1102
1103    /// Get per-tier statistics (v0.5.0+)
1104    ///
1105    /// Returns statistics for each tier if multi-tier mode is enabled.
1106    /// Returns None if using legacy 2-tier mode.
1107    ///
1108    /// # Example
1109    /// ```rust,ignore
1110    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1111    ///     for stats in tier_stats {
1112    ///         println!("L{}: {} hits ({})",
1113    ///                  stats.tier_level,
1114    ///                  stats.hit_count(),
1115    ///                  stats.backend_name);
1116    ///     }
1117    /// }
1118    /// ```
1119    pub fn get_tier_stats(&self) -> Vec<TierStats> {
1120        self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1121    }
1122}
1123
1124/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
1125/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
1126struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1127
1128impl CacheBackend for ProxyL1ToL2 {
1129    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
1130        self.0.get(key)
1131    }
1132
1133    fn set_with_ttl<'a>(
1134        &'a self,
1135        key: &'a str,
1136        value: Bytes,
1137        ttl: Duration,
1138    ) -> BoxFuture<'a, Result<()>> {
1139        self.0.set_with_ttl(key, value, ttl)
1140    }
1141
1142    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
1143        self.0.remove(key)
1144    }
1145
1146    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
1147        self.0.remove_pattern(pattern)
1148    }
1149
1150    fn health_check(&self) -> BoxFuture<'_, bool> {
1151        self.0.health_check()
1152    }
1153
1154    fn name(&self) -> &'static str {
1155        self.0.name()
1156    }
1157}
1158
1159impl L2CacheBackend for ProxyL1ToL2 {
1160    fn get_with_ttl<'a>(
1161        &'a self,
1162        key: &'a str,
1163    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1164        Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1165    }
1166}
1167
1168impl CacheManager {
1169    // ===== Redis Streams Methods =====
1170
1171    /// Publish data to Redis Stream
1172    ///
1173    /// # Arguments
1174    /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1175    /// * `fields` - Field-value pairs to publish
1176    /// * `maxlen` - Optional max length for stream trimming
1177    ///
1178    /// # Returns
1179    /// The entry ID generated by Redis
1180    ///
1181    /// # Errors
1182    /// Returns error if streaming backend is not configured
1183    pub async fn publish_to_stream(
1184        &self,
1185        stream_key: &str,
1186        fields: Vec<(String, String)>,
1187        maxlen: Option<usize>,
1188    ) -> Result<String> {
1189        match &self.streaming_backend {
1190            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1191            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1192        }
1193    }
1194
1195    /// Read latest entries from Redis Stream
1196    ///
1197    /// # Arguments
1198    /// * `stream_key` - Name of the stream
1199    /// * `count` - Number of latest entries to retrieve
1200    ///
1201    /// # Returns
1202    /// Vector of (`entry_id`, fields) tuples (newest first)
1203    ///
1204    /// # Errors
1205    /// Returns error if streaming backend is not configured
1206    pub async fn read_stream_latest(
1207        &self,
1208        stream_key: &str,
1209        count: usize,
1210    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1211        match &self.streaming_backend {
1212            Some(backend) => backend.stream_read_latest(stream_key, count).await,
1213            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1214        }
1215    }
1216
1217    /// Read from Redis Stream with optional blocking
1218    ///
1219    /// # Arguments
1220    /// * `stream_key` - Name of the stream
1221    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1222    /// * `count` - Max entries to retrieve
1223    /// * `block_ms` - Optional blocking timeout in ms
1224    ///
1225    /// # Returns
1226    /// Vector of (`entry_id`, fields) tuples
1227    ///
1228    /// # Errors
1229    /// Returns error if streaming backend is not configured
1230    pub async fn read_stream(
1231        &self,
1232        stream_key: &str,
1233        last_id: &str,
1234        count: usize,
1235        block_ms: Option<usize>,
1236    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1237        match &self.streaming_backend {
1238            Some(backend) => {
1239                backend
1240                    .stream_read(stream_key, last_id, count, block_ms)
1241                    .await
1242            }
1243            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1244        }
1245    }
1246
1247    // ===== Cache Invalidation Methods =====
1248
1249    /// Invalidate a cache key across all instances
1250    ///
1251    /// This removes the key from all cache tiers and broadcasts
1252    /// the invalidation to all other cache instances via Redis Pub/Sub.
1253    ///
1254    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1255    ///
1256    /// # Arguments
1257    /// * `key` - Cache key to invalidate
1258    ///
1259    /// # Example
1260    /// ```rust,no_run
1261    /// # use multi_tier_cache::CacheManager;
1262    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1263    /// // Invalidate user cache after profile update
1264    /// cache_manager.invalidate("user:123").await?;
1265    /// # Ok(())
1266    /// # }
1267    /// ```
1268    /// # Errors
1269    ///
1270    /// Returns an error if invalidation fails.
1271    pub async fn invalidate(&self, key: &str) -> Result<()> {
1272        // Remove from ALL tiers
1273        for tier in &self.tiers {
1274            if let Err(e) = tier.remove(key).await {
1275                warn!(
1276                    "Failed to remove '{}' from L{}: {}",
1277                    key, tier.tier_level, e
1278                );
1279            }
1280        }
1281
1282        // Broadcast to other instances
1283        if let Some(publisher) = &self.invalidation_publisher {
1284            let mut pub_lock = publisher.lock().await;
1285            let msg = InvalidationMessage::remove(key);
1286            pub_lock.publish(&msg).await?;
1287            self.invalidation_stats
1288                .messages_sent
1289                .fetch_add(1, Ordering::Relaxed);
1290        }
1291
1292        debug!("Invalidated '{}' across all instances", key);
1293        Ok(())
1294    }
1295
1296    /// Update cache value across all instances
1297    ///
1298    /// This updates the key in all cache tiers and broadcasts
1299    /// the update to all other cache instances, avoiding cache misses.
1300    ///
1301    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1302    ///
1303    /// # Arguments
1304    /// * `key` - Cache key to update
1305    /// * `value` - New value
1306    /// * `ttl` - Optional TTL (uses default if None)
1307    ///
1308    /// # Example
1309    /// ```rust,no_run
1310    /// # use multi_tier_cache::CacheManager;
1311    /// # use std::time::Duration;
1312    /// # use bytes::Bytes;
1313    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1314    /// // Update user cache with new data
1315    /// let user_data = Bytes::from("alice");
1316    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1317    /// # Ok(())
1318    /// # }
1319    /// ```
1320    /// # Errors
1321    ///
1322    /// Returns an error if cache update fails.
1323    pub async fn update_cache(&self, key: &str, value: Bytes, ttl: Option<Duration>) -> Result<()> {
1324        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1325
1326        // Update ALL tiers
1327        for tier in &self.tiers {
1328            if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1329                warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1330            }
1331        }
1332
1333        // Broadcast update to other instances
1334        if let Some(publisher) = &self.invalidation_publisher {
1335            let mut pub_lock = publisher.lock().await;
1336            let msg = InvalidationMessage::update(key, value, Some(ttl));
1337            pub_lock.publish(&msg).await?;
1338            self.invalidation_stats
1339                .messages_sent
1340                .fetch_add(1, Ordering::Relaxed);
1341        }
1342
1343        debug!("Updated '{}' across all instances", key);
1344        Ok(())
1345    }
1346
1347    /// Invalidate all keys matching a pattern
1348    ///
1349    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1350    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1351    ///
1352    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1353    ///
1354    /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1355    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1356    ///
1357    /// # Arguments
1358    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1359    ///
1360    /// # Example
1361    /// ```rust,no_run
1362    /// # use multi_tier_cache::CacheManager;
1363    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1364    /// // Invalidate all user caches
1365    /// cache_manager.invalidate_pattern("user:*").await?;
1366    ///
1367    /// // Invalidate specific user's related caches
1368    /// cache_manager.invalidate_pattern("user:123:*").await?;
1369    /// # Ok(())
1370    /// # }
1371    /// ```
1372    /// # Errors
1373    ///
1374    /// Returns an error if invalidation fails.
1375    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1376        debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1377
1378        // 1. Invalidate in all configured tiers
1379        for tier in &self.tiers {
1380            debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1381            tier.backend.remove_pattern(pattern).await?;
1382        }
1383
1384        // 2. Broadcast invalidation if publisher is configured
1385        if let Some(publisher) = &self.invalidation_publisher {
1386            let msg = InvalidationMessage::remove_pattern(pattern);
1387            publisher.lock().await.publish(&msg).await?;
1388            debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1389        }
1390
1391        Ok(())
1392    }
1393
1394    /// Set value with automatic broadcast to all instances
1395    ///
1396    /// This is a write-through operation that updates the cache and
1397    /// broadcasts the update to all other instances automatically.
1398    ///
1399    /// # Arguments
1400    /// * `key` - Cache key
1401    /// * `value` - Value to cache
1402    /// * `strategy` - Cache strategy (determines TTL)
1403    ///
1404    /// # Example
1405    /// ```rust,no_run
1406    /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1407    /// # use bytes::Bytes;
1408    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1409    /// // Update and broadcast in one call
1410    /// let data = Bytes::from("active");
1411    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1412    /// # Ok(())
1413    /// # }
1414    /// ```
1415    /// # Errors
1416    ///
1417    /// Returns an error if cache set or broadcast fails.
1418    pub async fn set_with_broadcast(
1419        &self,
1420        key: &str,
1421        value: Bytes,
1422        strategy: CacheStrategy,
1423    ) -> Result<()> {
1424        let ttl = strategy.to_duration();
1425
1426        // Set in local caches
1427        self.set_with_strategy(key, value.clone(), strategy).await?;
1428
1429        // Broadcast update if invalidation is enabled
1430        if let Some(publisher) = &self.invalidation_publisher {
1431            let mut pub_lock = publisher.lock().await;
1432            let msg = InvalidationMessage::update(key, value, Some(ttl));
1433            pub_lock.publish(&msg).await?;
1434            self.invalidation_stats
1435                .messages_sent
1436                .fetch_add(1, Ordering::Relaxed);
1437        }
1438
1439        Ok(())
1440    }
1441
1442    /// Get invalidation statistics
1443    ///
1444    /// Returns statistics about invalidation operations if invalidation is enabled.
1445    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1446        if self.invalidation_subscriber.is_some() {
1447            Some(self.invalidation_stats.snapshot())
1448        } else {
1449            None
1450        }
1451    }
1452}
1453
/// Point-in-time view of the cache manager's counters.
///
/// This is a plain value type: it is not updated after it has been created.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Number of requests counted by the manager.
    pub total_requests: u64,
    /// Requests served from the level-1 tier.
    pub l1_hits: u64,
    /// Requests served from the level-2 tier.
    pub l2_hits: u64,
    /// Sum of `l1_hits` and `l2_hits`.
    pub total_hits: u64,
    /// Value of the manager's miss counter.
    pub misses: u64,
    /// `total_hits` as a percentage of `total_requests` (0.0 when there were none).
    pub hit_rate: f64,
    /// `l1_hits` as a percentage of `total_requests` (0.0 when there were none).
    pub l1_hit_rate: f64,
    /// Value of the manager's promotion counter.
    pub promotions: usize,
    /// Number of keys registered in the in-flight map at snapshot time.
    pub in_flight_requests: usize,
}