Skip to main content

multi_tier_cache/
cache_manager.rs

1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use crate::error::CacheResult;
6use dashmap::DashMap;
7use rand::Rng;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12#[cfg(feature = "redis")]
13use tokio::sync::Mutex;
14use tokio::sync::broadcast;
15use tracing::{debug, error, info, warn};
16
17#[cfg(feature = "moka")]
18use crate::L1Cache;
19#[cfg(feature = "redis")]
20use crate::L2Cache;
21#[cfg(feature = "redis")]
22use crate::invalidation::{
23    AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
24    InvalidationSubscriber,
25};
26use crate::serialization::{CacheSerializer, JsonSerializer};
27use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
28use bytes::Bytes;
29use futures_util::future::BoxFuture;
30
31///// Type alias for the in-flight requests map
32/// Stores a broadcast sender for each active key computation
33type InFlightMap = DashMap<String, broadcast::Sender<CacheResult<Bytes>>>;
34
35/// Cache strategies for different data types
36#[derive(Debug, Clone)]
37#[allow(dead_code)]
38pub enum CacheStrategy {
39    /// Real-time data - 10 seconds TTL
40    RealTime,
41    /// Short-term data - 5 minutes TTL  
42    ShortTerm,
43    /// Medium-term data - 1 hour TTL
44    MediumTerm,
45    /// Long-term data - 3 hours TTL
46    LongTerm,
47    /// Custom TTL
48    Custom(Duration),
49    /// Default strategy (5 minutes)
50    Default,
51}
52
53impl CacheStrategy {
54    /// Convert strategy to duration
55    #[must_use]
56    pub fn to_duration(&self) -> Duration {
57        match self {
58            Self::RealTime => Duration::from_secs(10),
59            Self::ShortTerm | Self::Default => Duration::from_secs(300), // 5 minutes
60            Self::MediumTerm => Duration::from_secs(3600),               // 1 hour
61            Self::LongTerm => Duration::from_secs(10800),                // 3 hours
62            Self::Custom(duration) => *duration,
63        }
64    }
65}
66
67/// Statistics for a single cache tier
68#[derive(Debug)]
69pub struct TierStats {
70    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
71    pub tier_level: usize,
72    /// Number of cache hits at this tier
73    pub hits: AtomicU64,
74    /// Backend name for identification
75    pub backend_name: String,
76}
77
78impl Clone for TierStats {
79    fn clone(&self) -> Self {
80        Self {
81            tier_level: self.tier_level,
82            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
83            backend_name: self.backend_name.clone(),
84        }
85    }
86}
87
88impl TierStats {
89    fn new(tier_level: usize, backend_name: String) -> Self {
90        Self {
91            tier_level,
92            hits: AtomicU64::new(0),
93            backend_name,
94        }
95    }
96
97    /// Get current hit count
98    pub fn hit_count(&self) -> u64 {
99        self.hits.load(Ordering::Relaxed)
100    }
101}
102
103/// A single cache tier in the multi-tier architecture
104#[derive(Clone)]
105pub struct CacheTier {
106    /// The backend for this tier
107    pub backend: Arc<dyn L2CacheBackend>,
108    /// Tier level (1 for fastest, increases for slower/cheaper tiers)
109    pub tier_level: usize,
110    /// Whether to promote keys FROM lower tiers TO this tier
111    pub promotion_enabled: bool,
112    /// Promotion frequency (N) - promote with 1/N probability
113    pub promotion_frequency: usize,
114    /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
115    pub ttl_scale: f64,
116    /// Statistics for this tier
117    pub stats: TierStats,
118}
119
120impl CacheTier {
121    /// Create a new cache tier
122    pub fn new(
123        backend: Arc<dyn L2CacheBackend>,
124        tier_level: usize,
125        promotion_enabled: bool,
126        promotion_frequency: usize,
127        ttl_scale: f64,
128    ) -> Self {
129        let backend_name = backend.name().to_string();
130        Self {
131            backend,
132            tier_level,
133            promotion_enabled,
134            promotion_frequency,
135            ttl_scale,
136            stats: TierStats::new(tier_level, backend_name),
137        }
138    }
139
140    /// Get value with TTL from this tier
141    async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
142        self.backend.get_with_ttl(key).await
143    }
144
145    /// Set value with TTL in this tier
146    async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> CacheResult<()> {
147        let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
148        self.backend.set_with_ttl(key, value, scaled_ttl).await
149    }
150
151    /// Remove value from this tier
152    async fn remove(&self, key: &str) -> CacheResult<()> {
153        self.backend.remove(key).await
154    }
155
156    /// Record a cache hit for this tier
157    fn record_hit(&self) {
158        self.stats.hits.fetch_add(1, Ordering::Relaxed);
159    }
160}
161
162/// Configuration for a cache tier (used in builder pattern)
163#[derive(Debug, Clone)]
164pub struct TierConfig {
165    /// Tier level (1, 2, 3, 4...)
166    pub tier_level: usize,
167    /// Enable promotion to upper tiers on hit
168    pub promotion_enabled: bool,
169    /// Promotion frequency (N) - promote with 1/N probability (default 10)
170    pub promotion_frequency: usize,
171    /// TTL scale factor (1.0 = same as base TTL)
172    pub ttl_scale: f64,
173}
174
175impl TierConfig {
176    /// Create new tier configuration
177    #[must_use]
178    pub fn new(tier_level: usize) -> Self {
179        Self {
180            tier_level,
181            promotion_enabled: true,
182            promotion_frequency: 10,
183            ttl_scale: 1.0,
184        }
185    }
186
187    /// Configure as L1 (hot tier)
188    #[must_use]
189    pub fn as_l1() -> Self {
190        Self {
191            tier_level: 1,
192            promotion_enabled: false, // L1 is already top tier
193            promotion_frequency: 1,   // Doesn't matter but use 1
194            ttl_scale: 1.0,
195        }
196    }
197
198    /// Configure as L2 (warm tier)
199    #[must_use]
200    pub fn as_l2() -> Self {
201        Self {
202            tier_level: 2,
203            promotion_enabled: true,
204            promotion_frequency: 10,
205            ttl_scale: 1.0,
206        }
207    }
208
209    /// Configure as L3 (cold tier) with longer TTL
210    #[must_use]
211    pub fn as_l3() -> Self {
212        Self {
213            tier_level: 3,
214            promotion_enabled: true,
215            promotion_frequency: 10,
216            ttl_scale: 2.0, // Keep data 2x longer
217        }
218    }
219
220    /// Configure as L4 (archive tier) with much longer TTL
221    #[must_use]
222    pub fn as_l4() -> Self {
223        Self {
224            tier_level: 4,
225            promotion_enabled: true,
226            promotion_frequency: 10,
227            ttl_scale: 8.0, // Keep data 8x longer
228        }
229    }
230
231    /// Set promotion enabled
232    #[must_use]
233    pub fn with_promotion(mut self, enabled: bool) -> Self {
234        self.promotion_enabled = enabled;
235        self
236    }
237
238    /// Set promotion frequency (N)
239    #[must_use]
240    pub fn with_promotion_frequency(mut self, n: usize) -> Self {
241        self.promotion_frequency = n;
242        self
243    }
244
245    /// Set TTL scale factor
246    #[must_use]
247    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
248        self.ttl_scale = scale;
249        self
250    }
251
252    /// Set tier level
253    #[must_use]
254    pub fn with_level(mut self, level: usize) -> Self {
255        self.tier_level = level;
256        self
257    }
258}
259
260pub struct CacheManager {
261    /// Ordered list of cache tiers (L1, L2, L3, ...)
262    tiers: Vec<CacheTier>,
263
264    /// Optional streaming backend
265    streaming_backend: Option<Arc<dyn StreamingBackend>>,
266    /// Statistics
267    total_requests: AtomicU64,
268    l1_hits: AtomicU64,
269    l2_hits: AtomicU64,
270    misses: AtomicU64,
271    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
272    in_flight_requests: Arc<InFlightMap>,
273    /// Pluggable serializer
274    serializer: Arc<CacheSerializer>,
275    /// Invalidation publisher
276    #[cfg(feature = "redis")]
277    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
278    /// Invalidation subscriber
279    #[cfg(feature = "redis")]
280    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
281    /// Invalidation statistics
282    #[cfg(feature = "redis")]
283    invalidation_stats: Arc<AtomicInvalidationStats>,
284    /// Number of promotions performed
285    promotions: AtomicUsize,
286}
287
288impl CacheManager {
289    /// Create new cache manager with trait objects (pluggable backends)
290    ///
291    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
292    ///
293    /// # Arguments
294    ///
295    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
296    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
297    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
298    ///
299    /// # Example
300    ///
301    /// ```rust,ignore
302    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
303    /// use std::sync::Arc;
304    ///
305    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
306    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
307    ///
308    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
309    /// ```
310    /// # Errors
311    ///
312    /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
313    pub fn new_with_backends(
314        l1_cache: Arc<dyn CacheBackend>,
315        l2_cache: Arc<dyn L2CacheBackend>,
316        streaming_backend: Option<Arc<dyn StreamingBackend>>,
317    ) -> CacheResult<Self> {
318        debug!("Initializing Cache Manager with L1+L2 backends...");
319
320        let tiers = vec![
321            CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
322            CacheTier::new(l2_cache, 2, true, 10, 1.0),
323        ];
324
325        Ok(Self {
326            tiers,
327            streaming_backend,
328            total_requests: AtomicU64::new(0),
329            l1_hits: AtomicU64::new(0),
330            l2_hits: AtomicU64::new(0),
331            misses: AtomicU64::new(0),
332            promotions: AtomicUsize::new(0),
333            in_flight_requests: Arc::new(DashMap::new()),
334            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
335            #[cfg(feature = "redis")]
336            invalidation_publisher: None,
337            #[cfg(feature = "redis")]
338            invalidation_subscriber: None,
339            #[cfg(feature = "redis")]
340            #[cfg(feature = "redis")]
341            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
342        })
343    }
344
345    /// Create new cache manager with default backends (backward compatible)
346    ///
347    /// This is the legacy constructor maintained for backward compatibility.
348    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
349    ///
350    /// # Arguments
351    ///
352    /// * `l1_cache` - Moka L1 cache instance
353    /// * `l2_cache` - Redis L2 cache instance
354    /// # Errors
355    ///
356    /// Returns an error if Redis connection fails.
357    #[cfg(all(feature = "moka", feature = "redis"))]
358    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> CacheResult<Self> {
359        debug!("Initializing Cache Manager...");
360
361        // Convert concrete types to trait objects
362        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
363        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
364
365        // Create RedisStreams backend for streaming functionality
366        let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
367            let redis_url =
368                std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
369            let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
370            Some(Arc::new(redis_streams))
371        };
372
373        Self::new_with_backends(l1_backend, l2_backend, streaming_backend)
374    }
375
376    /// Create new cache manager with invalidation support
377    ///
378    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
379    ///
380    /// # Arguments
381    ///
382    /// * `l1_cache` - Moka L1 cache instance
383    /// * `l2_cache` - Redis L2 cache instance
384    /// * `redis_url` - Redis connection URL for Pub/Sub
385    /// * `config` - Invalidation configuration
386    ///
387    /// # Example
388    ///
389    /// ```rust,ignore
390    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
391    ///
392    /// let config = InvalidationConfig {
393    ///     channel: "my_app:cache:invalidate".to_string(),
394    ///     ..Default::default()
395    /// };
396    ///
397    /// let manager = CacheManager::new_with_invalidation(
398    ///     l1, l2, "redis://localhost", config
399    /// ).await?;
400    /// ```
401    /// # Errors
402    ///
403    /// Returns an error if Redis connection fails or invalidation setup fails.
404    #[cfg(all(feature = "moka", feature = "redis"))]
405    pub async fn new_with_invalidation(
406        l1_cache: Arc<L1Cache>,
407        l2_cache: Arc<L2Cache>,
408        redis_url: &str,
409        config: InvalidationConfig,
410    ) -> CacheResult<Self> {
411        debug!("Initializing Cache Manager with Invalidation...");
412        debug!("  Pub/Sub channel: {}", config.channel);
413
414        // Convert concrete types to trait objects
415        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
416        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
417
418        // Create RedisStreams backend for streaming functionality
419        let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
420            let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
421            Some(Arc::new(redis_streams))
422        };
423
424        // Create publisher
425        let (invalidation_publisher, invalidation_subscriber) = {
426            let client = redis::Client::open(redis_url)?;
427            let conn_manager = redis::aio::ConnectionManager::new(client).await?;
428            let publisher = InvalidationPublisher::new(conn_manager, config.clone());
429
430            // Create subscriber
431            let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
432            (
433                Some(Arc::new(Mutex::new(publisher))),
434                Some(Arc::new(subscriber)),
435            )
436        };
437
438        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
439
440        let tiers = vec![
441            CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
442            CacheTier::new(l2_backend, 2, true, 10, 1.0),
443        ];
444
445        let manager = Self {
446            tiers,
447            streaming_backend,
448            total_requests: AtomicU64::new(0),
449            l1_hits: AtomicU64::new(0),
450            l2_hits: AtomicU64::new(0),
451            misses: AtomicU64::new(0),
452            promotions: AtomicUsize::new(0),
453            in_flight_requests: Arc::new(DashMap::new()),
454            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
455            invalidation_publisher,
456            invalidation_subscriber,
457            invalidation_stats,
458        };
459
460        // Start subscriber with handler
461        manager.start_invalidation_subscriber();
462
463        info!("Cache Manager initialized with invalidation support");
464
465        Ok(manager)
466    }
467
468    /// Create new cache manager with multi-tier architecture (v0.5.0+)
469    ///
470    /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
471    /// Tiers are checked in order (lower `tier_level` = faster/hotter).
472    ///
473    /// # Arguments
474    ///
475    /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
476    /// * `streaming_backend` - Optional streaming backend
477    ///
478    /// # Example
479    ///
480    /// ```rust,ignore
481    /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
482    /// use std::sync::Arc;
483    ///
484    /// // L1 + L2 + L3 setup
485    /// let l1 = Arc::new(L1Cache::new()?);
486    /// let l2 = Arc::new(L2Cache::new().await?);
487    /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
488    ///
489    /// let tiers = vec![
490    ///     CacheTier::new(l1, 1, false, 1.0),  // L1 - no promotion
491    ///     CacheTier::new(l2, 2, true, 1.0),   // L2 - promote to L1
492    ///     CacheTier::new(l3, 3, true, 2.0),   // L3 - promote to L2&L1, 2x TTL
493    /// ];
494    ///
495    /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
496    /// ```
497    /// # Errors
498    ///
499    /// Returns an error if tiers are not sorted by level or if no tiers are provided.
500    pub fn new_with_tiers(
501        tiers: Vec<CacheTier>,
502        streaming_backend: Option<Arc<dyn StreamingBackend>>,
503    ) -> CacheResult<Self> {
504        debug!("Initializing Multi-Tier Cache Manager...");
505        debug!("  Tier count: {}", tiers.len());
506        for tier in &tiers {
507            debug!(
508                "  L{}: {} (promotion={}, ttl_scale={})",
509                tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
510            );
511        }
512
513        // Validate tiers are sorted by level
514        for i in 1..tiers.len() {
515            if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
516                && current.tier_level <= prev.tier_level
517            {
518                return Err(crate::error::CacheError::ConfigError(format!(
519                    "Tiers must be sorted by tier_level ascending (found L{} after L{})",
520                    current.tier_level, prev.tier_level
521                )));
522            }
523        }
524
525        Ok(Self {
526            tiers,
527            streaming_backend,
528            total_requests: AtomicU64::new(0),
529            l1_hits: AtomicU64::new(0),
530            l2_hits: AtomicU64::new(0),
531            misses: AtomicU64::new(0),
532            promotions: AtomicUsize::new(0),
533            in_flight_requests: Arc::new(DashMap::new()),
534            serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
535            #[cfg(feature = "redis")]
536            invalidation_publisher: None,
537            #[cfg(feature = "redis")]
538            invalidation_subscriber: None,
539            #[cfg(feature = "redis")]
540            #[cfg(feature = "redis")]
541            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
542        })
543    }
544
545    /// Set a custom serializer for the cache manager
546    pub fn set_serializer(&mut self, serializer: CacheSerializer) {
547        debug!(name = %serializer.name(), "Switching cache serializer");
548        self.serializer = Arc::new(serializer);
549    }
550
551    /// Start the invalidation subscriber background task
552    #[cfg(feature = "redis")]
553    fn start_invalidation_subscriber(&self) {
554        #[cfg(feature = "redis")]
555        if let Some(subscriber) = &self.invalidation_subscriber {
556            let tiers = self.tiers.clone();
557
558            subscriber.start(move |msg: crate::invalidation::InvalidationMessage| {
559                let tiers = tiers.clone();
560                async move {
561                    for tier in &tiers {
562                        match &msg {
563                            InvalidationMessage::Remove { key } => {
564                                tier.backend.remove(key).await.ok();
565                            }
566                            InvalidationMessage::Update {
567                                key,
568                                value,
569                                ttl_secs,
570                            } => {
571                                if let Some(secs) = ttl_secs {
572                                    tier.backend
573                                        .set_with_ttl(
574                                            key,
575                                            value.clone(),
576                                            Duration::from_secs(*secs),
577                                        )
578                                        .await
579                                        .ok();
580                                } else {
581                                    tier.backend.set(key, value.clone()).await.ok();
582                                }
583                            }
584                            InvalidationMessage::RemovePattern { pattern } => {
585                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
586                                    warn!(
587                                        "Failed to remove pattern '{}' from L{}: {}",
588                                        pattern, tier.tier_level, e
589                                    );
590                                }
591                            }
592                            InvalidationMessage::RemoveBulk { keys } => {
593                                for key in keys {
594                                    if let Err(e) = tier.backend.remove(key).await {
595                                        warn!(
596                                            "Failed to remove '{}' from L{}: {}",
597                                            key, tier.tier_level, e
598                                        );
599                                    }
600                                }
601                            }
602                        }
603                    }
604                    Ok(())
605                }
606            });
607
608            info!("Invalidation subscriber started across all tiers");
609        }
610    }
611
612    /// Get value from cache using multi-tier architecture (v0.5.0+)
613    ///
614    /// This method iterates through all configured tiers and automatically promotes
615    /// to upper tiers on cache hit.
616    async fn get_multi_tier(&self, key: &str) -> CacheResult<Option<Bytes>> {
617        // Try each tier sequentially (sorted by tier_level)
618        for (tier_index, tier) in self.tiers.iter().enumerate() {
619            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
620                // Cache hit!
621                tier.record_hit();
622
623                // Promote to all upper tiers (if promotion enabled)
624                if tier.promotion_enabled && tier_index > 0 {
625                    // Probabilistic Promotion Check
626                    let should_promote = if tier.promotion_frequency <= 1 {
627                        true
628                    } else {
629                        rand::thread_rng().gen_ratio(
630                            1,
631                            u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
632                        )
633                    };
634
635                    if should_promote {
636                        let promotion_ttl =
637                            ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
638
639                        // Promote to all tiers above this one
640                        for upper_tier in self.tiers.iter().take(tier_index).rev() {
641                            if let Err(e) = upper_tier
642                                .set_with_ttl(key, value.clone(), promotion_ttl)
643                                .await
644                            {
645                                warn!(
646                                    "Failed to promote '{}' from L{} to L{}: {}",
647                                    key, tier.tier_level, upper_tier.tier_level, e
648                                );
649                            } else {
650                                self.promotions.fetch_add(1, Ordering::Relaxed);
651                                debug!(
652                                    "Promoted '{}' from L{} to L{} (TTL: {:?})",
653                                    key, tier.tier_level, upper_tier.tier_level, promotion_ttl
654                                );
655                            }
656                        }
657                    } else {
658                        debug!(
659                            "Probabilistic skip promotion for '{}' from L{} (N={})",
660                            key, tier.tier_level, tier.promotion_frequency
661                        );
662                    }
663                }
664
665                return Ok(Some(value));
666            }
667        }
668
669        // Cache miss across all tiers
670        Ok(None)
671    }
672
673    /// Get value from cache (L1 first, then L2 fallback with promotion)
674    ///
675    /// This method now includes built-in Cache Stampede protection when cache misses occur.
676    /// Multiple concurrent requests for the same missing key will be coalesced to prevent
677    /// unnecessary duplicate work on external data sources.
678    ///
679    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
680    ///
681    /// # Arguments
682    /// * `key` - Cache key to retrieve
683    ///
684    /// # Returns
685    /// * `Ok(Some(value))` - Cache hit, value found in any tier
686    /// * `Ok(None)` - Cache miss, value not found in any cache
687    /// * `Err(error)` - Cache operation failed
688    /// # Errors
689    ///
690    /// Returns an error if cache operation fails.
691    ///
692    /// # Panics
693    ///
694    /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
695    pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
696        self.total_requests.fetch_add(1, Ordering::Relaxed);
697
698        // Fast path for L1 (first tier) - no locking needed
699        if let Some(tier1) = self.tiers.first()
700            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
701        {
702            tier1.record_hit();
703            // Update legacy stats for backward compatibility
704            self.l1_hits.fetch_add(1, Ordering::Relaxed);
705            return Ok(Some(value));
706        }
707
708        // L1 miss - use stampede protection for lower tiers
709        let key_owned = key.to_string();
710        let lock_guard = self
711            .in_flight_requests
712            .entry(key_owned.clone())
713            .or_insert_with(|| broadcast::Sender::new(1))
714            .clone();
715
716        let mut rx = lock_guard.subscribe();
717
718        // If there are other receivers, someone else is computing, wait for it
719        if lock_guard.receiver_count() > 1 {
720            match rx.recv().await {
721                Ok(Ok(value)) => return Ok(Some(value)),
722                Ok(Err(e)) => {
723                    return Err(crate::error::CacheError::BackendError(format!(
724                        "Computation failed in another thread: {e}"
725                    )));
726                }
727                Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
728            }
729        }
730
731        // Double-check L1 after acquiring lock (or if we are the first to compute)
732        if let Some(tier1) = self.tiers.first()
733            && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
734        {
735            tier1.record_hit();
736            self.l1_hits.fetch_add(1, Ordering::Relaxed);
737            let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
738            return Ok(Some(value));
739        }
740
741        // Check remaining tiers with promotion
742        let result = self.get_multi_tier(key).await?;
743
744        if let Some(val) = result.clone() {
745            // Hit in L2+ tier - update legacy stats
746            if self.tiers.len() >= 2 {
747                self.l2_hits.fetch_add(1, Ordering::Relaxed);
748            }
749
750            // Notify any waiting subscribers
751            let _ = lock_guard.send(Ok(val.clone()));
752
753            // Remove the in-flight entry after computation/retrieval
754            self.in_flight_requests.remove(key);
755
756            Ok(Some(val))
757        } else {
758            self.misses.fetch_add(1, Ordering::Relaxed);
759
760            // Remove the in-flight entry after computation/retrieval
761            self.in_flight_requests.remove(key);
762
763            Ok(None)
764        }
765    }
766
767    /// Get a value from cache and deserialize it (Type-Safe Version)
768    ///
769    /// # Errors
770    ///
771    /// Returns a `SerializationError` if deserialization fails, or a `BackendError` if the cache retrieval fails.
772    pub async fn get_typed<T>(&self, key: &str) -> CacheResult<Option<T>>
773    where
774        T: serde::de::DeserializeOwned,
775    {
776        if let Some(bytes) = self.get(key).await? {
777            return Ok(Some(self.serializer.deserialize::<T>(&bytes)?));
778        }
779        Ok(None)
780    }
781
782    /// Set value with specific cache strategy (all tiers)
783    ///
784    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
785    /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
786    /// # Errors
787    ///
788    /// Returns an error if cache set operation fails.
789    pub async fn set_with_strategy(
790        &self,
791        key: &str,
792        value: Bytes,
793        strategy: CacheStrategy,
794    ) -> CacheResult<()> {
795        let ttl = strategy.to_duration();
796
797        let mut success_count = 0;
798        let mut last_error = None;
799
800        for tier in &self.tiers {
801            match tier.set_with_ttl(key, value.clone(), ttl).await {
802                Ok(()) => {
803                    success_count += 1;
804                }
805                Err(e) => {
806                    error!(
807                        "L{} cache set failed for key '{}': {}",
808                        tier.tier_level, key, e
809                    );
810                    last_error = Some(e);
811                }
812            }
813        }
814
815        if success_count > 0 {
816            debug!(
817                "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
818                key,
819                success_count,
820                self.tiers.len(),
821                ttl
822            );
823            return Ok(());
824        }
825
826        Err(last_error.unwrap_or_else(|| {
827            crate::error::CacheError::InternalError("All tiers failed".to_string())
828        }))
829    }
830
831    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
832    ///
833    /// This method provides comprehensive Cache Stampede protection:
834    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
835    /// 2. Check L2 cache with mutex-based coalescing
836    /// 3. Compute fresh data with protection against concurrent computations
837    ///
838    /// # Arguments
839    /// * `key` - Cache key
840    /// * `strategy` - Cache strategy for TTL and storage behavior
841    /// * `compute_fn` - Async function to compute the value if not in any cache
842    ///
843    /// # Example
844    /// ```ignore
845    /// let api_data = cache_manager.get_or_compute_with(
846    ///     "api_response",
847    ///     CacheStrategy::RealTime,
848    ///     || async {
849    ///         fetch_data_from_api().await
850    ///     }
851    /// ).await?;
852    /// ```
853    #[allow(dead_code)]
854    /// # Errors
855    ///
856    /// Returns an error if compute function fails or cache operations fail.
857    pub async fn get_or_compute_with<F, Fut>(
858        &self,
859        key: &str,
860        strategy: CacheStrategy,
861        compute_fn: F,
862    ) -> CacheResult<Bytes>
863    where
864        F: FnOnce() -> Fut + Send,
865        Fut: Future<Output = CacheResult<Bytes>> + Send,
866    {
867        self.total_requests.fetch_add(1, Ordering::Relaxed);
868
869        // 1. Try tiers sequentially first
870        for (idx, tier) in self.tiers.iter().enumerate() {
871            if let Some((value, _ttl)) = tier.get_with_ttl(key).await {
872                tier.record_hit();
873                if tier.tier_level == 1 {
874                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
875                } else if tier.tier_level == 2 {
876                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
877                }
878
879                // Promotion to L1 if hit was in a lower tier
880                if idx > 0 && tier.promotion_enabled {
881                    // Probabilistic Promotion Check
882                    let should_promote = if tier.promotion_frequency <= 1 {
883                        true
884                    } else {
885                        rand::thread_rng().gen_ratio(
886                            1,
887                            u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
888                        )
889                    };
890
891                    if should_promote && let Some(l1_tier) = self.tiers.first() {
892                        let _ = l1_tier
893                            .set_with_ttl(key, value.clone(), strategy.to_duration())
894                            .await;
895                        self.promotions.fetch_add(1, Ordering::Relaxed);
896                    }
897                }
898                return Ok(value);
899            }
900        }
901
902        // 2. Cache miss across all tiers - use stampede protection
903        let (tx, mut rx): (
904            tokio::sync::broadcast::Sender<CacheResult<Bytes>>,
905            tokio::sync::broadcast::Receiver<CacheResult<Bytes>>,
906        ) = match self.in_flight_requests.entry(key.to_string()) {
907            dashmap::mapref::entry::Entry::Occupied(entry) => {
908                let tx = entry.get().clone();
909                (tx.clone(), tx.subscribe())
910            }
911            dashmap::mapref::entry::Entry::Vacant(entry) => {
912                let (tx, _) = tokio::sync::broadcast::channel(1);
913                entry.insert(tx.clone());
914                (tx.clone(), tx.subscribe())
915            }
916        };
917
918        if tx.receiver_count() > 1 {
919            // Someone else is computing, wait for it
920            match rx.recv().await {
921                Ok(Ok(value)) => return Ok(value),
922                Ok(Err(e)) => {
923                    return Err(crate::error::CacheError::BackendError(format!(
924                        "Computation failed in another thread: {e}"
925                    )));
926                }
927                Err(_) => {} // Fall through to re-compute
928            }
929        }
930
931        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
932        for tier in &self.tiers {
933            if let Some((value, _)) = tier.get_with_ttl(key).await {
934                let _ = tx.send(Ok(value.clone()));
935                return Ok(value);
936            }
937        }
938
939        // 4. Miss - compute fresh data
940        debug!(
941            "Computing fresh data for key: '{}' (Stampede protected)",
942            key
943        );
944
945        let result = compute_fn().await;
946
947        // Remove from in_flight BEFORE broadcasting
948        self.in_flight_requests.remove(key);
949
950        match &result {
951            Ok(value) => {
952                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
953                let _ = tx.send(Ok(value.clone()));
954            }
955            Err(e) => {
956                let _ = tx.send(Err(e.clone()));
957            }
958        }
959
960        result
961    }
962
963    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
964    ///
965    /// This method provides the same functionality as `get_or_compute_with()` but with
966    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
967    /// API calls, or any computation that returns structured data.
968    ///
969    /// # Type Safety
970    ///
971    /// - Returns your actual type `T` instead of `serde_json::Value`
972    /// - Compiler enforces Serialize + `DeserializeOwned` bounds
973    /// - No manual JSON conversion needed
974    ///
975    /// # Cache Flow
976    ///
977    /// 1. Check L1 cache → deserialize if found
978    /// 2. Check L2 cache → deserialize + promote to L1 if found
979    /// 3. Execute `compute_fn` → serialize → store in L1+L2
980    /// 4. Full stampede protection (only ONE request computes)
981    ///
982    /// # Arguments
983    ///
984    /// * `key` - Cache key
985    /// * `strategy` - Cache strategy for TTL
986    /// * `compute_fn` - Async function returning `Result<T>`
987    ///
988    /// # Example - Database Query
989    ///
990    /// ```no_run
991    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
992    /// # use std::sync::Arc;
993    /// # use serde::{Serialize, Deserialize};
994    /// # async fn example() -> anyhow::Result<()> {
995    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
996    /// # let l2 = Arc::new(L2Cache::new().await?);
997    /// # let cache_manager = CacheManager::new(l1, l2);
998    ///
999    /// #[derive(Serialize, Deserialize)]
1000    /// struct User {
1001    ///     id: i64,
1002    ///     name: String,
1003    /// }
1004    ///
1005    /// // Type-safe database caching (example - requires sqlx)
1006    /// // let user: User = cache_manager.get_or_compute_typed(
1007    /// //     "user:123",
1008    /// //     CacheStrategy::MediumTerm,
1009    /// //     || async {
1010    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1011    /// //             .bind(123)
1012    /// //             .fetch_one(&pool)
1013    /// //             .await
1014    /// //     }
1015    /// // ).await?;
1016    /// # Ok(())
1017    /// # }
1018    /// ```
1019    ///
1020    /// # Example - API Call
1021    ///
1022    /// ```no_run
1023    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1024    /// # use std::sync::Arc;
1025    /// # use serde::{Serialize, Deserialize};
1026    /// # async fn example() -> anyhow::Result<()> {
1027    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1028    /// # let l2 = Arc::new(L2Cache::new().await?);
1029    /// # let cache_manager = CacheManager::new(l1, l2);
1030    /// #[derive(Serialize, Deserialize)]
1031    /// struct ApiResponse {
1032    ///     data: String,
1033    ///     timestamp: i64,
1034    /// }
1035    ///
1036    /// // API call caching (example - requires reqwest)
1037    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1038    /// //     "api:endpoint",
1039    /// //     CacheStrategy::RealTime,
1040    /// //     || async {
1041    /// //         reqwest::get("https://api.example.com/data")
1042    /// //             .await?
1043    /// //             .json::<ApiResponse>()
1044    /// //             .await
1045    /// //     }
1046    /// // ).await?;
1047    /// # Ok(())
1048    /// # }
1049    /// ```
1050    ///
1051    /// # Performance
1052    ///
1053    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1054    /// - L2 hit: 2-5ms + deserialization + L1 promotion
1055    /// - Compute: Your function time + serialization + L1+L2 storage
1056    /// - Stampede protection: 99.6% latency reduction under high concurrency
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns error if:
1061    /// - Compute function fails
1062    /// - Serialization fails (invalid type for JSON)
1063    /// - Deserialization fails (cache data doesn't match type T)
1064    /// - Cache operations fail (Redis connection issues)
1065    #[allow(clippy::too_many_lines)]
1066    pub async fn get_or_compute_typed<T, F, Fut>(
1067        &self,
1068        key: &str,
1069        strategy: CacheStrategy,
1070        compute_fn: F,
1071    ) -> CacheResult<T>
1072    where
1073        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1074        F: FnOnce() -> Fut + Send,
1075        Fut: Future<Output = CacheResult<T>> + Send,
1076    {
1077        // 1. Try to get typed from cache first
1078        if let Some(value) = self.get_typed::<T>(key).await? {
1079            return Ok(value);
1080        }
1081
1082        // 2. Use get_or_compute_with to handle stampede protection
1083        let serializer = self.serializer.clone();
1084        let bytes_result = self
1085            .get_or_compute_with(key, strategy, || async move {
1086                let val = compute_fn().await?;
1087                serializer.serialize(&val)
1088            })
1089            .await?;
1090
1091        // 3. Deserialize result
1092        self.serializer.deserialize::<T>(&bytes_result)
1093    }
1094
1095    /// Get comprehensive cache statistics
1096    ///
1097    /// In multi-tier mode, aggregates statistics from all tiers.
1098    /// In legacy mode, returns L1 and L2 stats.
1099    #[allow(dead_code)]
1100    pub fn get_stats(&self) -> CacheManagerStats {
1101        let total_reqs = self.total_requests.load(Ordering::Relaxed);
1102        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1103        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1104        let misses = self.misses.load(Ordering::Relaxed);
1105
1106        CacheManagerStats {
1107            total_requests: total_reqs,
1108            l1_hits,
1109            l2_hits,
1110            total_hits: l1_hits + l2_hits,
1111            misses,
1112            hit_rate: if total_reqs > 0 {
1113                #[allow(clippy::cast_precision_loss)]
1114                {
1115                    ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1116                }
1117            } else {
1118                0.0
1119            },
1120            l1_hit_rate: if total_reqs > 0 {
1121                #[allow(clippy::cast_precision_loss)]
1122                {
1123                    (l1_hits as f64 / total_reqs as f64) * 100.0
1124                }
1125            } else {
1126                0.0
1127            },
1128            promotions: self.promotions.load(Ordering::Relaxed),
1129            in_flight_requests: self.in_flight_requests.len(),
1130        }
1131    }
1132
1133    /// Get per-tier statistics (v0.5.0+)
1134    ///
1135    /// Returns statistics for each tier if multi-tier mode is enabled.
1136    /// Returns None if using legacy 2-tier mode.
1137    ///
1138    /// # Example
1139    /// ```rust,ignore
1140    /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1141    ///     for stats in tier_stats {
1142    ///         println!("L{}: {} hits ({})",
1143    ///                  stats.tier_level,
1144    ///                  stats.hit_count(),
1145    ///                  stats.backend_name);
1146    ///     }
1147    /// }
1148    /// ```
1149    pub fn get_tier_stats(&self) -> Vec<TierStats> {
1150        self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1151    }
1152}
1153
1154/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
1155/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
1156struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1157
1158impl CacheBackend for ProxyL1ToL2 {
1159    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
1160        self.0.get(key)
1161    }
1162
1163    fn set_with_ttl<'a>(
1164        &'a self,
1165        key: &'a str,
1166        value: Bytes,
1167        ttl: Duration,
1168    ) -> BoxFuture<'a, CacheResult<()>> {
1169        self.0.set_with_ttl(key, value, ttl)
1170    }
1171
1172    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
1173        self.0.remove(key)
1174    }
1175
1176    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
1177        self.0.remove_pattern(pattern)
1178    }
1179
1180    fn health_check(&self) -> BoxFuture<'_, bool> {
1181        self.0.health_check()
1182    }
1183
1184    fn name(&self) -> &'static str {
1185        self.0.name()
1186    }
1187}
1188
1189impl L2CacheBackend for ProxyL1ToL2 {
1190    fn get_with_ttl<'a>(
1191        &'a self,
1192        key: &'a str,
1193    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1194        Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1195    }
1196}
1197
1198impl CacheManager {
1199    // ===== Redis Streams Methods =====
1200
1201    /// Publish data to Redis Stream
1202    ///
1203    /// # Arguments
1204    /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1205    /// * `fields` - Field-value pairs to publish
1206    /// * `maxlen` - Optional max length for stream trimming
1207    ///
1208    /// # Returns
1209    /// The entry ID generated by Redis
1210    ///
1211    /// # Errors
1212    /// Returns error if streaming backend is not configured
1213    pub async fn publish_to_stream(
1214        &self,
1215        stream_key: &str,
1216        fields: Vec<(String, String)>,
1217        maxlen: Option<usize>,
1218    ) -> CacheResult<String> {
1219        match &self.streaming_backend {
1220            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1221            None => Err(crate::error::CacheError::ConfigError(
1222                "Streaming backend not configured".to_string(),
1223            )),
1224        }
1225    }
1226
1227    /// Read latest entries from Redis Stream
1228    ///
1229    /// # Arguments
1230    /// * `stream_key` - Name of the stream
1231    /// * `count` - Number of latest entries to retrieve
1232    ///
1233    /// # Returns
1234    /// Vector of (`entry_id`, fields) tuples (newest first)
1235    ///
1236    /// # Errors
1237    /// Returns error if streaming backend is not configured
1238    pub async fn read_stream_latest(
1239        &self,
1240        stream_key: &str,
1241        count: usize,
1242    ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1243        match &self.streaming_backend {
1244            Some(backend) => backend
1245                .stream_read_latest(stream_key, count)
1246                .await,
1247            None => Err(crate::error::CacheError::ConfigError(
1248                "Streaming backend not configured".to_string(),
1249            )),
1250        }
1251    }
1252
1253    /// Read from Redis Stream with optional blocking
1254    ///
1255    /// # Arguments
1256    /// * `stream_key` - Name of the stream
1257    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1258    /// * `count` - Max entries to retrieve
1259    /// * `block_ms` - Optional blocking timeout in ms
1260    ///
1261    /// # Returns
1262    /// Vector of (`entry_id`, fields) tuples
1263    ///
1264    /// # Errors
1265    /// Returns error if streaming backend is not configured
1266    pub async fn read_stream(
1267        &self,
1268        stream_key: &str,
1269        last_id: &str,
1270        count: usize,
1271        block_ms: Option<usize>,
1272    ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1273        match &self.streaming_backend {
1274            Some(backend) => {
1275                backend
1276                    .stream_read(stream_key, last_id, count, block_ms)
1277                    .await
1278            }
1279            None => Err(crate::error::CacheError::ConfigError(
1280                "Streaming backend not configured".to_string(),
1281            )),
1282        }
1283    }
1284
1285    // ===== Cache Invalidation Methods =====
1286
1287    /// Invalidate a cache key across all instances
1288    ///
1289    /// This removes the key from all cache tiers and broadcasts
1290    /// the invalidation to all other cache instances via Redis Pub/Sub.
1291    ///
1292    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1293    ///
1294    /// # Arguments
1295    /// * `key` - Cache key to invalidate
1296    ///
1297    /// # Example
1298    /// ```rust,no_run
1299    /// # use multi_tier_cache::CacheManager;
1300    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1301    /// // Invalidate user cache after profile update
1302    /// cache_manager.invalidate("user:123").await?;
1303    /// # Ok(())
1304    /// # }
1305    /// ```
1306    /// # Errors
1307    ///
1308    /// Returns an error if invalidation fails.
1309    pub async fn invalidate(&self, key: &str) -> CacheResult<()> {
1310        // Remove from ALL tiers
1311        for tier in &self.tiers {
1312            if let Err(e) = tier.remove(key).await {
1313                warn!(
1314                    "Failed to remove '{}' from L{}: {}",
1315                    key, tier.tier_level, e
1316                );
1317            }
1318        }
1319
1320        // Broadcast to other instances
1321        #[cfg(feature = "redis")]
1322        {
1323            if let Some(publisher) = &self.invalidation_publisher {
1324                let mut pub_lock: tokio::sync::MutexGuard<
1325                    '_,
1326                    crate::invalidation::InvalidationPublisher,
1327                > = publisher.lock().await;
1328                let msg = InvalidationMessage::remove(key);
1329                pub_lock.publish(&msg).await?;
1330                self.invalidation_stats
1331                    .messages_sent
1332                    .fetch_add(1, Ordering::Relaxed);
1333            }
1334        }
1335
1336        debug!("Invalidated '{}' across all instances", key);
1337        Ok(())
1338    }
1339
1340    /// Update cache value across all instances
1341    ///
1342    /// This updates the key in all cache tiers and broadcasts
1343    /// the update to all other cache instances, avoiding cache misses.
1344    ///
1345    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1346    ///
1347    /// # Arguments
1348    /// * `key` - Cache key to update
1349    /// * `value` - New value
1350    /// * `ttl` - Optional TTL (uses default if None)
1351    ///
1352    /// # Example
1353    /// ```rust,no_run
1354    /// # use multi_tier_cache::CacheManager;
1355    /// # use std::time::Duration;
1356    /// # use bytes::Bytes;
1357    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1358    /// // Update user cache with new data
1359    /// let user_data = Bytes::from("alice");
1360    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1361    /// # Ok(())
1362    /// # }
1363    /// ```
1364    /// # Errors
1365    ///
1366    /// Returns an error if cache update fails.
1367    pub async fn update_cache(
1368        &self,
1369        key: &str,
1370        value: Bytes,
1371        ttl: Option<Duration>,
1372    ) -> CacheResult<()> {
1373        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1374
1375        // Update ALL tiers
1376        for tier in &self.tiers {
1377            if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1378                warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1379            }
1380        }
1381
1382        // Broadcast update to other instances
1383        #[cfg(feature = "redis")]
1384        if let Some(publisher) = &self.invalidation_publisher {
1385            let mut pub_lock = publisher.lock().await;
1386            let msg = InvalidationMessage::update(key, value, Some(ttl));
1387            pub_lock.publish(&msg).await?;
1388            self.invalidation_stats
1389                .messages_sent
1390                .fetch_add(1, Ordering::Relaxed);
1391        }
1392
1393        debug!("Updated '{}' across all instances", key);
1394        Ok(())
1395    }
1396
1397    /// Invalidate all keys matching a pattern
1398    ///
1399    /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1400    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1401    ///
1402    /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1403    ///
1404    /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1405    /// In multi-tier mode, this scans from L2 but removes from all tiers.
1406    ///
1407    /// # Arguments
1408    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1409    ///
1410    /// # Example
1411    /// ```rust,no_run
1412    /// # use multi_tier_cache::CacheManager;
1413    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1414    /// // Invalidate all user caches
1415    /// cache_manager.invalidate_pattern("user:*").await?;
1416    ///
1417    /// // Invalidate specific user's related caches
1418    /// cache_manager.invalidate_pattern("user:123:*").await?;
1419    /// # Ok(())
1420    /// # }
1421    /// ```
1422    /// # Errors
1423    ///
1424    /// Returns an error if invalidation fails.
1425    pub async fn invalidate_pattern(&self, pattern: &str) -> CacheResult<()> {
1426        debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1427
1428        // 1. Invalidate in all configured tiers
1429        for tier in &self.tiers {
1430            debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1431            tier.backend.remove_pattern(pattern).await?;
1432        }
1433
1434        // 2. Broadcast invalidation if publisher is configured
1435        #[cfg(feature = "redis")]
1436        {
1437            if let Some(publisher) = &self.invalidation_publisher {
1438                let msg = InvalidationMessage::remove_pattern(pattern);
1439                publisher.lock().await.publish(&msg).await?;
1440                debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1441            }
1442        }
1443
1444        Ok(())
1445    }
1446
1447    /// Set value with automatic broadcast to all instances
1448    ///
1449    /// This is a write-through operation that updates the cache and
1450    /// broadcasts the update to all other instances automatically.
1451    ///
1452    /// # Arguments
1453    /// * `key` - Cache key
1454    /// * `value` - Value to cache
1455    /// * `strategy` - Cache strategy (determines TTL)
1456    ///
1457    /// # Example
1458    /// ```rust,no_run
1459    /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1460    /// # use bytes::Bytes;
1461    /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1462    /// // Update and broadcast in one call
1463    /// let data = Bytes::from("active");
1464    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1465    /// # Ok(())
1466    /// # }
1467    /// ```
1468    /// # Errors
1469    ///
1470    /// Returns an error if cache set or broadcast fails.
1471    pub async fn set_with_broadcast(
1472        &self,
1473        key: &str,
1474        value: Bytes,
1475        strategy: CacheStrategy,
1476    ) -> CacheResult<()> {
1477        #[cfg(feature = "redis")] let ttl = strategy.to_duration();
1478
1479        // Set in local caches
1480        self.set_with_strategy(key, value.clone(), strategy).await?;
1481
1482        // Broadcast update if invalidation is enabled
1483        #[cfg(feature = "redis")]
1484        if let Some(publisher) = &self.invalidation_publisher {
1485            let mut pub_lock = publisher.lock().await;
1486            let msg = InvalidationMessage::update(key, value, Some(ttl));
1487            pub_lock.publish(&msg).await?;
1488            self.invalidation_stats
1489                .messages_sent
1490                .fetch_add(1, Ordering::Relaxed);
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Get invalidation statistics
1497    ///
1498    /// Returns statistics about invalidation operations if invalidation is enabled.
1499    #[cfg(feature = "redis")]
1500    pub fn invalidation_stats(&self) -> Option<crate::invalidation::InvalidationStats> {
1501        #[cfg(feature = "redis")]
1502        {
1503            Some(self.invalidation_stats.snapshot())
1504        }
1505        #[cfg(not(feature = "redis"))]
1506        {
1507            None
1508        }
1509    }
1510}
1511
1512/// Cache Manager statistics
1513#[allow(dead_code)]
1514#[derive(Debug, Clone)]
1515pub struct CacheManagerStats {
1516    pub total_requests: u64,
1517    pub l1_hits: u64,
1518    pub l2_hits: u64,
1519    pub total_hits: u64,
1520    pub misses: u64,
1521    pub hit_rate: f64,
1522    pub l1_hit_rate: f64,
1523    pub promotions: usize,
1524    pub in_flight_requests: usize,
1525}