multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use crate::error::CacheResult;
6use dashmap::DashMap;
7use rand::Rng;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12#[cfg(feature = "redis")]
13use tokio::sync::Mutex;
14use tokio::sync::broadcast;
15use tracing::{debug, error, info, warn};
16
17#[cfg(feature = "moka")]
18use crate::L1Cache;
19#[cfg(feature = "redis")]
20use crate::L2Cache;
21#[cfg(feature = "redis")]
22use crate::invalidation::{
23 AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
24 InvalidationSubscriber,
25};
26use crate::serialization::{CacheSerializer, JsonSerializer};
27use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
28use bytes::Bytes;
29use futures_util::future::BoxFuture;
30
/// Type alias for the in-flight requests map.
///
/// Stores one broadcast sender per key whose value is currently being
/// computed, so concurrent misses on the same key can await a single
/// computation (cache-stampede protection).
type InFlightMap = DashMap<String, broadcast::Sender<CacheResult<Bytes>>>;
34
/// TTL policy presets for different classes of cached data.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Volatile data refreshed every 10 seconds
    RealTime,
    /// Short-lived data - 5 minutes TTL
    ShortTerm,
    /// Medium-lived data - 1 hour TTL
    MediumTerm,
    /// Long-lived data - 3 hours TTL
    LongTerm,
    /// Caller-supplied TTL
    Custom(Duration),
    /// Default policy (same as `ShortTerm`: 5 minutes)
    Default,
}

impl CacheStrategy {
    /// Resolve this strategy to a concrete TTL duration.
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        // `Custom` carries an exact Duration; every other variant maps to a
        // fixed number of whole seconds.
        let secs = match self {
            Self::Custom(duration) => return *duration,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 300, // 5 minutes
            Self::MediumTerm => 3_600,              // 1 hour
            Self::LongTerm => 10_800,               // 3 hours
        };
        Duration::from_secs(secs)
    }
}
66
/// Statistics for a single cache tier.
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits recorded at this tier
    pub hits: AtomicU64,
    /// Backend name for identification in logs/metrics
    pub backend_name: String,
}

impl Clone for TierStats {
    /// Manual `Clone` because `AtomicU64` is not `Clone`: the current hit
    /// count is snapshotted into a fresh atomic.
    fn clone(&self) -> Self {
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    /// Create stats for a tier with a zeroed hit counter.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count (relaxed read; may lag concurrent updates).
    #[must_use]
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
102
103/// A single cache tier in the multi-tier architecture
104#[derive(Clone)]
105pub struct CacheTier {
106 /// The backend for this tier
107 pub backend: Arc<dyn L2CacheBackend>,
108 /// Tier level (1 for fastest, increases for slower/cheaper tiers)
109 pub tier_level: usize,
110 /// Whether to promote keys FROM lower tiers TO this tier
111 pub promotion_enabled: bool,
112 /// Promotion frequency (N) - promote with 1/N probability
113 pub promotion_frequency: usize,
114 /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
115 pub ttl_scale: f64,
116 /// Statistics for this tier
117 pub stats: TierStats,
118}
119
120impl CacheTier {
121 /// Create a new cache tier
122 pub fn new(
123 backend: Arc<dyn L2CacheBackend>,
124 tier_level: usize,
125 promotion_enabled: bool,
126 promotion_frequency: usize,
127 ttl_scale: f64,
128 ) -> Self {
129 let backend_name = backend.name().to_string();
130 Self {
131 backend,
132 tier_level,
133 promotion_enabled,
134 promotion_frequency,
135 ttl_scale,
136 stats: TierStats::new(tier_level, backend_name),
137 }
138 }
139
140 /// Get value with TTL from this tier
141 async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
142 self.backend.get_with_ttl(key).await
143 }
144
145 /// Set value with TTL in this tier
146 async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> CacheResult<()> {
147 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
148 self.backend.set_with_ttl(key, value, scaled_ttl).await
149 }
150
151 /// Remove value from this tier
152 async fn remove(&self, key: &str) -> CacheResult<()> {
153 self.backend.remove(key).await
154 }
155
156 /// Record a cache hit for this tier
157 fn record_hit(&self) {
158 self.stats.hits.fetch_add(1, Ordering::Relaxed);
159 }
160}
161
/// Configuration for a cache tier (used in builder pattern).
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability (default 10)
    pub promotion_frequency: usize,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create a tier configuration with defaults: promotion on,
    /// frequency 10, unscaled TTL.
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            promotion_frequency: 10,
            ttl_scale: 1.0,
        }
    }

    /// Preset for L1 (hot tier). Already the top tier, so promotion
    /// is disabled (the frequency value is then irrelevant).
    #[must_use]
    pub fn as_l1() -> Self {
        Self::new(1).with_promotion(false).with_promotion_frequency(1)
    }

    /// Preset for L2 (warm tier) — the plain defaults at level 2.
    #[must_use]
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Preset for L3 (cold tier): data is kept 2x longer.
    #[must_use]
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Preset for L4 (archive tier): data is kept 8x longer.
    #[must_use]
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Set promotion enabled
    #[must_use]
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set promotion frequency (N)
    #[must_use]
    pub fn with_promotion_frequency(mut self, n: usize) -> Self {
        self.promotion_frequency = n;
        self
    }

    /// Set TTL scale factor
    #[must_use]
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    #[must_use]
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}
259
/// Unified cache manager coordinating reads and writes across an ordered
/// set of cache tiers.
///
/// Tracks legacy L1/L2 hit counters alongside per-tier stats, coalesces
/// concurrent misses per key, and (with the `redis` feature) wires up
/// cross-instance invalidation via Pub/Sub.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...); lower index = hotter tier
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Total number of cache requests observed (all entry points)
    total_requests: AtomicU64,
    /// Legacy counter: hits served by the first (L1) tier
    l1_hits: AtomicU64,
    /// Legacy counter: hits served by any lower (L2+) tier
    l2_hits: AtomicU64,
    /// Requests that missed every tier
    misses: AtomicU64,
    /// Per-key broadcast senders used for cache-stampede protection
    /// (original note: a Broadcaster integration is planned to replace this)
    in_flight_requests: Arc<InFlightMap>,
    /// Pluggable serializer used by the typed (`*_typed`) APIs
    serializer: Arc<CacheSerializer>,
    /// Invalidation publisher (present only when configured via
    /// `new_with_invalidation`)
    #[cfg(feature = "redis")]
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (background Pub/Sub listener)
    #[cfg(feature = "redis")]
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    #[cfg(feature = "redis")]
    invalidation_stats: Arc<AtomicInvalidationStats>,
    /// Number of promotions performed (value copied into a hotter tier)
    promotions: AtomicUsize,
}
287
288impl CacheManager {
289 /// Create new cache manager with trait objects (pluggable backends)
290 ///
291 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
292 ///
293 /// # Arguments
294 ///
295 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
296 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
297 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
298 ///
299 /// # Example
300 ///
301 /// ```rust,ignore
302 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
303 /// use std::sync::Arc;
304 ///
305 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
306 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
307 ///
308 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
309 /// ```
310 /// # Errors
311 ///
312 /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
313 pub fn new_with_backends(
314 l1_cache: Arc<dyn CacheBackend>,
315 l2_cache: Arc<dyn L2CacheBackend>,
316 streaming_backend: Option<Arc<dyn StreamingBackend>>,
317 ) -> CacheResult<Self> {
318 debug!("Initializing Cache Manager with L1+L2 backends...");
319
320 let tiers = vec![
321 CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
322 CacheTier::new(l2_cache, 2, true, 10, 1.0),
323 ];
324
325 Ok(Self {
326 tiers,
327 streaming_backend,
328 total_requests: AtomicU64::new(0),
329 l1_hits: AtomicU64::new(0),
330 l2_hits: AtomicU64::new(0),
331 misses: AtomicU64::new(0),
332 promotions: AtomicUsize::new(0),
333 in_flight_requests: Arc::new(DashMap::new()),
334 serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
335 #[cfg(feature = "redis")]
336 invalidation_publisher: None,
337 #[cfg(feature = "redis")]
338 invalidation_subscriber: None,
339 #[cfg(feature = "redis")]
340 #[cfg(feature = "redis")]
341 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
342 })
343 }
344
/// Create new cache manager with default backends (backward compatible)
///
/// This is the legacy constructor maintained for backward compatibility.
/// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
///
/// # Arguments
///
/// * `l1_cache` - Moka L1 cache instance
/// * `l2_cache` - Redis L2 cache instance
/// # Errors
///
/// Returns an error if Redis connection fails.
#[cfg(all(feature = "moka", feature = "redis"))]
pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> CacheResult<Self> {
    debug!("Initializing Cache Manager...");

    // Streaming goes through Redis Streams; the URL comes from the
    // environment with a localhost fallback.
    let redis_url =
        std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
    let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
    let streaming_backend: Option<Arc<dyn StreamingBackend>> = Some(Arc::new(redis_streams));

    // `Arc<L1Cache>` / `Arc<L2Cache>` unsize-coerce to their trait objects
    // directly at the call site; the previous explicit `.clone()` into
    // intermediate trait-object bindings was a redundant refcount bump.
    Self::new_with_backends(l1_cache, l2_cache, streaming_backend)
}
375
/// Create new cache manager with invalidation support
///
/// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
///
/// # Arguments
///
/// * `l1_cache` - Moka L1 cache instance
/// * `l2_cache` - Redis L2 cache instance
/// * `redis_url` - Redis connection URL for Pub/Sub
/// * `config` - Invalidation configuration
///
/// # Example
///
/// ```rust,ignore
/// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
///
/// let config = InvalidationConfig {
///     channel: "my_app:cache:invalidate".to_string(),
///     ..Default::default()
/// };
///
/// let manager = CacheManager::new_with_invalidation(
///     l1, l2, "redis://localhost", config
/// ).await?;
/// ```
/// # Errors
///
/// Returns an error if Redis connection fails or invalidation setup fails.
#[cfg(all(feature = "moka", feature = "redis"))]
pub async fn new_with_invalidation(
    l1_cache: Arc<L1Cache>,
    l2_cache: Arc<L2Cache>,
    redis_url: &str,
    config: InvalidationConfig,
) -> CacheResult<Self> {
    debug!("Initializing Cache Manager with Invalidation...");
    debug!(" Pub/Sub channel: {}", config.channel);

    // Convert concrete types to trait objects
    let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
    let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

    // Create RedisStreams backend for streaming functionality
    // (reuses the same Redis URL as the Pub/Sub channel)
    let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        Some(Arc::new(redis_streams))
    };

    // Create the publisher (on a managed connection) and the subscriber
    // (which owns its own connection to the same channel).
    let (invalidation_publisher, invalidation_subscriber) = {
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        (
            Some(Arc::new(Mutex::new(publisher))),
            Some(Arc::new(subscriber)),
        )
    };

    let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

    // Same fixed 2-tier layout as `new_with_backends`.
    let tiers = vec![
        CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
        CacheTier::new(l2_backend, 2, true, 10, 1.0),
    ];

    let manager = Self {
        tiers,
        streaming_backend,
        total_requests: AtomicU64::new(0),
        l1_hits: AtomicU64::new(0),
        l2_hits: AtomicU64::new(0),
        misses: AtomicU64::new(0),
        promotions: AtomicUsize::new(0),
        in_flight_requests: Arc::new(DashMap::new()),
        serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
        invalidation_publisher,
        invalidation_subscriber,
        invalidation_stats,
    };

    // Start the background listener before handing the manager to the
    // caller so no invalidation messages are missed.
    manager.start_invalidation_subscriber();

    info!("Cache Manager initialized with invalidation support");

    Ok(manager)
}
467
468 /// Create new cache manager with multi-tier architecture (v0.5.0+)
469 ///
470 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
471 /// Tiers are checked in order (lower `tier_level` = faster/hotter).
472 ///
473 /// # Arguments
474 ///
475 /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
476 /// * `streaming_backend` - Optional streaming backend
477 ///
478 /// # Example
479 ///
480 /// ```rust,ignore
481 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
482 /// use std::sync::Arc;
483 ///
484 /// // L1 + L2 + L3 setup
485 /// let l1 = Arc::new(L1Cache::new()?);
486 /// let l2 = Arc::new(L2Cache::new().await?);
487 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
488 ///
489 /// let tiers = vec![
490 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
491 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
492 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
493 /// ];
494 ///
495 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
496 /// ```
497 /// # Errors
498 ///
499 /// Returns an error if tiers are not sorted by level or if no tiers are provided.
500 pub fn new_with_tiers(
501 tiers: Vec<CacheTier>,
502 streaming_backend: Option<Arc<dyn StreamingBackend>>,
503 ) -> CacheResult<Self> {
504 debug!("Initializing Multi-Tier Cache Manager...");
505 debug!(" Tier count: {}", tiers.len());
506 for tier in &tiers {
507 debug!(
508 " L{}: {} (promotion={}, ttl_scale={})",
509 tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
510 );
511 }
512
513 // Validate tiers are sorted by level
514 for i in 1..tiers.len() {
515 if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
516 && current.tier_level <= prev.tier_level
517 {
518 return Err(crate::error::CacheError::ConfigError(format!(
519 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
520 current.tier_level, prev.tier_level
521 )));
522 }
523 }
524
525 Ok(Self {
526 tiers,
527 streaming_backend,
528 total_requests: AtomicU64::new(0),
529 l1_hits: AtomicU64::new(0),
530 l2_hits: AtomicU64::new(0),
531 misses: AtomicU64::new(0),
532 promotions: AtomicUsize::new(0),
533 in_flight_requests: Arc::new(DashMap::new()),
534 serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
535 #[cfg(feature = "redis")]
536 invalidation_publisher: None,
537 #[cfg(feature = "redis")]
538 invalidation_subscriber: None,
539 #[cfg(feature = "redis")]
540 #[cfg(feature = "redis")]
541 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
542 })
543 }
544
545 /// Set a custom serializer for the cache manager
546 pub fn set_serializer(&mut self, serializer: CacheSerializer) {
547 debug!(name = %serializer.name(), "Switching cache serializer");
548 self.serializer = Arc::new(serializer);
549 }
550
/// Start the invalidation subscriber background task.
///
/// Registers a handler that applies every received `InvalidationMessage`
/// to all configured tiers so the whole hierarchy stays coherent.
/// No-op when no subscriber was configured.
///
/// (The redundant inner `#[cfg(feature = "redis")]` was removed: the whole
/// function is already gated on that feature.)
#[cfg(feature = "redis")]
fn start_invalidation_subscriber(&self) {
    if let Some(subscriber) = &self.invalidation_subscriber {
        let tiers = self.tiers.clone();

        subscriber.start(move |msg: crate::invalidation::InvalidationMessage| {
            let tiers = tiers.clone();
            async move {
                for tier in &tiers {
                    match &msg {
                        InvalidationMessage::Remove { key } => {
                            // Best-effort removal; failures are ignored.
                            tier.backend.remove(key).await.ok();
                        }
                        InvalidationMessage::Update {
                            key,
                            value,
                            ttl_secs,
                        } => {
                            // Write-through update, honoring an explicit TTL
                            // when the message carries one.
                            if let Some(secs) = ttl_secs {
                                tier.backend
                                    .set_with_ttl(
                                        key,
                                        value.clone(),
                                        Duration::from_secs(*secs),
                                    )
                                    .await
                                    .ok();
                            } else {
                                tier.backend.set(key, value.clone()).await.ok();
                            }
                        }
                        InvalidationMessage::RemovePattern { pattern } => {
                            if let Err(e) = tier.backend.remove_pattern(pattern).await {
                                warn!(
                                    "Failed to remove pattern '{}' from L{}: {}",
                                    pattern, tier.tier_level, e
                                );
                            }
                        }
                        InvalidationMessage::RemoveBulk { keys } => {
                            for key in keys {
                                if let Err(e) = tier.backend.remove(key).await {
                                    warn!(
                                        "Failed to remove '{}' from L{}: {}",
                                        key, tier.tier_level, e
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(())
            }
        });

        info!("Invalidation subscriber started across all tiers");
    }
}
611
/// Get value from cache using multi-tier architecture (v0.5.0+)
///
/// Iterates the tiers in order (hottest first). On a hit in a lower tier,
/// the value is probabilistically promoted into every tier above it.
async fn get_multi_tier(&self, key: &str) -> CacheResult<Option<Bytes>> {
    // Try each tier sequentially (sorted by tier_level)
    for (tier_index, tier) in self.tiers.iter().enumerate() {
        if let Some((value, ttl)) = tier.get_with_ttl(key).await {
            // Cache hit at this tier
            tier.record_hit();

            // Promote only when enabled for this tier and there is at
            // least one tier above it.
            if tier.promotion_enabled && tier_index > 0 {
                // Probabilistic promotion: with frequency N, promote on
                // roughly 1 out of N hits (always promote when N <= 1).
                let should_promote = if tier.promotion_frequency <= 1 {
                    true
                } else {
                    rand::thread_rng().gen_ratio(
                        1,
                        u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
                    )
                };

                if should_promote {
                    // Prefer the source tier's remaining TTL; fall back to
                    // the default strategy (5 minutes) when unknown.
                    let promotion_ttl =
                        ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                    // Write into every tier above this one, nearest first
                    // (`take(tier_index).rev()` yields indexes tier_index-1..0).
                    for upper_tier in self.tiers.iter().take(tier_index).rev() {
                        if let Err(e) = upper_tier
                            .set_with_ttl(key, value.clone(), promotion_ttl)
                            .await
                        {
                            // Promotion failure is non-fatal; the hit still counts.
                            warn!(
                                "Failed to promote '{}' from L{} to L{}: {}",
                                key, tier.tier_level, upper_tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!(
                                "Promoted '{}' from L{} to L{} (TTL: {:?})",
                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                            );
                        }
                    }
                } else {
                    debug!(
                        "Probabilistic skip promotion for '{}' from L{} (N={})",
                        key, tier.tier_level, tier.promotion_frequency
                    );
                }
            }

            return Ok(Some(value));
        }
    }

    // Cache miss across all tiers
    Ok(None)
}
672
/// Get value from cache (L1 first, then L2 fallback with promotion)
///
/// This method now includes built-in Cache Stampede protection when cache misses occur.
/// Multiple concurrent requests for the same missing key will be coalesced to prevent
/// unnecessary duplicate work on external data sources.
///
/// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
///
/// # Arguments
/// * `key` - Cache key to retrieve
///
/// # Returns
/// * `Ok(Some(value))` - Cache hit, value found in any tier
/// * `Ok(None)` - Cache miss, value not found in any cache
/// * `Err(error)` - Cache operation failed
/// # Errors
///
/// Returns an error if cache operation fails.
///
/// # Panics
///
/// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
    self.total_requests.fetch_add(1, Ordering::Relaxed);

    // Fast path for L1 (first tier) - no coordination needed
    if let Some(tier1) = self.tiers.first()
        && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
    {
        tier1.record_hit();
        // Update legacy stats for backward compatibility
        self.l1_hits.fetch_add(1, Ordering::Relaxed);
        return Ok(Some(value));
    }

    // L1 miss - coalesce concurrent lookups of the same key through a
    // per-key broadcast channel (stampede protection for lower tiers).
    let key_owned = key.to_string();
    let lock_guard = self
        .in_flight_requests
        .entry(key_owned.clone())
        .or_insert_with(|| broadcast::Sender::new(1))
        .clone();

    let mut rx = lock_guard.subscribe();

    // If there are other receivers, someone else is computing, wait for it.
    // NOTE(review): if two tasks both subscribe before either reaches this
    // check, both can observe receiver_count() > 1 and wait with no task
    // left to publish a result — confirm this cannot stall `get()`.
    if lock_guard.receiver_count() > 1 {
        match rx.recv().await {
            Ok(Ok(value)) => return Ok(Some(value)),
            Ok(Err(e)) => {
                return Err(crate::error::CacheError::BackendError(format!(
                    "Computation failed in another thread: {e}"
                )));
            }
            Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
        }
    }

    // Double-check L1 after registering interest (another task may have
    // promoted the value into L1 meanwhile).
    if let Some(tier1) = self.tiers.first()
        && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
    {
        tier1.record_hit();
        self.l1_hits.fetch_add(1, Ordering::Relaxed);
        let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
        return Ok(Some(value));
    }

    // Check remaining tiers with promotion
    let result = self.get_multi_tier(key).await?;

    if let Some(val) = result.clone() {
        // Hit in L2+ tier - update legacy stats
        // (l2_hits aggregates hits from ALL lower tiers, not just L2)
        if self.tiers.len() >= 2 {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);
        }

        // Notify any waiting subscribers
        let _ = lock_guard.send(Ok(val.clone()));

        // Remove the in-flight entry after computation/retrieval
        self.in_flight_requests.remove(key);

        Ok(Some(val))
    } else {
        self.misses.fetch_add(1, Ordering::Relaxed);

        // Remove the in-flight entry after computation/retrieval
        self.in_flight_requests.remove(key);

        Ok(None)
    }
}
766
767 /// Get a value from cache and deserialize it (Type-Safe Version)
768 ///
769 /// # Errors
770 ///
771 /// Returns a `SerializationError` if deserialization fails, or a `BackendError` if the cache retrieval fails.
772 pub async fn get_typed<T>(&self, key: &str) -> CacheResult<Option<T>>
773 where
774 T: serde::de::DeserializeOwned,
775 {
776 if let Some(bytes) = self.get(key).await? {
777 return Ok(Some(self.serializer.deserialize::<T>(&bytes)?));
778 }
779 Ok(None)
780 }
781
782 /// Set value with specific cache strategy (all tiers)
783 ///
784 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
785 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
786 /// # Errors
787 ///
788 /// Returns an error if cache set operation fails.
789 pub async fn set_with_strategy(
790 &self,
791 key: &str,
792 value: Bytes,
793 strategy: CacheStrategy,
794 ) -> CacheResult<()> {
795 let ttl = strategy.to_duration();
796
797 let mut success_count = 0;
798 let mut last_error = None;
799
800 for tier in &self.tiers {
801 match tier.set_with_ttl(key, value.clone(), ttl).await {
802 Ok(()) => {
803 success_count += 1;
804 }
805 Err(e) => {
806 error!(
807 "L{} cache set failed for key '{}': {}",
808 tier.tier_level, key, e
809 );
810 last_error = Some(e);
811 }
812 }
813 }
814
815 if success_count > 0 {
816 debug!(
817 "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
818 key,
819 success_count,
820 self.tiers.len(),
821 ttl
822 );
823 return Ok(());
824 }
825
826 Err(last_error.unwrap_or_else(|| {
827 crate::error::CacheError::InternalError("All tiers failed".to_string())
828 }))
829 }
830
/// Get or compute value with Cache Stampede protection across L1+L2+Compute
///
/// This method provides comprehensive Cache Stampede protection:
/// 1. Check all tiers (hit updates legacy stats and may promote to L1)
/// 2. Coalesce concurrent misses through a per-key broadcast channel
/// 3. Compute fresh data at most once across concurrent callers
///
/// # Arguments
/// * `key` - Cache key
/// * `strategy` - Cache strategy for TTL and storage behavior
/// * `compute_fn` - Async function to compute the value if not in any cache
///
/// # Example
/// ```ignore
/// let api_data = cache_manager.get_or_compute_with(
///     "api_response",
///     CacheStrategy::RealTime,
///     || async {
///         fetch_data_from_api().await
///     }
/// ).await?;
/// ```
/// # Errors
///
/// Returns an error if compute function fails or cache operations fail.
#[allow(dead_code)]
pub async fn get_or_compute_with<F, Fut>(
    &self,
    key: &str,
    strategy: CacheStrategy,
    compute_fn: F,
) -> CacheResult<Bytes>
where
    F: FnOnce() -> Fut + Send,
    Fut: Future<Output = CacheResult<Bytes>> + Send,
{
    self.total_requests.fetch_add(1, Ordering::Relaxed);

    // 1. Try tiers sequentially first
    for (idx, tier) in self.tiers.iter().enumerate() {
        if let Some((value, _ttl)) = tier.get_with_ttl(key).await {
            tier.record_hit();
            // Legacy counters only track levels 1 and 2 explicitly.
            if tier.tier_level == 1 {
                self.l1_hits.fetch_add(1, Ordering::Relaxed);
            } else if tier.tier_level == 2 {
                self.l2_hits.fetch_add(1, Ordering::Relaxed);
            }

            // Promotion to L1 if hit was in a lower tier.
            // Unlike `get_multi_tier`, this path promotes ONLY into the
            // first tier, and uses the strategy TTL rather than the
            // source tier's remaining TTL.
            if idx > 0 && tier.promotion_enabled {
                // Probabilistic promotion: 1-in-N chance (always when N <= 1).
                let should_promote = if tier.promotion_frequency <= 1 {
                    true
                } else {
                    rand::thread_rng().gen_ratio(
                        1,
                        u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
                    )
                };

                if should_promote && let Some(l1_tier) = self.tiers.first() {
                    // Best-effort: promotion failure does not affect the hit.
                    let _ = l1_tier
                        .set_with_ttl(key, value.clone(), strategy.to_duration())
                        .await;
                    self.promotions.fetch_add(1, Ordering::Relaxed);
                }
            }
            return Ok(value);
        }
    }

    // 2. Cache miss across all tiers - use stampede protection.
    // Join (or create) the per-key broadcast channel.
    let (tx, mut rx): (
        tokio::sync::broadcast::Sender<CacheResult<Bytes>>,
        tokio::sync::broadcast::Receiver<CacheResult<Bytes>>,
    ) = match self.in_flight_requests.entry(key.to_string()) {
        dashmap::mapref::entry::Entry::Occupied(entry) => {
            let tx = entry.get().clone();
            (tx.clone(), tx.subscribe())
        }
        dashmap::mapref::entry::Entry::Vacant(entry) => {
            let (tx, _) = tokio::sync::broadcast::channel(1);
            entry.insert(tx.clone());
            (tx.clone(), tx.subscribe())
        }
    };

    // NOTE(review): as in `get()`, two tasks that both subscribe before
    // either reaches this check could each see receiver_count() > 1 and
    // wait without a producer — confirm this cannot stall callers.
    if tx.receiver_count() > 1 {
        // Someone else is computing, wait for it
        match rx.recv().await {
            Ok(Ok(value)) => return Ok(value),
            Ok(Err(e)) => {
                return Err(crate::error::CacheError::BackendError(format!(
                    "Computation failed in another thread: {e}"
                )));
            }
            Err(_) => {} // Fall through to re-compute
        }
    }

    // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
    for tier in &self.tiers {
        if let Some((value, _)) = tier.get_with_ttl(key).await {
            let _ = tx.send(Ok(value.clone()));
            return Ok(value);
        }
    }

    // 4. Miss - compute fresh data
    debug!(
        "Computing fresh data for key: '{}' (Stampede protected)",
        key
    );

    let result = compute_fn().await;

    // Remove from in_flight BEFORE broadcasting, so a request arriving
    // after the broadcast starts a fresh computation instead of joining
    // a channel whose message it can no longer receive.
    self.in_flight_requests.remove(key);

    match &result {
        Ok(value) => {
            // Store in all tiers, then wake the waiters.
            let _ = self.set_with_strategy(key, value.clone(), strategy).await;
            let _ = tx.send(Ok(value.clone()));
        }
        Err(e) => {
            let _ = tx.send(Err(e.clone()));
        }
    }

    result
}
962
963 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
964 ///
965 /// This method provides the same functionality as `get_or_compute_with()` but with
966 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
967 /// API calls, or any computation that returns structured data.
968 ///
969 /// # Type Safety
970 ///
971 /// - Returns your actual type `T` instead of `serde_json::Value`
972 /// - Compiler enforces Serialize + `DeserializeOwned` bounds
973 /// - No manual JSON conversion needed
974 ///
975 /// # Cache Flow
976 ///
977 /// 1. Check L1 cache → deserialize if found
978 /// 2. Check L2 cache → deserialize + promote to L1 if found
979 /// 3. Execute `compute_fn` → serialize → store in L1+L2
980 /// 4. Full stampede protection (only ONE request computes)
981 ///
982 /// # Arguments
983 ///
984 /// * `key` - Cache key
985 /// * `strategy` - Cache strategy for TTL
986 /// * `compute_fn` - Async function returning `Result<T>`
987 ///
988 /// # Example - Database Query
989 ///
990 /// ```no_run
991 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
992 /// # use std::sync::Arc;
993 /// # use serde::{Serialize, Deserialize};
994 /// # async fn example() -> anyhow::Result<()> {
995 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
996 /// # let l2 = Arc::new(L2Cache::new().await?);
997 /// # let cache_manager = CacheManager::new(l1, l2);
998 ///
999 /// #[derive(Serialize, Deserialize)]
1000 /// struct User {
1001 /// id: i64,
1002 /// name: String,
1003 /// }
1004 ///
1005 /// // Type-safe database caching (example - requires sqlx)
1006 /// // let user: User = cache_manager.get_or_compute_typed(
1007 /// // "user:123",
1008 /// // CacheStrategy::MediumTerm,
1009 /// // || async {
1010 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1011 /// // .bind(123)
1012 /// // .fetch_one(&pool)
1013 /// // .await
1014 /// // }
1015 /// // ).await?;
1016 /// # Ok(())
1017 /// # }
1018 /// ```
1019 ///
1020 /// # Example - API Call
1021 ///
1022 /// ```no_run
1023 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1024 /// # use std::sync::Arc;
1025 /// # use serde::{Serialize, Deserialize};
1026 /// # async fn example() -> anyhow::Result<()> {
1027 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1028 /// # let l2 = Arc::new(L2Cache::new().await?);
1029 /// # let cache_manager = CacheManager::new(l1, l2);
1030 /// #[derive(Serialize, Deserialize)]
1031 /// struct ApiResponse {
1032 /// data: String,
1033 /// timestamp: i64,
1034 /// }
1035 ///
1036 /// // API call caching (example - requires reqwest)
1037 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1038 /// // "api:endpoint",
1039 /// // CacheStrategy::RealTime,
1040 /// // || async {
1041 /// // reqwest::get("https://api.example.com/data")
1042 /// // .await?
1043 /// // .json::<ApiResponse>()
1044 /// // .await
1045 /// // }
1046 /// // ).await?;
1047 /// # Ok(())
1048 /// # }
1049 /// ```
1050 ///
1051 /// # Performance
1052 ///
1053 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1054 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1055 /// - Compute: Your function time + serialization + L1+L2 storage
1056 /// - Stampede protection: 99.6% latency reduction under high concurrency
1057 ///
1058 /// # Errors
1059 ///
1060 /// Returns error if:
1061 /// - Compute function fails
1062 /// - Serialization fails (invalid type for JSON)
1063 /// - Deserialization fails (cache data doesn't match type T)
1064 /// - Cache operations fail (Redis connection issues)
1065 #[allow(clippy::too_many_lines)]
1066 pub async fn get_or_compute_typed<T, F, Fut>(
1067 &self,
1068 key: &str,
1069 strategy: CacheStrategy,
1070 compute_fn: F,
1071 ) -> CacheResult<T>
1072 where
1073 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1074 F: FnOnce() -> Fut + Send,
1075 Fut: Future<Output = CacheResult<T>> + Send,
1076 {
1077 // 1. Try to get typed from cache first
1078 if let Some(value) = self.get_typed::<T>(key).await? {
1079 return Ok(value);
1080 }
1081
1082 // 2. Use get_or_compute_with to handle stampede protection
1083 let serializer = self.serializer.clone();
1084 let bytes_result = self
1085 .get_or_compute_with(key, strategy, || async move {
1086 let val = compute_fn().await?;
1087 serializer.serialize(&val)
1088 })
1089 .await?;
1090
1091 // 3. Deserialize result
1092 self.serializer.deserialize::<T>(&bytes_result)
1093 }
1094
1095 /// Get comprehensive cache statistics
1096 ///
1097 /// In multi-tier mode, aggregates statistics from all tiers.
1098 /// In legacy mode, returns L1 and L2 stats.
1099 #[allow(dead_code)]
1100 pub fn get_stats(&self) -> CacheManagerStats {
1101 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1102 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1103 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1104 let misses = self.misses.load(Ordering::Relaxed);
1105
1106 CacheManagerStats {
1107 total_requests: total_reqs,
1108 l1_hits,
1109 l2_hits,
1110 total_hits: l1_hits + l2_hits,
1111 misses,
1112 hit_rate: if total_reqs > 0 {
1113 #[allow(clippy::cast_precision_loss)]
1114 {
1115 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1116 }
1117 } else {
1118 0.0
1119 },
1120 l1_hit_rate: if total_reqs > 0 {
1121 #[allow(clippy::cast_precision_loss)]
1122 {
1123 (l1_hits as f64 / total_reqs as f64) * 100.0
1124 }
1125 } else {
1126 0.0
1127 },
1128 promotions: self.promotions.load(Ordering::Relaxed),
1129 in_flight_requests: self.in_flight_requests.len(),
1130 }
1131 }
1132
1133 /// Get per-tier statistics (v0.5.0+)
1134 ///
1135 /// Returns statistics for each tier if multi-tier mode is enabled.
1136 /// Returns None if using legacy 2-tier mode.
1137 ///
1138 /// # Example
1139 /// ```rust,ignore
1140 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1141 /// for stats in tier_stats {
1142 /// println!("L{}: {} hits ({})",
1143 /// stats.tier_level,
1144 /// stats.hit_count(),
1145 /// stats.backend_name);
1146 /// }
1147 /// }
1148 /// ```
1149 pub fn get_tier_stats(&self) -> Vec<TierStats> {
1150 self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1151 }
1152}
1153
/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
///
/// Newtype over an `Arc` so the wrapped backend remains cheaply shareable;
/// all trait methods delegate to the inner backend unchanged.
struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1157
// Pure delegation: every method forwards to the wrapped backend without
// adding behavior, so the proxy is transparent from the caller's view.
impl CacheBackend for ProxyL1ToL2 {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
        self.0.get(key)
    }

    fn set_with_ttl<'a>(
        &'a self,
        key: &'a str,
        value: Bytes,
        ttl: Duration,
    ) -> BoxFuture<'a, CacheResult<()>> {
        self.0.set_with_ttl(key, value, ttl)
    }

    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        self.0.remove(key)
    }

    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        self.0.remove_pattern(pattern)
    }

    fn health_check(&self) -> BoxFuture<'_, bool> {
        self.0.health_check()
    }

    // Reports the inner backend's name, not a proxy-specific one, so stats
    // and logs show the real backend in use.
    fn name(&self) -> &'static str {
        self.0.name()
    }
}
1188
1189impl L2CacheBackend for ProxyL1ToL2 {
1190 fn get_with_ttl<'a>(
1191 &'a self,
1192 key: &'a str,
1193 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1194 Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1195 }
1196}
1197
1198impl CacheManager {
1199 // ===== Redis Streams Methods =====
1200
1201 /// Publish data to Redis Stream
1202 ///
1203 /// # Arguments
1204 /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1205 /// * `fields` - Field-value pairs to publish
1206 /// * `maxlen` - Optional max length for stream trimming
1207 ///
1208 /// # Returns
1209 /// The entry ID generated by Redis
1210 ///
1211 /// # Errors
1212 /// Returns error if streaming backend is not configured
1213 pub async fn publish_to_stream(
1214 &self,
1215 stream_key: &str,
1216 fields: Vec<(String, String)>,
1217 maxlen: Option<usize>,
1218 ) -> CacheResult<String> {
1219 match &self.streaming_backend {
1220 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1221 None => Err(crate::error::CacheError::ConfigError(
1222 "Streaming backend not configured".to_string(),
1223 )),
1224 }
1225 }
1226
1227 /// Read latest entries from Redis Stream
1228 ///
1229 /// # Arguments
1230 /// * `stream_key` - Name of the stream
1231 /// * `count` - Number of latest entries to retrieve
1232 ///
1233 /// # Returns
1234 /// Vector of (`entry_id`, fields) tuples (newest first)
1235 ///
1236 /// # Errors
1237 /// Returns error if streaming backend is not configured
1238 pub async fn read_stream_latest(
1239 &self,
1240 stream_key: &str,
1241 count: usize,
1242 ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1243 match &self.streaming_backend {
1244 Some(backend) => backend
1245 .stream_read_latest(stream_key, count)
1246 .await,
1247 None => Err(crate::error::CacheError::ConfigError(
1248 "Streaming backend not configured".to_string(),
1249 )),
1250 }
1251 }
1252
1253 /// Read from Redis Stream with optional blocking
1254 ///
1255 /// # Arguments
1256 /// * `stream_key` - Name of the stream
1257 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1258 /// * `count` - Max entries to retrieve
1259 /// * `block_ms` - Optional blocking timeout in ms
1260 ///
1261 /// # Returns
1262 /// Vector of (`entry_id`, fields) tuples
1263 ///
1264 /// # Errors
1265 /// Returns error if streaming backend is not configured
1266 pub async fn read_stream(
1267 &self,
1268 stream_key: &str,
1269 last_id: &str,
1270 count: usize,
1271 block_ms: Option<usize>,
1272 ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1273 match &self.streaming_backend {
1274 Some(backend) => {
1275 backend
1276 .stream_read(stream_key, last_id, count, block_ms)
1277 .await
1278 }
1279 None => Err(crate::error::CacheError::ConfigError(
1280 "Streaming backend not configured".to_string(),
1281 )),
1282 }
1283 }
1284
1285 // ===== Cache Invalidation Methods =====
1286
1287 /// Invalidate a cache key across all instances
1288 ///
1289 /// This removes the key from all cache tiers and broadcasts
1290 /// the invalidation to all other cache instances via Redis Pub/Sub.
1291 ///
1292 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1293 ///
1294 /// # Arguments
1295 /// * `key` - Cache key to invalidate
1296 ///
1297 /// # Example
1298 /// ```rust,no_run
1299 /// # use multi_tier_cache::CacheManager;
1300 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1301 /// // Invalidate user cache after profile update
1302 /// cache_manager.invalidate("user:123").await?;
1303 /// # Ok(())
1304 /// # }
1305 /// ```
1306 /// # Errors
1307 ///
1308 /// Returns an error if invalidation fails.
1309 pub async fn invalidate(&self, key: &str) -> CacheResult<()> {
1310 // Remove from ALL tiers
1311 for tier in &self.tiers {
1312 if let Err(e) = tier.remove(key).await {
1313 warn!(
1314 "Failed to remove '{}' from L{}: {}",
1315 key, tier.tier_level, e
1316 );
1317 }
1318 }
1319
1320 // Broadcast to other instances
1321 #[cfg(feature = "redis")]
1322 {
1323 if let Some(publisher) = &self.invalidation_publisher {
1324 let mut pub_lock: tokio::sync::MutexGuard<
1325 '_,
1326 crate::invalidation::InvalidationPublisher,
1327 > = publisher.lock().await;
1328 let msg = InvalidationMessage::remove(key);
1329 pub_lock.publish(&msg).await?;
1330 self.invalidation_stats
1331 .messages_sent
1332 .fetch_add(1, Ordering::Relaxed);
1333 }
1334 }
1335
1336 debug!("Invalidated '{}' across all instances", key);
1337 Ok(())
1338 }
1339
1340 /// Update cache value across all instances
1341 ///
1342 /// This updates the key in all cache tiers and broadcasts
1343 /// the update to all other cache instances, avoiding cache misses.
1344 ///
1345 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1346 ///
1347 /// # Arguments
1348 /// * `key` - Cache key to update
1349 /// * `value` - New value
1350 /// * `ttl` - Optional TTL (uses default if None)
1351 ///
1352 /// # Example
1353 /// ```rust,no_run
1354 /// # use multi_tier_cache::CacheManager;
1355 /// # use std::time::Duration;
1356 /// # use bytes::Bytes;
1357 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1358 /// // Update user cache with new data
1359 /// let user_data = Bytes::from("alice");
1360 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1361 /// # Ok(())
1362 /// # }
1363 /// ```
1364 /// # Errors
1365 ///
1366 /// Returns an error if cache update fails.
1367 pub async fn update_cache(
1368 &self,
1369 key: &str,
1370 value: Bytes,
1371 ttl: Option<Duration>,
1372 ) -> CacheResult<()> {
1373 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1374
1375 // Update ALL tiers
1376 for tier in &self.tiers {
1377 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1378 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1379 }
1380 }
1381
1382 // Broadcast update to other instances
1383 #[cfg(feature = "redis")]
1384 if let Some(publisher) = &self.invalidation_publisher {
1385 let mut pub_lock = publisher.lock().await;
1386 let msg = InvalidationMessage::update(key, value, Some(ttl));
1387 pub_lock.publish(&msg).await?;
1388 self.invalidation_stats
1389 .messages_sent
1390 .fetch_add(1, Ordering::Relaxed);
1391 }
1392
1393 debug!("Updated '{}' across all instances", key);
1394 Ok(())
1395 }
1396
1397 /// Invalidate all keys matching a pattern
1398 ///
1399 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1400 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1401 ///
1402 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1403 ///
1404 /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1405 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1406 ///
1407 /// # Arguments
1408 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1409 ///
1410 /// # Example
1411 /// ```rust,no_run
1412 /// # use multi_tier_cache::CacheManager;
1413 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1414 /// // Invalidate all user caches
1415 /// cache_manager.invalidate_pattern("user:*").await?;
1416 ///
1417 /// // Invalidate specific user's related caches
1418 /// cache_manager.invalidate_pattern("user:123:*").await?;
1419 /// # Ok(())
1420 /// # }
1421 /// ```
1422 /// # Errors
1423 ///
1424 /// Returns an error if invalidation fails.
1425 pub async fn invalidate_pattern(&self, pattern: &str) -> CacheResult<()> {
1426 debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1427
1428 // 1. Invalidate in all configured tiers
1429 for tier in &self.tiers {
1430 debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1431 tier.backend.remove_pattern(pattern).await?;
1432 }
1433
1434 // 2. Broadcast invalidation if publisher is configured
1435 #[cfg(feature = "redis")]
1436 {
1437 if let Some(publisher) = &self.invalidation_publisher {
1438 let msg = InvalidationMessage::remove_pattern(pattern);
1439 publisher.lock().await.publish(&msg).await?;
1440 debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1441 }
1442 }
1443
1444 Ok(())
1445 }
1446
1447 /// Set value with automatic broadcast to all instances
1448 ///
1449 /// This is a write-through operation that updates the cache and
1450 /// broadcasts the update to all other instances automatically.
1451 ///
1452 /// # Arguments
1453 /// * `key` - Cache key
1454 /// * `value` - Value to cache
1455 /// * `strategy` - Cache strategy (determines TTL)
1456 ///
1457 /// # Example
1458 /// ```rust,no_run
1459 /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1460 /// # use bytes::Bytes;
1461 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1462 /// // Update and broadcast in one call
1463 /// let data = Bytes::from("active");
1464 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1465 /// # Ok(())
1466 /// # }
1467 /// ```
1468 /// # Errors
1469 ///
1470 /// Returns an error if cache set or broadcast fails.
1471 pub async fn set_with_broadcast(
1472 &self,
1473 key: &str,
1474 value: Bytes,
1475 strategy: CacheStrategy,
1476 ) -> CacheResult<()> {
1477 #[cfg(feature = "redis")] let ttl = strategy.to_duration();
1478
1479 // Set in local caches
1480 self.set_with_strategy(key, value.clone(), strategy).await?;
1481
1482 // Broadcast update if invalidation is enabled
1483 #[cfg(feature = "redis")]
1484 if let Some(publisher) = &self.invalidation_publisher {
1485 let mut pub_lock = publisher.lock().await;
1486 let msg = InvalidationMessage::update(key, value, Some(ttl));
1487 pub_lock.publish(&msg).await?;
1488 self.invalidation_stats
1489 .messages_sent
1490 .fetch_add(1, Ordering::Relaxed);
1491 }
1492
1493 Ok(())
1494 }
1495
1496 /// Get invalidation statistics
1497 ///
1498 /// Returns statistics about invalidation operations if invalidation is enabled.
1499 #[cfg(feature = "redis")]
1500 pub fn invalidation_stats(&self) -> Option<crate::invalidation::InvalidationStats> {
1501 #[cfg(feature = "redis")]
1502 {
1503 Some(self.invalidation_stats.snapshot())
1504 }
1505 #[cfg(not(feature = "redis"))]
1506 {
1507 None
1508 }
1509 }
1510}
1511
/// Cache Manager statistics
///
/// Immutable snapshot of the aggregate counters produced by
/// `CacheManager::get_stats`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Total number of cache requests observed.
    pub total_requests: u64,
    /// Requests served from the L1 cache.
    pub l1_hits: u64,
    /// Requests served from the L2 cache.
    pub l2_hits: u64,
    /// Sum of L1 and L2 hits.
    pub total_hits: u64,
    /// Requests not found in any cache tier.
    pub misses: u64,
    /// Overall hit percentage (0.0–100.0); 0.0 when no requests recorded.
    pub hit_rate: f64,
    /// L1-only hit percentage (0.0–100.0); 0.0 when no requests recorded.
    pub l1_hit_rate: f64,
    /// Number of L2→L1 promotions recorded.
    pub promotions: usize,
    /// Keys with a computation currently in flight (stampede protection).
    pub in_flight_requests: usize,
}