Skip to main content

multi_tier_cache/
cache_manager.rs

1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use anyhow::Result;
6use dashmap::DashMap;
7use serde_json;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, broadcast};
13
14use tracing::{debug, error, info, warn};
15
16use crate::invalidation::{
17    AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
18    InvalidationStats, InvalidationSubscriber,
19};
20use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
21use crate::{L1Cache, L2Cache};
22use bytes::Bytes;
23use futures_util::future::BoxFuture;
24
/// Type alias for the in-flight requests map.
///
/// Stores a broadcast sender for each active key computation. Each message is either the
/// resolved value or a shared error describing why the computation failed.
type InFlightMap = DashMap<String, broadcast::Sender<Result<Bytes, Arc<anyhow::Error>>>>;
28
/// Named TTL presets used when writing values into the cache.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data: 10 second TTL.
    RealTime,
    /// Short-term data: 5 minute TTL.
    ShortTerm,
    /// Medium-term data: 1 hour TTL.
    MediumTerm,
    /// Long-term data: 3 hour TTL.
    LongTerm,
    /// Caller-supplied TTL.
    Custom(Duration),
    /// Default strategy; same TTL as `ShortTerm` (5 minutes).
    Default,
}

impl CacheStrategy {
    /// Returns the base TTL this strategy stands for.
    ///
    /// `Custom` yields the wrapped duration unchanged; every other variant maps to a fixed
    /// number of seconds.
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        let seconds = match self {
            Self::Custom(ttl) => return *ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * 60,
            Self::MediumTerm => 60 * 60,
            Self::LongTerm => 3 * 60 * 60,
        };
        Duration::from_secs(seconds)
    }
}
60
/// Hit statistics for one cache tier.
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits recorded at this tier
    pub hits: AtomicU64,
    /// Backend name, used to identify the tier
    pub backend_name: String,
}

impl Clone for TierStats {
    /// Produces an independent copy whose counter starts at the current hit count.
    ///
    /// The clone does not share the atomic with the original, so later hits on one are not
    /// visible on the other.
    fn clone(&self) -> Self {
        let snapshot = self.hit_count();
        Self {
            hits: AtomicU64::new(snapshot),
            tier_level: self.tier_level,
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    /// Builds statistics for a tier with a zeroed hit counter.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            hits: AtomicU64::new(0),
            tier_level,
            backend_name,
        }
    }

    /// Returns the current hit count (relaxed atomic load).
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
96
97/// A single cache tier in the multi-tier architecture
98#[derive(Clone)]
99pub struct CacheTier {
100    /// The backend for this tier
101    pub backend: Arc<dyn L2CacheBackend>,
102    /// Tier level (1 for fastest, increases for slower/cheaper tiers)
103    pub tier_level: usize,
104    /// Whether to promote keys FROM lower tiers TO this tier
105    pub promotion_enabled: bool,
106    /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
107    pub ttl_scale: f64,
108    /// Statistics for this tier
109    pub stats: TierStats,
110}
111
112impl CacheTier {
113    /// Create a new cache tier
114    pub fn new(
115        backend: Arc<dyn L2CacheBackend>,
116        tier_level: usize,
117        promotion_enabled: bool,
118        ttl_scale: f64,
119    ) -> Self {
120        let backend_name = backend.name().to_string();
121        Self {
122            backend,
123            tier_level,
124            promotion_enabled,
125            ttl_scale,
126            stats: TierStats::new(tier_level, backend_name),
127        }
128    }
129
130    /// Get value with TTL from this tier
131    async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
132        self.backend.get_with_ttl(key).await
133    }
134
135    /// Set value with TTL in this tier
136    async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> Result<()> {
137        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
138        self.backend.set_with_ttl(key, value, scaled_ttl).await
139    }
140
141    /// Remove value from this tier
142    async fn remove(&self, key: &str) -> Result<()> {
143        self.backend.remove(key).await
144    }
145
146    /// Record a cache hit for this tier
147    fn record_hit(&self) {
148        self.stats.hits.fetch_add(1, Ordering::Relaxed);
149    }
150}
151
/// Builder-style configuration describing one cache tier.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Starts a configuration for `tier_level` with promotion on and a TTL scale of 1.0.
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Preset for L1 (hot tier): level 1, promotion off, unscaled TTL.
    #[must_use]
    pub fn as_l1() -> Self {
        // L1 is already the top tier, so there is nowhere to promote to.
        Self::new(1).with_promotion(false)
    }

    /// Preset for L2 (warm tier): level 2, promotion on, unscaled TTL.
    #[must_use]
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Preset for L3 (cold tier): level 3, promotion on, data kept 2x longer.
    #[must_use]
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Preset for L4 (archive tier): level 4, promotion on, data kept 8x longer.
    #[must_use]
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Returns the configuration with the promotion flag replaced.
    #[must_use]
    pub fn with_promotion(self, enabled: bool) -> Self {
        Self {
            promotion_enabled: enabled,
            ..self
        }
    }

    /// Returns the configuration with the TTL scale factor replaced.
    #[must_use]
    pub fn with_ttl_scale(self, scale: f64) -> Self {
        Self {
            ttl_scale: scale,
            ..self
        }
    }

    /// Returns the configuration with the tier level replaced.
    #[must_use]
    pub fn with_level(self, level: usize) -> Self {
        Self {
            tier_level: level,
            ..self
        }
    }
}
235
/// Coordinates reads and writes across an ordered list of cache tiers.
///
/// Besides the tiers themselves, the manager keeps request/hit/miss/promotion counters,
/// a per-key map of in-flight lookups used to coalesce concurrent requests, and the
/// optional publisher/subscriber pair used for cache invalidation.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...)
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics
    total_requests: AtomicU64,
    l1_hits: AtomicU64,
    l2_hits: AtomicU64,
    misses: AtomicU64,
    promotions: AtomicUsize,
    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
257
258impl CacheManager {
259    /// Create new cache manager with trait objects (pluggable backends)
260    ///
261    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
262    ///
263    /// # Arguments
264    ///
265    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
266    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
267    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
268    ///
269    /// # Example
270    ///
271    /// ```rust,ignore
272    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
273    /// use std::sync::Arc;
274    ///
275    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
276    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
277    ///
278    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
279    /// ```
280    /// # Errors
281    ///
282    /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
283    pub fn new_with_backends(
284        l1_cache: Arc<dyn CacheBackend>,
285        l2_cache: Arc<dyn L2CacheBackend>,
286        streaming_backend: Option<Arc<dyn StreamingBackend>>,
287    ) -> Result<Self> {
288        debug!("Initializing Cache Manager with L1+L2 backends...");
289
290        let tiers = vec![
291            CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1.0),
292            CacheTier::new(l2_cache, 2, true, 1.0),
293        ];
294
295        Ok(Self {
296            tiers,
297            streaming_backend,
298            total_requests: AtomicU64::new(0),
299            l1_hits: AtomicU64::new(0),
300            l2_hits: AtomicU64::new(0),
301            misses: AtomicU64::new(0),
302            promotions: AtomicUsize::new(0),
303            in_flight_requests: Arc::new(DashMap::new()),
304            invalidation_publisher: None,
305            invalidation_subscriber: None,
306            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
307        })
308    }
309
310    /// Create new cache manager with default backends (backward compatible)
311    ///
312    /// This is the legacy constructor maintained for backward compatibility.
313    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
314    ///
315    /// # Arguments
316    ///
317    /// * `l1_cache` - Moka L1 cache instance
318    /// * `l2_cache` - Redis L2 cache instance
319    /// # Errors
320    ///
321    /// Returns an error if Redis connection fails.
322    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
323        debug!("Initializing Cache Manager...");
324
325        // Convert concrete types to trait objects
326        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
327        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
328
329        // Create RedisStreams backend for streaming functionality
330        let redis_url =
331            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
332        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
333        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
334
335        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
336    }
337
338    /// Create new cache manager with invalidation support
339    ///
340    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
341    ///
342    /// # Arguments
343    ///
344    /// * `l1_cache` - Moka L1 cache instance
345    /// * `l2_cache` - Redis L2 cache instance
346    /// * `redis_url` - Redis connection URL for Pub/Sub
347    /// * `config` - Invalidation configuration
348    ///
349    /// # Example
350    ///
351    /// ```rust,ignore
352    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
353    ///
354    /// let config = InvalidationConfig {
355    ///     channel: "my_app:cache:invalidate".to_string(),
356    ///     ..Default::default()
357    /// };
358    ///
359    /// let manager = CacheManager::new_with_invalidation(
360    ///     l1, l2, "redis://localhost", config
361    /// ).await?;
362    /// ```
363    /// # Errors
364    ///
365    /// Returns an error if Redis connection fails or invalidation setup fails.
366    pub async fn new_with_invalidation(
367        l1_cache: Arc<L1Cache>,
368        l2_cache: Arc<L2Cache>,
369        redis_url: &str,
370        config: InvalidationConfig,
371    ) -> Result<Self> {
372        debug!("Initializing Cache Manager with Invalidation...");
373        debug!("  Pub/Sub channel: {}", config.channel);
374
375        // Convert concrete types to trait objects
376        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
377        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
378
379        // Create RedisStreams backend for streaming functionality
380        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
381        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
382
383        // Create publisher
384        let client = redis::Client::open(redis_url)?;
385        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
386        let publisher = InvalidationPublisher::new(conn_manager, config.clone());
387
388        // Create subscriber
389        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
390        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
391
392        let tiers = vec![
393            CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1.0),
394            CacheTier::new(l2_backend, 2, true, 1.0),
395        ];
396
397        let manager = Self {
398            tiers,
399            streaming_backend: Some(streaming_backend),
400            total_requests: AtomicU64::new(0),
401            l1_hits: AtomicU64::new(0),
402            l2_hits: AtomicU64::new(0),
403            misses: AtomicU64::new(0),
404            promotions: AtomicUsize::new(0),
405            in_flight_requests: Arc::new(DashMap::new()),
406            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
407            invalidation_subscriber: Some(Arc::new(subscriber)),
408            invalidation_stats,
409        };
410
411        // Start subscriber with handler
412        manager.start_invalidation_subscriber();
413
414        info!("Cache Manager initialized with invalidation support");
415
416        Ok(manager)
417    }
418
419    /// Create new cache manager with multi-tier architecture (v0.5.0+)
420    ///
421    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
422    /// Tiers are checked in order (lower `tier_level` = faster/hotter).
423    ///
424    /// # Arguments
425    ///
426    /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
427    /// * `streaming_backend` - Optional streaming backend
428    ///
429    /// # Example
430    ///
431    /// ```rust,ignore
432    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
433    /// use std::sync::Arc;
434    ///
435    /// // L1 + L2 + L3 setup
436    /// let l1 = Arc::new(L1Cache::new()?);
437    /// let l2 = Arc::new(L2Cache::new().await?);
438    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
439    ///
440    /// let tiers = vec![
441    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
442    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
443    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
444    /// ];
445    ///
446    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
447    /// ```
448    /// # Errors
449    ///
450    /// Returns an error if tiers are not sorted by level or if no tiers are provided.
451    pub fn new_with_tiers(
452        tiers: Vec<CacheTier>,
453        streaming_backend: Option<Arc<dyn StreamingBackend>>,
454    ) -> Result<Self> {
455        debug!("Initializing Multi-Tier Cache Manager...");
456        debug!("  Tier count: {}", tiers.len());
457        for tier in &tiers {
458            debug!(
459                "  L{}: {} (promotion={}, ttl_scale={})",
460                tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
461            );
462        }
463
464        // Validate tiers are sorted by level
465        for i in 1..tiers.len() {
466            if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
467                && current.tier_level <= prev.tier_level
468            {
469                anyhow::bail!(
470                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
471                    current.tier_level,
472                    prev.tier_level
473                );
474            }
475        }
476
477        Ok(Self {
478            tiers,
479            streaming_backend,
480            total_requests: AtomicU64::new(0),
481            l1_hits: AtomicU64::new(0),
482            l2_hits: AtomicU64::new(0),
483            misses: AtomicU64::new(0),
484            promotions: AtomicUsize::new(0),
485            in_flight_requests: Arc::new(DashMap::new()),
486            invalidation_publisher: None,
487            invalidation_subscriber: None,
488            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
489        })
490    }
491
492    /// Start the invalidation subscriber background task
493    fn start_invalidation_subscriber(&self) {
494        if let Some(subscriber) = &self.invalidation_subscriber {
495            let tiers = self.tiers.clone();
496
497            subscriber.start(move |msg| {
498                let tiers = tiers.clone();
499                async move {
500                    for tier in &tiers {
501                        match &msg {
502                            InvalidationMessage::Remove { key } => {
503                                if let Err(e) = tier.backend.remove(key).await {
504                                    warn!(
505                                        "Failed to remove '{}' from L{}: {}",
506                                        key, tier.tier_level, e
507                                    );
508                                }
509                            }
510                            InvalidationMessage::Update {
511                                key,
512                                value,
513                                ttl_secs,
514                            } => {
515                                let ttl = ttl_secs
516                                    .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
517                                if let Err(e) =
518                                    tier.backend.set_with_ttl(key, value.clone(), ttl).await
519                                {
520                                    warn!(
521                                        "Failed to update '{}' in L{}: {}",
522                                        key, tier.tier_level, e
523                                    );
524                                }
525                            }
526                            InvalidationMessage::RemovePattern { pattern } => {
527                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
528                                    warn!(
529                                        "Failed to remove pattern '{}' from L{}: {}",
530                                        pattern, tier.tier_level, e
531                                    );
532                                }
533                            }
534                            InvalidationMessage::RemoveBulk { keys } => {
535                                for key in keys {
536                                    if let Err(e) = tier.backend.remove(key).await {
537                                        warn!(
538                                            "Failed to remove '{}' from L{}: {}",
539                                            key, tier.tier_level, e
540                                        );
541                                    }
542                                }
543                            }
544                        }
545                    }
546                    Ok(())
547                }
548            });
549
550            info!("Invalidation subscriber started across all tiers");
551        }
552    }
553
554    /// Get value from cache using multi-tier architecture (v0.5.0+)
555    ///
556    /// This method iterates through all configured tiers and automatically promotes
557    /// to upper tiers on cache hit.
558    async fn get_multi_tier(&self, key: &str) -> Result<Option<Bytes>> {
559        // Try each tier sequentially (sorted by tier_level)
560        for (tier_index, tier) in self.tiers.iter().enumerate() {
561            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
562                // Cache hit!
563                tier.record_hit();
564
565                // Promote to all upper tiers (if promotion enabled)
566                if tier.promotion_enabled && tier_index > 0 {
567                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
568
569                    // Promote to all tiers above this one
570                    for upper_tier in self.tiers.iter().take(tier_index).rev() {
571                        if let Err(e) = upper_tier
572                            .set_with_ttl(key, value.clone(), promotion_ttl)
573                            .await
574                        {
575                            warn!(
576                                "Failed to promote '{}' from L{} to L{}: {}",
577                                key, tier.tier_level, upper_tier.tier_level, e
578                            );
579                        } else {
580                            self.promotions.fetch_add(1, Ordering::Relaxed);
581                            debug!(
582                                "Promoted '{}' from L{} to L{} (TTL: {:?})",
583                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
584                            );
585                        }
586                    }
587                }
588
589                return Ok(Some(value));
590            }
591        }
592
593        // Cache miss across all tiers
594        Ok(None)
595    }
596
597    /// Get value from cache (L1 first, then L2 fallback with promotion)
598    ///
599    /// This method now includes built-in Cache Stampede protection when cache misses occur.
600    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
601    /// unnecessary duplicate work on external data sources.
602    ///
603    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
604    ///
605    /// # Arguments
606    /// * `key` - Cache key to retrieve
607    ///
608    /// # Returns
609    /// * `Ok(Some(value))` - Cache hit, value found in any tier
610    /// * `Ok(None)` - Cache miss, value not found in any cache
611    /// * `Err(error)` - Cache operation failed
612    /// # Errors
613    ///
614    /// Returns an error if cache operation fails.
615    ///
616    /// # Panics
617    ///
618    /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
619    pub async fn get(&self, key: &str) -> Result<Option<Bytes>> {
620        self.total_requests.fetch_add(1, Ordering::Relaxed);
621
622        // Fast path for L1 (first tier) - no locking needed
623        if let Some(tier1) = self.tiers.first()
624            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
625        {
626            tier1.record_hit();
627            // Update legacy stats for backward compatibility
628            self.l1_hits.fetch_add(1, Ordering::Relaxed);
629            return Ok(Some(value));
630        }
631
632        // L1 miss - use stampede protection for lower tiers
633        let key_owned = key.to_string();
634        let lock_guard = self
635            .in_flight_requests
636            .entry(key_owned.clone())
637            .or_insert_with(|| broadcast::Sender::new(1))
638            .clone();
639
640        let mut rx = lock_guard.subscribe();
641
642        // If there are other receivers, someone else is computing, wait for it
643        if lock_guard.receiver_count() > 1 {
644            match rx.recv().await {
645                Ok(Ok(value)) => return Ok(Some(value)),
646                Ok(Err(e)) => {
647                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
648                }
649                Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
650            }
651        }
652
653        // Double-check L1 after acquiring lock (or if we are the first to compute)
654        if let Some(tier1) = self.tiers.first()
655            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
656        {
657            tier1.record_hit();
658            self.l1_hits.fetch_add(1, Ordering::Relaxed);
659            let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
660            return Ok(Some(value));
661        }
662
663        // Check remaining tiers with promotion
664        let result = self.get_multi_tier(key).await?;
665
666        if let Some(val) = result.clone() {
667            // Hit in L2+ tier - update legacy stats
668            if self.tiers.len() >= 2 {
669                self.l2_hits.fetch_add(1, Ordering::Relaxed);
670            }
671
672            // Notify any waiting subscribers
673            let _ = lock_guard.send(Ok(val.clone()));
674
675            // Remove the in-flight entry after computation/retrieval
676            self.in_flight_requests.remove(key);
677
678            Ok(Some(val))
679        } else {
680            self.misses.fetch_add(1, Ordering::Relaxed);
681
682            // Remove the in-flight entry after computation/retrieval
683            self.in_flight_requests.remove(key);
684
685            Ok(None)
686        }
687    }
688
689    /// Set value with specific cache strategy (all tiers)
690    ///
691    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
692    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
693    /// # Errors
694    ///
695    /// Returns an error if cache set operation fails.
696    pub async fn set_with_strategy(
697        &self,
698        key: &str,
699        value: Bytes,
700        strategy: CacheStrategy,
701    ) -> Result<()> {
702        let ttl = strategy.to_duration();
703
704        let mut success_count = 0;
705        let mut last_error = None;
706
707        for tier in &self.tiers {
708            match tier.set_with_ttl(key, value.clone(), ttl).await {
709                Ok(()) => {
710                    success_count += 1;
711                }
712                Err(e) => {
713                    error!(
714                        "L{} cache set failed for key '{}': {}",
715                        tier.tier_level, key, e
716                    );
717                    last_error = Some(e);
718                }
719            }
720        }
721
722        if success_count > 0 {
723            debug!(
724                "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
725                key,
726                success_count,
727                self.tiers.len(),
728                ttl
729            );
730            return Ok(());
731        }
732
733        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'")))
734    }
735
736    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
737    ///
738    /// This method provides comprehensive Cache Stampede protection:
739    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
740    /// 2. Check L2 cache with mutex-based coalescing
741    /// 3. Compute fresh data with protection against concurrent computations
742    ///
743    /// # Arguments
744    /// * `key` - Cache key
745    /// * `strategy` - Cache strategy for TTL and storage behavior
746    /// * `compute_fn` - Async function to compute the value if not in any cache
747    ///
748    /// # Example
749    /// ```ignore
750    /// let api_data = cache_manager.get_or_compute_with(
751    ///     "api_response",
752    ///     CacheStrategy::RealTime,
753    ///     || async {
754    ///         fetch_data_from_api().await
755    ///     }
756    /// ).await?;
757    /// ```
758    #[allow(dead_code)]
759    /// # Errors
760    ///
761    /// Returns an error if compute function fails or cache operations fail.
762    pub async fn get_or_compute_with<F, Fut>(
763        &self,
764        key: &str,
765        strategy: CacheStrategy,
766        compute_fn: F,
767    ) -> Result<Bytes>
768    where
769        F: FnOnce() -> Fut + Send,
770        Fut: Future<Output = Result<Bytes>> + Send,
771    {
772        self.total_requests.fetch_add(1, Ordering::Relaxed);
773
774        // 1. Try tiers sequentially first
775        for (idx, tier) in self.tiers.iter().enumerate() {
776            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
777                tier.record_hit();
778                if tier.tier_level == 1 {
779                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
780                } else if tier.tier_level == 2 {
781                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
782                }
783
784                // Promotion to L1 if hit was in a lower tier
785                if idx > 0 && tier.promotion_enabled {
786                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
787                    if let Some(l1_tier) = self.tiers.first() {
788                        let _ = l1_tier
789                            .set_with_ttl(key, value.clone(), promotion_ttl)
790                            .await;
791                    }
792                }
793
794                return Ok(value);
795            }
796        }
797
798        // 2. Cache miss across all tiers - use stampede protection
799        let (tx, mut rx) = match self.in_flight_requests.entry(key.to_string()) {
800            dashmap::mapref::entry::Entry::Occupied(entry) => {
801                let tx = entry.get().clone();
802                (tx.clone(), tx.subscribe())
803            }
804            dashmap::mapref::entry::Entry::Vacant(entry) => {
805                let (tx, _) = broadcast::channel(1);
806                entry.insert(tx.clone());
807                (tx.clone(), tx.subscribe())
808            }
809        };
810
811        if tx.receiver_count() > 1 {
812            // Someone else is computing, wait for it
813            match rx.recv().await {
814                Ok(Ok(value)) => return Ok(value),
815                Ok(Err(e)) => {
816                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
817                }
818                Err(_) => {} // Fall through to re-compute
819            }
820        }
821
822        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
823        for tier in &self.tiers {
824            if let Some((value, _)) = tier.get_with_ttl(key).await {
825                let _ = tx.send(Ok(value.clone()));
826                return Ok(value);
827            }
828        }
829
830        // 4. Miss - compute fresh data
831        debug!(
832            "Computing fresh data for key: '{}' (Stampede protected)",
833            key
834        );
835
836        let result = compute_fn().await;
837
838        // Remove from in_flight BEFORE broadcasting
839        self.in_flight_requests.remove(key);
840
841        match &result {
842            Ok(value) => {
843                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
844                let _ = tx.send(Ok(value.clone()));
845            }
846            Err(e) => {
847                let _ = tx.send(Err(Arc::new(anyhow::anyhow!("{e}"))));
848            }
849        }
850
851        result
852    }
853
854    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
855    ///
856    /// This method provides the same functionality as `get_or_compute_with()` but with
857    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
858    /// API calls, or any computation that returns structured data.
859    ///
860    /// # Type Safety
861    ///
862    /// - Returns your actual type `T` instead of `serde_json::Value`
863    /// - Compiler enforces Serialize + `DeserializeOwned` bounds
864    /// - No manual JSON conversion needed
865    ///
866    /// # Cache Flow
867    ///
868    /// 1. Check L1 cache → deserialize if found
869    /// 2. Check L2 cache → deserialize + promote to L1 if found
870    /// 3. Execute `compute_fn` → serialize → store in L1+L2
871    /// 4. Full stampede protection (only ONE request computes)
872    ///
873    /// # Arguments
874    ///
875    /// * `key` - Cache key
876    /// * `strategy` - Cache strategy for TTL
877    /// * `compute_fn` - Async function returning `Result<T>`
878    ///
879    /// # Example - Database Query
880    ///
881    /// ```no_run
882    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
883    /// # use std::sync::Arc;
884    /// # use serde::{Serialize, Deserialize};
885    /// # async fn example() -> anyhow::Result<()> {
886    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
887    /// # let l2 = Arc::new(L2Cache::new().await?);
888    /// # let cache_manager = CacheManager::new(l1, l2);
889    ///
890    /// #[derive(Serialize, Deserialize)]
891    /// struct User {
892    ///     id: i64,
893    ///     name: String,
894    /// }
895    ///
896    /// // Type-safe database caching (example - requires sqlx)
897    /// // let user: User = cache_manager.get_or_compute_typed(
898    /// //     "user:123",
899    /// //     CacheStrategy::MediumTerm,
900    /// //     || async {
901    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
902    /// //             .bind(123)
903    /// //             .fetch_one(&pool)
904    /// //             .await
905    /// //     }
906    /// // ).await?;
907    /// # Ok(())
908    /// # }
909    /// ```
910    ///
911    /// # Example - API Call
912    ///
913    /// ```no_run
914    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
915    /// # use std::sync::Arc;
916    /// # use serde::{Serialize, Deserialize};
917    /// # async fn example() -> anyhow::Result<()> {
918    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
919    /// # let l2 = Arc::new(L2Cache::new().await?);
920    /// # let cache_manager = CacheManager::new(l1, l2);
921    /// #[derive(Serialize, Deserialize)]
922    /// struct ApiResponse {
923    ///     data: String,
924    ///     timestamp: i64,
925    /// }
926    ///
927    /// // API call caching (example - requires reqwest)
928    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
929    /// //     "api:endpoint",
930    /// //     CacheStrategy::RealTime,
931    /// //     || async {
932    /// //         reqwest::get("https://api.example.com/data")
933    /// //             .await?
934    /// //             .json::<ApiResponse>()
935    /// //             .await
936    /// //     }
937    /// // ).await?;
938    /// # Ok(())
939    /// # }
940    /// ```
941    ///
942    /// # Performance
943    ///
944    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
945    /// - L2 hit: 2-5ms + deserialization + L1 promotion
946    /// - Compute: Your function time + serialization + L1+L2 storage
947    /// - Stampede protection: 99.6% latency reduction under high concurrency
948    ///
949    /// # Errors
950    ///
951    /// Returns error if:
952    /// - Compute function fails
953    /// - Serialization fails (invalid type for JSON)
954    /// - Deserialization fails (cache data doesn't match type T)
955    /// - Cache operations fail (Redis connection issues)
956    #[allow(clippy::too_many_lines)]
957    pub async fn get_or_compute_typed<T, F, Fut>(
958        &self,
959        key: &str,
960        strategy: CacheStrategy,
961        compute_fn: F,
962    ) -> Result<T>
963    where
964        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
965        F: FnOnce() -> Fut + Send,
966        Fut: Future<Output = Result<T>> + Send,
967    {
968        self.total_requests.fetch_add(1, Ordering::Relaxed);
969
970        // 1. Try L1 first with zero-cost typed access (if backend supports it)
971        // Note: For now we still use bytes path in CacheManager to keep it generic,
972        // but backends like Moka use their internal typed cache.
973        // We might want a dedicated Tier trait method for typed access later.
974
975        for (idx, tier) in self.tiers.iter().enumerate() {
976            if let Some((bytes, ttl)) = tier.get_with_ttl(key).await {
977                tier.record_hit();
978                if tier.tier_level == 1 {
979                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
980                } else if tier.tier_level == 2 {
981                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
982                }
983
984                match serde_json::from_slice::<T>(&bytes) {
985                    Ok(value) => {
986                        // Promotion
987                        if idx > 0 && tier.promotion_enabled {
988                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
989                            if let Some(l1_tier) = self.tiers.first() {
990                                let _ = l1_tier.set_with_ttl(key, bytes, promotion_ttl).await;
991                                self.promotions.fetch_add(1, Ordering::Relaxed);
992                            }
993                        }
994                        return Ok(value);
995                    }
996                    Err(e) => {
997                        warn!("Deserialization failed for key '{}': {}", key, e);
998                        // Fall through to next tier or compute
999                    }
1000                }
1001            }
1002        }
1003
1004        // 2. Compute with stampede protection (reuse get_or_compute_with logic for Bytes)
1005        let compute_fn_wrapped = || async {
1006            let val = compute_fn().await?;
1007            Ok(Bytes::from(serde_json::to_vec(&val)?))
1008        };
1009
1010        let bytes = self
1011            .get_or_compute_with(key, strategy, compute_fn_wrapped)
1012            .await?;
1013
1014        match serde_json::from_slice::<T>(&bytes) {
1015            Ok(val) => Ok(val),
1016            Err(e) => Err(anyhow::anyhow!(
1017                "Coalesced result deserialization failed: {e}"
1018            )),
1019        }
1020    }
1021
1022    /// Get comprehensive cache statistics
1023    ///
1024    /// In multi-tier mode, aggregates statistics from all tiers.
1025    /// In legacy mode, returns L1 and L2 stats.
1026    #[allow(dead_code)]
1027    pub fn get_stats(&self) -> CacheManagerStats {
1028        let total_reqs = self.total_requests.load(Ordering::Relaxed);
1029        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1030        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1031        let misses = self.misses.load(Ordering::Relaxed);
1032
1033        CacheManagerStats {
1034            total_requests: total_reqs,
1035            l1_hits,
1036            l2_hits,
1037            total_hits: l1_hits + l2_hits,
1038            misses,
1039            hit_rate: if total_reqs > 0 {
1040                #[allow(clippy::cast_precision_loss)]
1041                {
1042                    ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1043                }
1044            } else {
1045                0.0
1046            },
1047            l1_hit_rate: if total_reqs > 0 {
1048                #[allow(clippy::cast_precision_loss)]
1049                {
1050                    (l1_hits as f64 / total_reqs as f64) * 100.0
1051                }
1052            } else {
1053                0.0
1054            },
1055            promotions: self.promotions.load(Ordering::Relaxed),
1056            in_flight_requests: self.in_flight_requests.len(),
1057        }
1058    }
1059
1060    /// Get per-tier statistics (v0.5.0+)
1061    ///
1062    /// Returns statistics for each tier if multi-tier mode is enabled.
1063    /// Returns None if using legacy 2-tier mode.
1064    ///
1065    /// # Example
1066    /// ```rust,ignore
1067    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1068    ///     for stats in tier_stats {
1069    ///         println!("L{}: {} hits ({})",
1070    ///                  stats.tier_level,
1071    ///                  stats.hit_count(),
1072    ///                  stats.backend_name);
1073    ///     }
1074    /// }
1075    /// ```
1076    pub fn get_tier_stats(&self) -> Vec<TierStats> {
1077        self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1078    }
1079}
1080
1081/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
1082/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
1083struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1084
1085impl CacheBackend for ProxyL1ToL2 {
1086    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
1087        self.0.get(key)
1088    }
1089
1090    fn set_with_ttl<'a>(
1091        &'a self,
1092        key: &'a str,
1093        value: Bytes,
1094        ttl: Duration,
1095    ) -> BoxFuture<'a, Result<()>> {
1096        self.0.set_with_ttl(key, value, ttl)
1097    }
1098
1099    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
1100        self.0.remove(key)
1101    }
1102
1103    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
1104        self.0.remove_pattern(pattern)
1105    }
1106
1107    fn health_check(&self) -> BoxFuture<'_, bool> {
1108        self.0.health_check()
1109    }
1110
1111    fn name(&self) -> &'static str {
1112        self.0.name()
1113    }
1114}
1115
1116impl L2CacheBackend for ProxyL1ToL2 {
1117    fn get_with_ttl<'a>(
1118        &'a self,
1119        key: &'a str,
1120    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1121        Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1122    }
1123}
1124
1125impl CacheManager {
1126    // ===== Redis Streams Methods =====
1127
1128    /// Publish data to Redis Stream
1129    ///
1130    /// # Arguments
1131    /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1132    /// * `fields` - Field-value pairs to publish
1133    /// * `maxlen` - Optional max length for stream trimming
1134    ///
1135    /// # Returns
1136    /// The entry ID generated by Redis
1137    ///
1138    /// # Errors
1139    /// Returns error if streaming backend is not configured
1140    pub async fn publish_to_stream(
1141        &self,
1142        stream_key: &str,
1143        fields: Vec<(String, String)>,
1144        maxlen: Option<usize>,
1145    ) -> Result<String> {
1146        match &self.streaming_backend {
1147            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1148            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1149        }
1150    }
1151
1152    /// Read latest entries from Redis Stream
1153    ///
1154    /// # Arguments
1155    /// * `stream_key` - Name of the stream
1156    /// * `count` - Number of latest entries to retrieve
1157    ///
1158    /// # Returns
1159    /// Vector of (`entry_id`, fields) tuples (newest first)
1160    ///
1161    /// # Errors
1162    /// Returns error if streaming backend is not configured
1163    pub async fn read_stream_latest(
1164        &self,
1165        stream_key: &str,
1166        count: usize,
1167    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1168        match &self.streaming_backend {
1169            Some(backend) => backend.stream_read_latest(stream_key, count).await,
1170            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1171        }
1172    }
1173
1174    /// Read from Redis Stream with optional blocking
1175    ///
1176    /// # Arguments
1177    /// * `stream_key` - Name of the stream
1178    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1179    /// * `count` - Max entries to retrieve
1180    /// * `block_ms` - Optional blocking timeout in ms
1181    ///
1182    /// # Returns
1183    /// Vector of (`entry_id`, fields) tuples
1184    ///
1185    /// # Errors
1186    /// Returns error if streaming backend is not configured
1187    pub async fn read_stream(
1188        &self,
1189        stream_key: &str,
1190        last_id: &str,
1191        count: usize,
1192        block_ms: Option<usize>,
1193    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1194        match &self.streaming_backend {
1195            Some(backend) => {
1196                backend
1197                    .stream_read(stream_key, last_id, count, block_ms)
1198                    .await
1199            }
1200            None => Err(anyhow::anyhow!("Streaming backend not configured")),
1201        }
1202    }
1203
1204    // ===== Cache Invalidation Methods =====
1205
1206    /// Invalidate a cache key across all instances
1207    ///
1208    /// This removes the key from all cache tiers and broadcasts
1209    /// the invalidation to all other cache instances via Redis Pub/Sub.
1210    ///
1211    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1212    ///
1213    /// # Arguments
1214    /// * `key` - Cache key to invalidate
1215    ///
1216    /// # Example
1217    /// ```rust,no_run
1218    /// # use multi_tier_cache::CacheManager;
1219    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1220    /// // Invalidate user cache after profile update
1221    /// cache_manager.invalidate("user:123").await?;
1222    /// # Ok(())
1223    /// # }
1224    /// ```
1225    /// # Errors
1226    ///
1227    /// Returns an error if invalidation fails.
1228    pub async fn invalidate(&self, key: &str) -> Result<()> {
1229        // Remove from ALL tiers
1230        for tier in &self.tiers {
1231            if let Err(e) = tier.remove(key).await {
1232                warn!(
1233                    "Failed to remove '{}' from L{}: {}",
1234                    key, tier.tier_level, e
1235                );
1236            }
1237        }
1238
1239        // Broadcast to other instances
1240        if let Some(publisher) = &self.invalidation_publisher {
1241            let mut pub_lock = publisher.lock().await;
1242            let msg = InvalidationMessage::remove(key);
1243            pub_lock.publish(&msg).await?;
1244            self.invalidation_stats
1245                .messages_sent
1246                .fetch_add(1, Ordering::Relaxed);
1247        }
1248
1249        debug!("Invalidated '{}' across all instances", key);
1250        Ok(())
1251    }
1252
1253    /// Update cache value across all instances
1254    ///
1255    /// This updates the key in all cache tiers and broadcasts
1256    /// the update to all other cache instances, avoiding cache misses.
1257    ///
1258    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1259    ///
1260    /// # Arguments
1261    /// * `key` - Cache key to update
1262    /// * `value` - New value
1263    /// * `ttl` - Optional TTL (uses default if None)
1264    ///
1265    /// # Example
1266    /// ```rust,no_run
1267    /// # use multi_tier_cache::CacheManager;
1268    /// # use std::time::Duration;
1269    /// # use bytes::Bytes;
1270    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1271    /// // Update user cache with new data
1272    /// let user_data = Bytes::from("alice");
1273    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1274    /// # Ok(())
1275    /// # }
1276    /// ```
1277    /// # Errors
1278    ///
1279    /// Returns an error if cache update fails.
1280    pub async fn update_cache(&self, key: &str, value: Bytes, ttl: Option<Duration>) -> Result<()> {
1281        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1282
1283        // Update ALL tiers
1284        for tier in &self.tiers {
1285            if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1286                warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1287            }
1288        }
1289
1290        // Broadcast update to other instances
1291        if let Some(publisher) = &self.invalidation_publisher {
1292            let mut pub_lock = publisher.lock().await;
1293            let msg = InvalidationMessage::update(key, value, Some(ttl));
1294            pub_lock.publish(&msg).await?;
1295            self.invalidation_stats
1296                .messages_sent
1297                .fetch_add(1, Ordering::Relaxed);
1298        }
1299
1300        debug!("Updated '{}' across all instances", key);
1301        Ok(())
1302    }
1303
1304    /// Invalidate all keys matching a pattern
1305    ///
1306    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1307    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1308    ///
1309    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1310    ///
1311    /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1312    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1313    ///
1314    /// # Arguments
1315    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1316    ///
1317    /// # Example
1318    /// ```rust,no_run
1319    /// # use multi_tier_cache::CacheManager;
1320    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1321    /// // Invalidate all user caches
1322    /// cache_manager.invalidate_pattern("user:*").await?;
1323    ///
1324    /// // Invalidate specific user's related caches
1325    /// cache_manager.invalidate_pattern("user:123:*").await?;
1326    /// # Ok(())
1327    /// # }
1328    /// ```
1329    /// # Errors
1330    ///
1331    /// Returns an error if invalidation fails.
1332    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1333        debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1334
1335        // 1. Invalidate in all configured tiers
1336        for tier in &self.tiers {
1337            debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1338            tier.backend.remove_pattern(pattern).await?;
1339        }
1340
1341        // 2. Broadcast invalidation if publisher is configured
1342        if let Some(publisher) = &self.invalidation_publisher {
1343            let msg = InvalidationMessage::remove_pattern(pattern);
1344            publisher.lock().await.publish(&msg).await?;
1345            debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1346        }
1347
1348        Ok(())
1349    }
1350
1351    /// Set value with automatic broadcast to all instances
1352    ///
1353    /// This is a write-through operation that updates the cache and
1354    /// broadcasts the update to all other instances automatically.
1355    ///
1356    /// # Arguments
1357    /// * `key` - Cache key
1358    /// * `value` - Value to cache
1359    /// * `strategy` - Cache strategy (determines TTL)
1360    ///
1361    /// # Example
1362    /// ```rust,no_run
1363    /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1364    /// # use bytes::Bytes;
1365    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1366    /// // Update and broadcast in one call
1367    /// let data = Bytes::from("active");
1368    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1369    /// # Ok(())
1370    /// # }
1371    /// ```
1372    /// # Errors
1373    ///
1374    /// Returns an error if cache set or broadcast fails.
1375    pub async fn set_with_broadcast(
1376        &self,
1377        key: &str,
1378        value: Bytes,
1379        strategy: CacheStrategy,
1380    ) -> Result<()> {
1381        let ttl = strategy.to_duration();
1382
1383        // Set in local caches
1384        self.set_with_strategy(key, value.clone(), strategy).await?;
1385
1386        // Broadcast update if invalidation is enabled
1387        if let Some(publisher) = &self.invalidation_publisher {
1388            let mut pub_lock = publisher.lock().await;
1389            let msg = InvalidationMessage::update(key, value, Some(ttl));
1390            pub_lock.publish(&msg).await?;
1391            self.invalidation_stats
1392                .messages_sent
1393                .fetch_add(1, Ordering::Relaxed);
1394        }
1395
1396        Ok(())
1397    }
1398
1399    /// Get invalidation statistics
1400    ///
1401    /// Returns statistics about invalidation operations if invalidation is enabled.
1402    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1403        if self.invalidation_subscriber.is_some() {
1404            Some(self.invalidation_stats.snapshot())
1405        } else {
1406            None
1407        }
1408    }
1409}
1410
/// Point-in-time snapshot of the cache manager's counters, as produced by
/// `CacheManager::get_stats`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Value of the manager's request counter.
    pub total_requests: u64,
    /// Value of the manager's L1 hit counter.
    pub l1_hits: u64,
    /// Value of the manager's L2 hit counter.
    pub l2_hits: u64,
    /// Sum of `l1_hits` and `l2_hits`.
    pub total_hits: u64,
    /// Value of the manager's miss counter.
    pub misses: u64,
    /// `total_hits` as a percentage of `total_requests` (`0.0` with no requests).
    pub hit_rate: f64,
    /// `l1_hits` as a percentage of `total_requests` (`0.0` with no requests).
    pub l1_hit_rate: f64,
    /// Value of the manager's promotion counter.
    pub promotions: usize,
    /// Number of entries in the in-flight request map when the snapshot was taken.
    pub in_flight_requests: usize,
}