// multi_tier_cache/cache_manager.rs

//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
5use crate::error::CacheResult;
6use dashmap::DashMap;
7use rand::Rng;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12#[cfg(feature = "redis")]
13#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
14use tokio::sync::Mutex;
15use tokio::sync::broadcast;
16use tracing::{debug, error, info, warn};
17
18#[cfg(feature = "moka")]
19#[cfg_attr(docsrs, doc(cfg(feature = "moka")))]
20use crate::L1Cache;
21#[cfg(feature = "redis")]
22#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
23use crate::L2Cache;
24#[cfg(feature = "redis")]
25#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
26use crate::invalidation::{
27    AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
28    InvalidationSubscriber,
29};
30use crate::serialization::{CacheSerializer, JsonSerializer};
31use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
32use bytes::Bytes;
33use futures_util::future::BoxFuture;
34
/// Type alias for the in-flight requests map.
///
/// Stores a broadcast sender for each key whose value is currently being
/// computed/fetched, so concurrent lookups of the same missing key can be
/// coalesced (cache-stampede protection).
type InFlightMap = DashMap<String, broadcast::Sender<CacheResult<Bytes>>>;
38
/// Cache strategies for different data types.
///
/// A strategy maps to a base TTL via [`CacheStrategy::to_duration`]; tiers may
/// additionally scale that TTL (see `CacheTier::ttl_scale`).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL supplied by the caller
    Custom(Duration),
    /// Default strategy (same as `ShortTerm`: 5 minutes)
    Default,
}
56
57impl CacheStrategy {
58    /// Convert strategy to duration
59    #[must_use]
60    pub fn to_duration(&self) -> Duration {
61        match self {
62            Self::RealTime => Duration::from_secs(10),
63            Self::ShortTerm | Self::Default => Duration::from_secs(300), // 5 minutes
64            Self::MediumTerm => Duration::from_secs(3600),               // 1 hour
65            Self::LongTerm => Duration::from_secs(10800),                // 3 hours
66            Self::Custom(duration) => *duration,
67        }
68    }
69}
70
/// Statistics for a single cache tier.
///
/// `hits` is an atomic so it can be updated from `&self` on the hot read path
/// without locking.
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}
81
82impl Clone for TierStats {
83    fn clone(&self) -> Self {
84        Self {
85            tier_level: self.tier_level,
86            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
87            backend_name: self.backend_name.clone(),
88        }
89    }
90}
91
92impl TierStats {
93    fn new(tier_level: usize, backend_name: String) -> Self {
94        Self {
95            tier_level,
96            hits: AtomicU64::new(0),
97            backend_name,
98        }
99    }
100
101    /// Get current hit count
102    pub fn hit_count(&self) -> u64 {
103        self.hits.load(Ordering::Relaxed)
104    }
105}
106
/// A single cache tier in the multi-tier architecture.
///
/// Tiers are checked in ascending `tier_level` order on reads; hits in lower
/// tiers may be promoted upward (see `promotion_enabled` / `promotion_frequency`).
#[derive(Clone)]
pub struct CacheTier {
    /// The backend for this tier
    pub backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 for fastest, increases for slower/cheaper tiers)
    pub tier_level: usize,
    /// Whether to promote keys FROM lower tiers TO this tier
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability
    pub promotion_frequency: usize,
    /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
    pub ttl_scale: f64,
    /// Statistics for this tier
    pub stats: TierStats,
}
123
124impl CacheTier {
125    /// Create a new cache tier
126    pub fn new(
127        backend: Arc<dyn L2CacheBackend>,
128        tier_level: usize,
129        promotion_enabled: bool,
130        promotion_frequency: usize,
131        ttl_scale: f64,
132    ) -> Self {
133        let backend_name = backend.name().to_string();
134        Self {
135            backend,
136            tier_level,
137            promotion_enabled,
138            promotion_frequency,
139            ttl_scale,
140            stats: TierStats::new(tier_level, backend_name),
141        }
142    }
143
144    /// Get value with TTL from this tier
145    async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
146        self.backend.get_with_ttl(key).await
147    }
148
149    /// Set value with TTL in this tier
150    async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> CacheResult<()> {
151        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
152        self.backend.set_with_ttl(key, value, scaled_ttl).await
153    }
154
155    /// Remove value from this tier
156    async fn remove(&self, key: &str) -> CacheResult<()> {
157        self.backend.remove(key).await
158    }
159
160    /// Record a cache hit for this tier
161    fn record_hit(&self) {
162        self.stats.hits.fetch_add(1, Ordering::Relaxed);
163    }
164}
165
/// Configuration for a cache tier (used in builder pattern).
///
/// Describes how one tier behaves; backends are attached separately when the
/// tier is constructed.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability (default 10)
    pub promotion_frequency: usize,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}
178
179impl TierConfig {
180    /// Create new tier configuration
181    #[must_use]
182    pub fn new(tier_level: usize) -> Self {
183        Self {
184            tier_level,
185            promotion_enabled: true,
186            promotion_frequency: 10,
187            ttl_scale: 1.0,
188        }
189    }
190
191    /// Configure as L1 (hot tier)
192    #[must_use]
193    pub fn as_l1() -> Self {
194        Self {
195            tier_level: 1,
196            promotion_enabled: false, // L1 is already top tier
197            promotion_frequency: 1,   // Doesn't matter but use 1
198            ttl_scale: 1.0,
199        }
200    }
201
202    /// Configure as L2 (warm tier)
203    #[must_use]
204    pub fn as_l2() -> Self {
205        Self {
206            tier_level: 2,
207            promotion_enabled: true,
208            promotion_frequency: 10,
209            ttl_scale: 1.0,
210        }
211    }
212
213    /// Configure as L3 (cold tier) with longer TTL
214    #[must_use]
215    pub fn as_l3() -> Self {
216        Self {
217            tier_level: 3,
218            promotion_enabled: true,
219            promotion_frequency: 10,
220            ttl_scale: 2.0, // Keep data 2x longer
221        }
222    }
223
224    /// Configure as L4 (archive tier) with much longer TTL
225    #[must_use]
226    pub fn as_l4() -> Self {
227        Self {
228            tier_level: 4,
229            promotion_enabled: true,
230            promotion_frequency: 10,
231            ttl_scale: 8.0, // Keep data 8x longer
232        }
233    }
234
235    /// Set promotion enabled
236    #[must_use]
237    pub fn with_promotion(mut self, enabled: bool) -> Self {
238        self.promotion_enabled = enabled;
239        self
240    }
241
242    /// Set promotion frequency (N)
243    #[must_use]
244    pub fn with_promotion_frequency(mut self, n: usize) -> Self {
245        self.promotion_frequency = n;
246        self
247    }
248
249    /// Set TTL scale factor
250    #[must_use]
251    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
252        self.ttl_scale = scale;
253        self
254    }
255
256    /// Set tier level
257    #[must_use]
258    pub fn with_level(mut self, level: usize) -> Self {
259        self.tier_level = level;
260        self
261    }
262}
263
/// Unified multi-tier cache manager.
///
/// Owns an ordered list of cache tiers (fastest first) and coordinates reads,
/// writes, promotion between tiers, stampede protection, and (with the
/// `redis` feature) cross-instance invalidation.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...)
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    // Legacy aggregate statistics, kept for backward compatibility with the
    // original 2-tier API (per-tier stats live in TierStats).
    total_requests: AtomicU64,
    l1_hits: AtomicU64,
    l2_hits: AtomicU64,
    misses: AtomicU64,
    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
    in_flight_requests: Arc<InFlightMap>,
    /// Pluggable serializer
    serializer: Arc<CacheSerializer>,
    /// Invalidation publisher
    #[cfg(feature = "redis")]
    #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber
    #[cfg(feature = "redis")]
    #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    #[cfg(feature = "redis")]
    #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
    invalidation_stats: Arc<AtomicInvalidationStats>,
    /// Number of promotions performed
    promotions: AtomicUsize,
}
294
295impl CacheManager {
296    /// Create new cache manager with trait objects (pluggable backends)
297    ///
298    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
299    ///
300    /// # Arguments
301    ///
302    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
303    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
304    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
305    ///
306    /// # Example
307    ///
308    /// ```rust,ignore
309    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
310    /// use std::sync::Arc;
311    ///
312    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
313    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
314    ///
315    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
316    /// ```
317    /// # Errors
318    ///
319    /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
320    pub fn new_with_backends(
321        l1_cache: Arc<dyn CacheBackend>,
322        l2_cache: Arc<dyn L2CacheBackend>,
323        streaming_backend: Option<Arc<dyn StreamingBackend>>,
324    ) -> CacheResult<Self> {
325        debug!("Initializing Cache Manager with L1+L2 backends...");
326
327        let tiers = vec![
328            CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
329            CacheTier::new(l2_cache, 2, true, 10, 1.0),
330        ];
331
332        Ok(Self {
333            tiers,
334            streaming_backend,
335            total_requests: AtomicU64::new(0),
336            l1_hits: AtomicU64::new(0),
337            l2_hits: AtomicU64::new(0),
338            misses: AtomicU64::new(0),
339            promotions: AtomicUsize::new(0),
340            in_flight_requests: Arc::new(DashMap::new()),
341            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
342            #[cfg(feature = "redis")]
343            invalidation_publisher: None,
344            #[cfg(feature = "redis")]
345            invalidation_subscriber: None,
346            #[cfg(feature = "redis")]
347            #[cfg(feature = "redis")]
348            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
349        })
350    }
351
    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// # Errors
    ///
    /// Returns an error if Redis connection fails.
    #[cfg(all(feature = "moka", feature = "redis"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "moka", feature = "redis"))))]
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> CacheResult<Self> {
        debug!("Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality.
        // The Redis URL comes from REDIS_URL, defaulting to a local instance.
        let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
            let redis_url =
                std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
            let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
            Some(Arc::new(redis_streams))
        };

        Self::new_with_backends(l1_backend, l2_backend, streaming_backend)
    }
383
    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    /// # Errors
    ///
    /// Returns an error if Redis connection fails or invalidation setup fails.
    #[cfg(all(feature = "moka", feature = "redis"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "moka", feature = "redis"))))]
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> CacheResult<Self> {
        debug!("Initializing Cache Manager with Invalidation...");
        debug!("  Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
            let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
            Some(Arc::new(redis_streams))
        };

        // Create publisher and subscriber sharing the same channel config
        let (invalidation_publisher, invalidation_subscriber) = {
            let client = redis::Client::open(redis_url)?;
            let conn_manager = redis::aio::ConnectionManager::new(client).await?;
            let publisher = InvalidationPublisher::new(conn_manager, config.clone());

            // Create subscriber
            let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
            (
                Some(Arc::new(Mutex::new(publisher))),
                Some(Arc::new(subscriber)),
            )
        };

        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        // L1 wrapped so it satisfies the L2CacheBackend trait CacheTier expects
        let tiers = vec![
            CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
            CacheTier::new(l2_backend, 2, true, 10, 1.0),
        ];

        let manager = Self {
            tiers,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
            invalidation_publisher,
            invalidation_subscriber,
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        info!("Cache Manager initialized with invalidation support");

        Ok(manager)
    }
476
477    /// Create new cache manager with multi-tier architecture (v0.5.0+)
478    ///
479    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
480    /// Tiers are checked in order (lower `tier_level` = faster/hotter).
481    ///
482    /// # Arguments
483    ///
484    /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
485    /// * `streaming_backend` - Optional streaming backend
486    ///
487    /// # Example
488    ///
489    /// ```rust,ignore
490    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
491    /// use std::sync::Arc;
492    ///
493    /// // L1 + L2 + L3 setup
494    /// let l1 = Arc::new(L1Cache::new()?);
495    /// let l2 = Arc::new(L2Cache::new().await?);
496    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
497    ///
498    /// let tiers = vec![
499    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
500    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
501    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
502    /// ];
503    ///
504    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
505    /// ```
506    /// # Errors
507    ///
508    /// Returns an error if tiers are not sorted by level or if no tiers are provided.
509    pub fn new_with_tiers(
510        tiers: Vec<CacheTier>,
511        streaming_backend: Option<Arc<dyn StreamingBackend>>,
512    ) -> CacheResult<Self> {
513        debug!("Initializing Multi-Tier Cache Manager...");
514        debug!("  Tier count: {}", tiers.len());
515        for tier in &tiers {
516            debug!(
517                "  L{}: {} (promotion={}, ttl_scale={})",
518                tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
519            );
520        }
521
522        // Validate tiers are sorted by level
523        for i in 1..tiers.len() {
524            if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
525                && current.tier_level <= prev.tier_level
526            {
527                return Err(crate::error::CacheError::ConfigError(format!(
528                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
529                    current.tier_level, prev.tier_level
530                )));
531            }
532        }
533
534        Ok(Self {
535            tiers,
536            streaming_backend,
537            total_requests: AtomicU64::new(0),
538            l1_hits: AtomicU64::new(0),
539            l2_hits: AtomicU64::new(0),
540            misses: AtomicU64::new(0),
541            promotions: AtomicUsize::new(0),
542            in_flight_requests: Arc::new(DashMap::new()),
543            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
544            #[cfg(feature = "redis")]
545            invalidation_publisher: None,
546            #[cfg(feature = "redis")]
547            invalidation_subscriber: None,
548            #[cfg(feature = "redis")]
549            #[cfg(feature = "redis")]
550            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
551        })
552    }
553
554    /// Set a custom serializer for the cache manager
555    pub fn set_serializer(&mut self, serializer: CacheSerializer) {
556        debug!(name = %serializer.name(), "Switching cache serializer");
557        self.serializer = Arc::new(serializer);
558    }
559
560    /// Start the invalidation subscriber background task
561    #[cfg(feature = "redis")]
562    #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
563    fn start_invalidation_subscriber(&self) {
564        #[cfg(feature = "redis")]
565        if let Some(subscriber) = &self.invalidation_subscriber {
566            let tiers = self.tiers.clone();
567
568            subscriber.start(move |msg: crate::invalidation::InvalidationMessage| {
569                let tiers = tiers.clone();
570                async move {
571                    for tier in &tiers {
572                        match &msg {
573                            InvalidationMessage::Remove { key } => {
574                                tier.backend.remove(key).await.ok();
575                            }
576                            InvalidationMessage::Update {
577                                key,
578                                value,
579                                ttl_secs,
580                            } => {
581                                if let Some(secs) = ttl_secs {
582                                    tier.backend
583                                        .set_with_ttl(
584                                            key,
585                                            value.clone(),
586                                            Duration::from_secs(*secs),
587                                        )
588                                        .await
589                                        .ok();
590                                } else {
591                                    tier.backend.set(key, value.clone()).await.ok();
592                                }
593                            }
594                            InvalidationMessage::RemovePattern { pattern } => {
595                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
596                                    warn!(
597                                        "Failed to remove pattern '{}' from L{}: {}",
598                                        pattern, tier.tier_level, e
599                                    );
600                                }
601                            }
602                            InvalidationMessage::RemoveBulk { keys } => {
603                                for key in keys {
604                                    if let Err(e) = tier.backend.remove(key).await {
605                                        warn!(
606                                            "Failed to remove '{}' from L{}: {}",
607                                            key, tier.tier_level, e
608                                        );
609                                    }
610                                }
611                            }
612                        }
613                    }
614                    Ok(())
615                }
616            });
617
618            info!("Invalidation subscriber started across all tiers");
619        }
620    }
621
    /// Get value from cache using multi-tier architecture (v0.5.0+)
    ///
    /// This method iterates through all configured tiers and automatically promotes
    /// to upper tiers on cache hit.
    async fn get_multi_tier(&self, key: &str) -> CacheResult<Option<Bytes>> {
        // Try each tier sequentially (sorted by tier_level, fastest first)
        for (tier_index, tier) in self.tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                // Cache hit!
                tier.record_hit();

                // Promote to all upper tiers (if promotion enabled)
                if tier.promotion_enabled && tier_index > 0 {
                    // Probabilistic promotion: promote with probability 1/N
                    // (N = promotion_frequency); N <= 1 means always promote.
                    let should_promote = if tier.promotion_frequency <= 1 {
                        true
                    } else {
                        rand::thread_rng().gen_ratio(
                            1,
                            u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
                        )
                    };

                    if should_promote {
                        // If the backend reports no remaining TTL, fall back to
                        // the default strategy's TTL for the promoted copies.
                        let promotion_ttl =
                            ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                        // Promote to all tiers above this one, nearest tier
                        // first (reverse iteration over the faster tiers).
                        // Failures are logged but do not fail the read.
                        for upper_tier in self.tiers.iter().take(tier_index).rev() {
                            if let Err(e) = upper_tier
                                .set_with_ttl(key, value.clone(), promotion_ttl)
                                .await
                            {
                                warn!(
                                    "Failed to promote '{}' from L{} to L{}: {}",
                                    key, tier.tier_level, upper_tier.tier_level, e
                                );
                            } else {
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    "Promoted '{}' from L{} to L{} (TTL: {:?})",
                                    key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                                );
                            }
                        }
                    } else {
                        debug!(
                            "Probabilistic skip promotion for '{}' from L{} (N={})",
                            key, tier.tier_level, tier.promotion_frequency
                        );
                    }
                }

                return Ok(Some(value));
            }
        }

        // Cache miss across all tiers
        Ok(None)
    }
682
    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method now includes built-in Cache Stampede protection when cache misses occur.
    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
    /// unnecessary duplicate work on external data sources.
    ///
    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in any tier
    /// * `Ok(None)` - Cache miss, value not found in any cache
    /// * `Err(error)` - Cache operation failed
    /// # Errors
    ///
    /// Returns an error if cache operation fails.
    ///
    /// # Panics
    ///
    /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
    pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // Fast path for L1 (first tier) - no locking needed
        if let Some(tier1) = self.tiers.first()
            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
        {
            tier1.record_hit();
            // Update legacy stats for backward compatibility
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - use stampede protection for lower tiers.
        // Each missing key gets a broadcast channel; the first caller performs
        // the lookup and broadcasts the result to later subscribers.
        let key_owned = key.to_string();
        let lock_guard = self
            .in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| broadcast::Sender::new(1))
            .clone();

        let mut rx = lock_guard.subscribe();

        // If there are other receivers, someone else is computing, wait for it.
        // NOTE(review): on a total cache miss the computing caller never sends
        // on this channel (see the else branch below), and each waiter holds
        // its own Sender clone (`lock_guard`), so the channel cannot close; a
        // waiter parked in recv() here could block indefinitely — verify.
        if lock_guard.receiver_count() > 1 {
            match rx.recv().await {
                Ok(Ok(value)) => return Ok(Some(value)),
                Ok(Err(e)) => {
                    return Err(crate::error::CacheError::BackendError(format!(
                        "Computation failed in another thread: {e}"
                    )));
                }
                Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
            }
        }

        // Double-check L1 after acquiring lock (or if we are the first to compute)
        if let Some(tier1) = self.tiers.first()
            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
        {
            tier1.record_hit();
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
            return Ok(Some(value));
        }

        // Check remaining tiers with promotion
        let result = self.get_multi_tier(key).await?;

        if let Some(val) = result.clone() {
            // Hit in L2+ tier - update legacy stats
            if self.tiers.len() >= 2 {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);
            }

            // Notify any waiting subscribers
            let _ = lock_guard.send(Ok(val.clone()));

            // Remove the in-flight entry after computation/retrieval
            self.in_flight_requests.remove(key);

            Ok(Some(val))
        } else {
            self.misses.fetch_add(1, Ordering::Relaxed);

            // Remove the in-flight entry after computation/retrieval.
            // NOTE(review): nothing is broadcast on this miss path, so any
            // concurrent waiters are not woken — see the note above.
            self.in_flight_requests.remove(key);

            Ok(None)
        }
    }
776
777    /// Get a value from cache and deserialize it (Type-Safe Version)
778    ///
779    /// # Errors
780    ///
781    /// Returns a `SerializationError` if deserialization fails, or a `BackendError` if the cache retrieval fails.
782    pub async fn get_typed<T>(&self, key: &str) -> CacheResult<Option<T>>
783    where
784        T: serde::de::DeserializeOwned,
785    {
786        if let Some(bytes) = self.get(key).await? {
787            return Ok(Some(self.serializer.deserialize::<T>(&bytes)?));
788        }
789        Ok(None)
790    }
791
792    /// Set value with specific cache strategy (all tiers)
793    ///
794    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
795    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
796    /// # Errors
797    ///
798    /// Returns an error if cache set operation fails.
799    pub async fn set_with_strategy(
800        &self,
801        key: &str,
802        value: Bytes,
803        strategy: CacheStrategy,
804    ) -> CacheResult<()> {
805        let ttl = strategy.to_duration();
806
807        let mut success_count = 0;
808        let mut last_error = None;
809
810        for tier in &self.tiers {
811            match tier.set_with_ttl(key, value.clone(), ttl).await {
812                Ok(()) => {
813                    success_count += 1;
814                }
815                Err(e) => {
816                    error!(
817                        "L{} cache set failed for key '{}': {}",
818                        tier.tier_level, key, e
819                    );
820                    last_error = Some(e);
821                }
822            }
823        }
824
825        if success_count > 0 {
826            debug!(
827                "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
828                key,
829                success_count,
830                self.tiers.len(),
831                ttl
832            );
833            return Ok(());
834        }
835
836        Err(last_error.unwrap_or_else(|| {
837            crate::error::CacheError::InternalError("All tiers failed".to_string())
838        }))
839    }
840
841    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
842    ///
843    /// This method provides comprehensive Cache Stampede protection:
844    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
845    /// 2. Check L2 cache with mutex-based coalescing
846    /// 3. Compute fresh data with protection against concurrent computations
847    ///
848    /// # Arguments
849    /// * `key` - Cache key
850    /// * `strategy` - Cache strategy for TTL and storage behavior
851    /// * `compute_fn` - Async function to compute the value if not in any cache
852    ///
853    /// # Example
854    /// ```ignore
855    /// let api_data = cache_manager.get_or_compute_with(
856    ///     "api_response",
857    ///     CacheStrategy::RealTime,
858    ///     || async {
859    ///         fetch_data_from_api().await
860    ///     }
861    /// ).await?;
862    /// ```
863    #[allow(dead_code)]
864    /// # Errors
865    ///
866    /// Returns an error if compute function fails or cache operations fail.
867    pub async fn get_or_compute_with<F, Fut>(
868        &self,
869        key: &str,
870        strategy: CacheStrategy,
871        compute_fn: F,
872    ) -> CacheResult<Bytes>
873    where
874        F: FnOnce() -> Fut + Send,
875        Fut: Future<Output = CacheResult<Bytes>> + Send,
876    {
877        self.total_requests.fetch_add(1, Ordering::Relaxed);
878
879        // 1. Try tiers sequentially first
880        for (idx, tier) in self.tiers.iter().enumerate() {
881            if let Some((value, _ttl)) = tier.get_with_ttl(key).await {
882                tier.record_hit();
883                if tier.tier_level == 1 {
884                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
885                } else if tier.tier_level == 2 {
886                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
887                }
888
889                // Promotion to L1 if hit was in a lower tier
890                if idx > 0 && tier.promotion_enabled {
891                    // Probabilistic Promotion Check
892                    let should_promote = if tier.promotion_frequency <= 1 {
893                        true
894                    } else {
895                        rand::thread_rng().gen_ratio(
896                            1,
897                            u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
898                        )
899                    };
900
901                    if should_promote && let Some(l1_tier) = self.tiers.first() {
902                        let _ = l1_tier
903                            .set_with_ttl(key, value.clone(), strategy.to_duration())
904                            .await;
905                        self.promotions.fetch_add(1, Ordering::Relaxed);
906                    }
907                }
908                return Ok(value);
909            }
910        }
911
912        // 2. Cache miss across all tiers - use stampede protection
913        let (tx, mut rx): (
914            tokio::sync::broadcast::Sender<CacheResult<Bytes>>,
915            tokio::sync::broadcast::Receiver<CacheResult<Bytes>>,
916        ) = match self.in_flight_requests.entry(key.to_string()) {
917            dashmap::mapref::entry::Entry::Occupied(entry) => {
918                let tx = entry.get().clone();
919                (tx.clone(), tx.subscribe())
920            }
921            dashmap::mapref::entry::Entry::Vacant(entry) => {
922                let (tx, _) = tokio::sync::broadcast::channel(1);
923                entry.insert(tx.clone());
924                (tx.clone(), tx.subscribe())
925            }
926        };
927
928        if tx.receiver_count() > 1 {
929            // Someone else is computing, wait for it
930            match rx.recv().await {
931                Ok(Ok(value)) => return Ok(value),
932                Ok(Err(e)) => {
933                    return Err(crate::error::CacheError::BackendError(format!(
934                        "Computation failed in another thread: {e}"
935                    )));
936                }
937                Err(_) => {} // Fall through to re-compute
938            }
939        }
940
941        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
942        for tier in &self.tiers {
943            if let Some((value, _)) = tier.get_with_ttl(key).await {
944                let _ = tx.send(Ok(value.clone()));
945                return Ok(value);
946            }
947        }
948
949        // 4. Miss - compute fresh data
950        debug!(
951            "Computing fresh data for key: '{}' (Stampede protected)",
952            key
953        );
954
955        let result = compute_fn().await;
956
957        // Remove from in_flight BEFORE broadcasting
958        self.in_flight_requests.remove(key);
959
960        match &result {
961            Ok(value) => {
962                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
963                let _ = tx.send(Ok(value.clone()));
964            }
965            Err(e) => {
966                let _ = tx.send(Err(e.clone()));
967            }
968        }
969
970        result
971    }
972
973    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
974    ///
975    /// This method provides the same functionality as `get_or_compute_with()` but with
976    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
977    /// API calls, or any computation that returns structured data.
978    ///
979    /// # Type Safety
980    ///
981    /// - Returns your actual type `T` instead of `serde_json::Value`
982    /// - Compiler enforces Serialize + `DeserializeOwned` bounds
983    /// - No manual JSON conversion needed
984    ///
985    /// # Cache Flow
986    ///
987    /// 1. Check L1 cache → deserialize if found
988    /// 2. Check L2 cache → deserialize + promote to L1 if found
989    /// 3. Execute `compute_fn` → serialize → store in L1+L2
990    /// 4. Full stampede protection (only ONE request computes)
991    ///
992    /// # Arguments
993    ///
994    /// * `key` - Cache key
995    /// * `strategy` - Cache strategy for TTL
996    /// * `compute_fn` - Async function returning `Result<T>`
997    ///
998    /// # Example - Database Query
999    ///
1000    /// ```no_run
1001    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1002    /// # use std::sync::Arc;
1003    /// # use serde::{Serialize, Deserialize};
1004    /// # async fn example() -> anyhow::Result<()> {
1005    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1006    /// # let l2 = Arc::new(L2Cache::new().await?);
1007    /// # let cache_manager = CacheManager::new(l1, l2);
1008    ///
1009    /// #[derive(Serialize, Deserialize)]
1010    /// struct User {
1011    ///     id: i64,
1012    ///     name: String,
1013    /// }
1014    ///
1015    /// // Type-safe database caching (example - requires sqlx)
1016    /// // let user: User = cache_manager.get_or_compute_typed(
1017    /// //     "user:123",
1018    /// //     CacheStrategy::MediumTerm,
1019    /// //     || async {
1020    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1021    /// //             .bind(123)
1022    /// //             .fetch_one(&pool)
1023    /// //             .await
1024    /// //     }
1025    /// // ).await?;
1026    /// # Ok(())
1027    /// # }
1028    /// ```
1029    ///
1030    /// # Example - API Call
1031    ///
1032    /// ```no_run
1033    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1034    /// # use std::sync::Arc;
1035    /// # use serde::{Serialize, Deserialize};
1036    /// # async fn example() -> anyhow::Result<()> {
1037    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1038    /// # let l2 = Arc::new(L2Cache::new().await?);
1039    /// # let cache_manager = CacheManager::new(l1, l2);
1040    /// #[derive(Serialize, Deserialize)]
1041    /// struct ApiResponse {
1042    ///     data: String,
1043    ///     timestamp: i64,
1044    /// }
1045    ///
1046    /// // API call caching (example - requires reqwest)
1047    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1048    /// //     "api:endpoint",
1049    /// //     CacheStrategy::RealTime,
1050    /// //     || async {
1051    /// //         reqwest::get("https://api.example.com/data")
1052    /// //             .await?
1053    /// //             .json::<ApiResponse>()
1054    /// //             .await
1055    /// //     }
1056    /// // ).await?;
1057    /// # Ok(())
1058    /// # }
1059    /// ```
1060    ///
1061    /// # Performance
1062    ///
1063    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1064    /// - L2 hit: 2-5ms + deserialization + L1 promotion
1065    /// - Compute: Your function time + serialization + L1+L2 storage
1066    /// - Stampede protection: 99.6% latency reduction under high concurrency
1067    ///
1068    /// # Errors
1069    ///
1070    /// Returns error if:
1071    /// - Compute function fails
1072    /// - Serialization fails (invalid type for JSON)
1073    /// - Deserialization fails (cache data doesn't match type T)
1074    /// - Cache operations fail (Redis connection issues)
1075    #[allow(clippy::too_many_lines)]
1076    pub async fn get_or_compute_typed<T, F, Fut>(
1077        &self,
1078        key: &str,
1079        strategy: CacheStrategy,
1080        compute_fn: F,
1081    ) -> CacheResult<T>
1082    where
1083        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1084        F: FnOnce() -> Fut + Send,
1085        Fut: Future<Output = CacheResult<T>> + Send,
1086    {
1087        // 1. Try to get typed from cache first
1088        if let Some(value) = self.get_typed::<T>(key).await? {
1089            return Ok(value);
1090        }
1091
1092        // 2. Use get_or_compute_with to handle stampede protection
1093        let serializer = self.serializer.clone();
1094        let bytes_result = self
1095            .get_or_compute_with(key, strategy, || async move {
1096                let val = compute_fn().await?;
1097                serializer.serialize(&val)
1098            })
1099            .await?;
1100
1101        // 3. Deserialize result
1102        self.serializer.deserialize::<T>(&bytes_result)
1103    }
1104
1105    /// Get comprehensive cache statistics
1106    ///
1107    /// In multi-tier mode, aggregates statistics from all tiers.
1108    /// In legacy mode, returns L1 and L2 stats.
1109    #[allow(dead_code)]
1110    pub fn get_stats(&self) -> CacheManagerStats {
1111        let total_reqs = self.total_requests.load(Ordering::Relaxed);
1112        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1113        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1114        let misses = self.misses.load(Ordering::Relaxed);
1115
1116        CacheManagerStats {
1117            total_requests: total_reqs,
1118            l1_hits,
1119            l2_hits,
1120            total_hits: l1_hits + l2_hits,
1121            misses,
1122            hit_rate: if total_reqs > 0 {
1123                #[allow(clippy::cast_precision_loss)]
1124                {
1125                    ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1126                }
1127            } else {
1128                0.0
1129            },
1130            l1_hit_rate: if total_reqs > 0 {
1131                #[allow(clippy::cast_precision_loss)]
1132                {
1133                    (l1_hits as f64 / total_reqs as f64) * 100.0
1134                }
1135            } else {
1136                0.0
1137            },
1138            promotions: self.promotions.load(Ordering::Relaxed),
1139            in_flight_requests: self.in_flight_requests.len(),
1140        }
1141    }
1142
1143    /// Get per-tier statistics (v0.5.0+)
1144    ///
1145    /// Returns statistics for each tier if multi-tier mode is enabled.
1146    /// Returns None if using legacy 2-tier mode.
1147    ///
1148    /// # Example
1149    /// ```rust,ignore
1150    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1151    ///     for stats in tier_stats {
1152    ///         println!("L{}: {} hits ({})",
1153    ///                  stats.tier_level,
1154    ///                  stats.hit_count(),
1155    ///                  stats.backend_name);
1156    ///     }
1157    /// }
1158    /// ```
1159    pub fn get_tier_stats(&self) -> Vec<TierStats> {
1160        self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1161    }
1162}
1163
/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
///
/// Holds the wrapped backend as a shared trait object; every trait method on
/// this proxy simply delegates to the inner backend.
struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1167
impl CacheBackend for ProxyL1ToL2 {
    // Every method below is pure delegation to the wrapped `CacheBackend`
    // (`self.0`); no additional behavior is layered on top.

    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
        self.0.get(key)
    }

    fn set_with_ttl<'a>(
        &'a self,
        key: &'a str,
        value: Bytes,
        ttl: Duration,
    ) -> BoxFuture<'a, CacheResult<()>> {
        self.0.set_with_ttl(key, value, ttl)
    }

    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        self.0.remove(key)
    }

    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        self.0.remove_pattern(pattern)
    }

    fn health_check(&self) -> BoxFuture<'_, bool> {
        self.0.health_check()
    }

    fn name(&self) -> &'static str {
        // Reports the inner backend's name, not a proxy-specific one.
        self.0.name()
    }
}
1198
impl L2CacheBackend for ProxyL1ToL2 {
    fn get_with_ttl<'a>(
        &'a self,
        key: &'a str,
    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
        // The wrapped backend's `get` does not expose a per-key TTL, so the
        // remaining TTL is reported as unknown (`None`).
        Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
    }
}
1207
1208impl CacheManager {
1209    // ===== Redis Streams Methods =====
1210
1211    /// Publish data to Redis Stream
1212    ///
1213    /// # Arguments
1214    /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1215    /// * `fields` - Field-value pairs to publish
1216    /// * `maxlen` - Optional max length for stream trimming
1217    ///
1218    /// # Returns
1219    /// The entry ID generated by Redis
1220    ///
1221    /// # Errors
1222    /// Returns error if streaming backend is not configured
1223    pub async fn publish_to_stream(
1224        &self,
1225        stream_key: &str,
1226        fields: Vec<(String, String)>,
1227        maxlen: Option<usize>,
1228    ) -> CacheResult<String> {
1229        match &self.streaming_backend {
1230            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1231            None => Err(crate::error::CacheError::ConfigError(
1232                "Streaming backend not configured".to_string(),
1233            )),
1234        }
1235    }
1236
1237    /// Read latest entries from Redis Stream
1238    ///
1239    /// # Arguments
1240    /// * `stream_key` - Name of the stream
1241    /// * `count` - Number of latest entries to retrieve
1242    ///
1243    /// # Returns
1244    /// Vector of (`entry_id`, fields) tuples (newest first)
1245    ///
1246    /// # Errors
1247    /// Returns error if streaming backend is not configured
1248    pub async fn read_stream_latest(
1249        &self,
1250        stream_key: &str,
1251        count: usize,
1252    ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1253        match &self.streaming_backend {
1254            Some(backend) => backend
1255                .stream_read_latest(stream_key, count)
1256                .await,
1257            None => Err(crate::error::CacheError::ConfigError(
1258                "Streaming backend not configured".to_string(),
1259            )),
1260        }
1261    }
1262
1263    /// Read from Redis Stream with optional blocking
1264    ///
1265    /// # Arguments
1266    /// * `stream_key` - Name of the stream
1267    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1268    /// * `count` - Max entries to retrieve
1269    /// * `block_ms` - Optional blocking timeout in ms
1270    ///
1271    /// # Returns
1272    /// Vector of (`entry_id`, fields) tuples
1273    ///
1274    /// # Errors
1275    /// Returns error if streaming backend is not configured
1276    pub async fn read_stream(
1277        &self,
1278        stream_key: &str,
1279        last_id: &str,
1280        count: usize,
1281        block_ms: Option<usize>,
1282    ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1283        match &self.streaming_backend {
1284            Some(backend) => {
1285                backend
1286                    .stream_read(stream_key, last_id, count, block_ms)
1287                    .await
1288            }
1289            None => Err(crate::error::CacheError::ConfigError(
1290                "Streaming backend not configured".to_string(),
1291            )),
1292        }
1293    }
1294
1295    // ===== Cache Invalidation Methods =====
1296
1297    /// Invalidate a cache key across all instances
1298    ///
1299    /// This removes the key from all cache tiers and broadcasts
1300    /// the invalidation to all other cache instances via Redis Pub/Sub.
1301    ///
1302    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1303    ///
1304    /// # Arguments
1305    /// * `key` - Cache key to invalidate
1306    ///
1307    /// # Example
1308    /// ```rust,no_run
1309    /// # use multi_tier_cache::CacheManager;
1310    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1311    /// // Invalidate user cache after profile update
1312    /// cache_manager.invalidate("user:123").await?;
1313    /// # Ok(())
1314    /// # }
1315    /// ```
1316    /// # Errors
1317    ///
1318    /// Returns an error if invalidation fails.
1319    pub async fn invalidate(&self, key: &str) -> CacheResult<()> {
1320        // Remove from ALL tiers
1321        for tier in &self.tiers {
1322            if let Err(e) = tier.remove(key).await {
1323                warn!(
1324                    "Failed to remove '{}' from L{}: {}",
1325                    key, tier.tier_level, e
1326                );
1327            }
1328        }
1329
1330        // Broadcast to other instances
1331        #[cfg(feature = "redis")]
1332        {
1333            if let Some(publisher) = &self.invalidation_publisher {
1334                let mut pub_lock: tokio::sync::MutexGuard<
1335                    '_,
1336                    crate::invalidation::InvalidationPublisher,
1337                > = publisher.lock().await;
1338                let msg = InvalidationMessage::remove(key);
1339                pub_lock.publish(&msg).await?;
1340                self.invalidation_stats
1341                    .messages_sent
1342                    .fetch_add(1, Ordering::Relaxed);
1343            }
1344        }
1345
1346        debug!("Invalidated '{}' across all instances", key);
1347        Ok(())
1348    }
1349
1350    /// Update cache value across all instances
1351    ///
1352    /// This updates the key in all cache tiers and broadcasts
1353    /// the update to all other cache instances, avoiding cache misses.
1354    ///
1355    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1356    ///
1357    /// # Arguments
1358    /// * `key` - Cache key to update
1359    /// * `value` - New value
1360    /// * `ttl` - Optional TTL (uses default if None)
1361    ///
1362    /// # Example
1363    /// ```rust,no_run
1364    /// # use multi_tier_cache::CacheManager;
1365    /// # use std::time::Duration;
1366    /// # use bytes::Bytes;
1367    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1368    /// // Update user cache with new data
1369    /// let user_data = Bytes::from("alice");
1370    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1371    /// # Ok(())
1372    /// # }
1373    /// ```
1374    /// # Errors
1375    ///
1376    /// Returns an error if cache update fails.
1377    pub async fn update_cache(
1378        &self,
1379        key: &str,
1380        value: Bytes,
1381        ttl: Option<Duration>,
1382    ) -> CacheResult<()> {
1383        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1384
1385        // Update ALL tiers
1386        for tier in &self.tiers {
1387            if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1388                warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1389            }
1390        }
1391
1392        // Broadcast update to other instances
1393        #[cfg(feature = "redis")]
1394        if let Some(publisher) = &self.invalidation_publisher {
1395            let mut pub_lock = publisher.lock().await;
1396            let msg = InvalidationMessage::update(key, value, Some(ttl));
1397            pub_lock.publish(&msg).await?;
1398            self.invalidation_stats
1399                .messages_sent
1400                .fetch_add(1, Ordering::Relaxed);
1401        }
1402
1403        debug!("Updated '{}' across all instances", key);
1404        Ok(())
1405    }
1406
1407    /// Invalidate all keys matching a pattern
1408    ///
1409    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1410    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1411    ///
1412    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1413    ///
1414    /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1415    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1416    ///
1417    /// # Arguments
1418    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1419    ///
1420    /// # Example
1421    /// ```rust,no_run
1422    /// # use multi_tier_cache::CacheManager;
1423    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1424    /// // Invalidate all user caches
1425    /// cache_manager.invalidate_pattern("user:*").await?;
1426    ///
1427    /// // Invalidate specific user's related caches
1428    /// cache_manager.invalidate_pattern("user:123:*").await?;
1429    /// # Ok(())
1430    /// # }
1431    /// ```
1432    /// # Errors
1433    ///
1434    /// Returns an error if invalidation fails.
1435    pub async fn invalidate_pattern(&self, pattern: &str) -> CacheResult<()> {
1436        debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1437
1438        // 1. Invalidate in all configured tiers
1439        for tier in &self.tiers {
1440            debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1441            tier.backend.remove_pattern(pattern).await?;
1442        }
1443
1444        // 2. Broadcast invalidation if publisher is configured
1445        #[cfg(feature = "redis")]
1446        {
1447            if let Some(publisher) = &self.invalidation_publisher {
1448                let msg = InvalidationMessage::remove_pattern(pattern);
1449                publisher.lock().await.publish(&msg).await?;
1450                debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1451            }
1452        }
1453
1454        Ok(())
1455    }
1456
1457    /// Set value with automatic broadcast to all instances
1458    ///
1459    /// This is a write-through operation that updates the cache and
1460    /// broadcasts the update to all other instances automatically.
1461    ///
1462    /// # Arguments
1463    /// * `key` - Cache key
1464    /// * `value` - Value to cache
1465    /// * `strategy` - Cache strategy (determines TTL)
1466    ///
1467    /// # Example
1468    /// ```rust,no_run
1469    /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1470    /// # use bytes::Bytes;
1471    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1472    /// // Update and broadcast in one call
1473    /// let data = Bytes::from("active");
1474    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1475    /// # Ok(())
1476    /// # }
1477    /// ```
1478    /// # Errors
1479    ///
1480    /// Returns an error if cache set or broadcast fails.
1481    pub async fn set_with_broadcast(
1482        &self,
1483        key: &str,
1484        value: Bytes,
1485        strategy: CacheStrategy,
1486    ) -> CacheResult<()> {
1487        #[cfg(feature = "redis")] let ttl = strategy.to_duration();
1488
1489        // Set in local caches
1490        self.set_with_strategy(key, value.clone(), strategy).await?;
1491
1492        // Broadcast update if invalidation is enabled
1493        #[cfg(feature = "redis")]
1494        if let Some(publisher) = &self.invalidation_publisher {
1495            let mut pub_lock = publisher.lock().await;
1496            let msg = InvalidationMessage::update(key, value, Some(ttl));
1497            pub_lock.publish(&msg).await?;
1498            self.invalidation_stats
1499                .messages_sent
1500                .fetch_add(1, Ordering::Relaxed);
1501        }
1502
1503        Ok(())
1504    }
1505
1506    /// Get invalidation statistics
1507    ///
1508    /// Returns statistics about invalidation operations if invalidation is enabled.
1509    #[cfg(feature = "redis")]
1510    pub fn invalidation_stats(&self) -> Option<crate::invalidation::InvalidationStats> {
1511        #[cfg(feature = "redis")]
1512        {
1513            Some(self.invalidation_stats.snapshot())
1514        }
1515        #[cfg(not(feature = "redis"))]
1516        {
1517            None
1518        }
1519    }
1520}
1521
/// Cache Manager statistics
///
/// Aggregated counters and derived rates as returned by `CacheManager::get_stats()`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    // Total requests recorded by the manager
    pub total_requests: u64,
    // Requests answered by the L1 tier
    pub l1_hits: u64,
    // Requests answered by the L2 tier
    pub l2_hits: u64,
    // l1_hits + l2_hits
    pub total_hits: u64,
    // Requests not answered by any tier
    pub misses: u64,
    // Overall hit rate as a percentage of total_requests (0.0 when none)
    pub hit_rate: f64,
    // L1-only hit rate as a percentage of total_requests (0.0 when none)
    pub l1_hit_rate: f64,
    // Values promoted from a lower tier into L1
    pub promotions: usize,
    // Current size of the in-flight (stampede-protection) request map
    pub in_flight_requests: usize,
}