multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use anyhow::Result;
6use dashmap::DashMap;
7use serde_json;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, broadcast};
13
14use tracing::{debug, error, info, warn};
15use rand::Rng;
16
17use crate::invalidation::{
18 AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
19 InvalidationStats, InvalidationSubscriber,
20};
21use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
22use crate::{L1Cache, L2Cache};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25
/// Type alias for the in-flight requests map.
///
/// One entry per key currently being fetched/computed; each entry holds a
/// `broadcast::Sender` so concurrent callers can await the single in-flight
/// result (`Ok(bytes)` or a shared, `Arc`-wrapped error) instead of
/// duplicating the work.
type InFlightMap = DashMap<String, broadcast::Sender<Result<Bytes, Arc<anyhow::Error>>>>;
29
/// Cache strategies for different data types.
///
/// Each strategy maps to a time-to-live used when storing values.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        let secs = match self {
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 300, // 5 minutes
            Self::MediumTerm => 3_600,              // 1 hour
            Self::LongTerm => 10_800,               // 3 hours
            // Custom carries an exact Duration; return it directly.
            Self::Custom(duration) => return *duration,
        };
        Duration::from_secs(secs)
    }
}
61
/// Statistics for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}

impl Clone for TierStats {
    // `AtomicU64` does not implement `Clone`, so snapshot the counter by hand.
    fn clone(&self) -> Self {
        let snapshot = self.hits.load(Ordering::Relaxed);
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(snapshot),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    /// Fresh stats for a tier, starting at zero hits.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
97
/// A single cache tier in the multi-tier architecture.
///
/// Tiers are ordered by `tier_level`: 1 is the fastest/hottest tier and
/// higher levels are slower (but typically larger/cheaper) backends.
#[derive(Clone)]
pub struct CacheTier {
    /// The backend for this tier
    pub backend: Arc<dyn L2CacheBackend>,
    /// Tier level (1 for fastest, increases for slower/cheaper tiers)
    pub tier_level: usize,
    /// Whether a hit at this tier may copy the value into the faster tiers
    /// above it (see `get_multi_tier`)
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability
    pub promotion_frequency: usize,
    /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
    pub ttl_scale: f64,
    /// Statistics for this tier
    pub stats: TierStats,
}
114
115impl CacheTier {
116 /// Create a new cache tier
117 pub fn new(
118 backend: Arc<dyn L2CacheBackend>,
119 tier_level: usize,
120 promotion_enabled: bool,
121 promotion_frequency: usize,
122 ttl_scale: f64,
123 ) -> Self {
124 let backend_name = backend.name().to_string();
125 Self {
126 backend,
127 tier_level,
128 promotion_enabled,
129 promotion_frequency,
130 ttl_scale,
131 stats: TierStats::new(tier_level, backend_name),
132 }
133 }
134
135 /// Get value with TTL from this tier
136 async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
137 self.backend.get_with_ttl(key).await
138 }
139
140 /// Set value with TTL in this tier
141 async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> Result<()> {
142 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
143 self.backend.set_with_ttl(key, value, scaled_ttl).await
144 }
145
146 /// Remove value from this tier
147 async fn remove(&self, key: &str) -> Result<()> {
148 self.backend.remove(key).await
149 }
150
151 /// Record a cache hit for this tier
152 fn record_hit(&self) {
153 self.stats.hits.fetch_add(1, Ordering::Relaxed);
154 }
155}
156
/// Configuration for a cache tier (used in builder pattern)
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// Promotion frequency (N) - promote with 1/N probability (default 10)
    pub promotion_frequency: usize,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration with the standard defaults
    /// (promotion on, 1-in-10 frequency, unscaled TTL).
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            promotion_frequency: 10,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    #[must_use]
    pub fn as_l1() -> Self {
        Self {
            tier_level: 1,
            promotion_enabled: false, // L1 is already top tier
            promotion_frequency: 1,   // Doesn't matter but use 1
            ttl_scale: 1.0,
        }
    }

    /// Configure as L2 (warm tier)
    #[must_use]
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Configure as L3 (cold tier) with longer TTL
    #[must_use]
    pub fn as_l3() -> Self {
        Self {
            ttl_scale: 2.0, // Keep data 2x longer
            ..Self::new(3)
        }
    }

    /// Configure as L4 (archive tier) with much longer TTL
    #[must_use]
    pub fn as_l4() -> Self {
        Self {
            ttl_scale: 8.0, // Keep data 8x longer
            ..Self::new(4)
        }
    }

    /// Set promotion enabled
    #[must_use]
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set promotion frequency (N)
    #[must_use]
    pub fn with_promotion_frequency(mut self, n: usize) -> Self {
        self.promotion_frequency = n;
        self
    }

    /// Set TTL scale factor
    #[must_use]
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    #[must_use]
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}
254
/// Unified cache manager coordinating reads and writes across an ordered list
/// of cache tiers, with in-flight request coalescing (cache-stampede
/// protection) and optional cross-instance invalidation via Redis Pub/Sub.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...)
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Total number of get-style requests served (all outcomes)
    total_requests: AtomicU64,
    /// Hits served by the first tier (legacy L1 stat)
    l1_hits: AtomicU64,
    /// Hits served by any lower tier (legacy L2 stat)
    l2_hits: AtomicU64,
    /// Requests that missed every tier
    misses: AtomicU64,
    /// Values copied into faster tiers after a lower-tier hit
    promotions: AtomicUsize,
    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher (Some only when constructed with invalidation support)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (Some only when constructed with invalidation support)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
276
277impl CacheManager {
278 /// Create new cache manager with trait objects (pluggable backends)
279 ///
280 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
281 ///
282 /// # Arguments
283 ///
284 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
285 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
286 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
287 ///
288 /// # Example
289 ///
290 /// ```rust,ignore
291 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
292 /// use std::sync::Arc;
293 ///
294 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
295 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
296 ///
297 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
298 /// ```
299 /// # Errors
300 ///
301 /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
302 pub fn new_with_backends(
303 l1_cache: Arc<dyn CacheBackend>,
304 l2_cache: Arc<dyn L2CacheBackend>,
305 streaming_backend: Option<Arc<dyn StreamingBackend>>,
306 ) -> Result<Self> {
307 debug!("Initializing Cache Manager with L1+L2 backends...");
308
309 let tiers = vec![
310 CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
311 CacheTier::new(l2_cache, 2, true, 10, 1.0),
312 ];
313
314 Ok(Self {
315 tiers,
316 streaming_backend,
317 total_requests: AtomicU64::new(0),
318 l1_hits: AtomicU64::new(0),
319 l2_hits: AtomicU64::new(0),
320 misses: AtomicU64::new(0),
321 promotions: AtomicUsize::new(0),
322 in_flight_requests: Arc::new(DashMap::new()),
323 invalidation_publisher: None,
324 invalidation_subscriber: None,
325 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
326 })
327 }
328
329 /// Create new cache manager with default backends (backward compatible)
330 ///
331 /// This is the legacy constructor maintained for backward compatibility.
332 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
333 ///
334 /// # Arguments
335 ///
336 /// * `l1_cache` - Moka L1 cache instance
337 /// * `l2_cache` - Redis L2 cache instance
338 /// # Errors
339 ///
340 /// Returns an error if Redis connection fails.
341 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
342 debug!("Initializing Cache Manager...");
343
344 // Convert concrete types to trait objects
345 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
346 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
347
348 // Create RedisStreams backend for streaming functionality
349 let redis_url =
350 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
351 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
352 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
353
354 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
355 }
356
357 /// Create new cache manager with invalidation support
358 ///
359 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
360 ///
361 /// # Arguments
362 ///
363 /// * `l1_cache` - Moka L1 cache instance
364 /// * `l2_cache` - Redis L2 cache instance
365 /// * `redis_url` - Redis connection URL for Pub/Sub
366 /// * `config` - Invalidation configuration
367 ///
368 /// # Example
369 ///
370 /// ```rust,ignore
371 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
372 ///
373 /// let config = InvalidationConfig {
374 /// channel: "my_app:cache:invalidate".to_string(),
375 /// ..Default::default()
376 /// };
377 ///
378 /// let manager = CacheManager::new_with_invalidation(
379 /// l1, l2, "redis://localhost", config
380 /// ).await?;
381 /// ```
382 /// # Errors
383 ///
384 /// Returns an error if Redis connection fails or invalidation setup fails.
385 pub async fn new_with_invalidation(
386 l1_cache: Arc<L1Cache>,
387 l2_cache: Arc<L2Cache>,
388 redis_url: &str,
389 config: InvalidationConfig,
390 ) -> Result<Self> {
391 debug!("Initializing Cache Manager with Invalidation...");
392 debug!(" Pub/Sub channel: {}", config.channel);
393
394 // Convert concrete types to trait objects
395 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
396 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
397
398 // Create RedisStreams backend for streaming functionality
399 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
400 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
401
402 // Create publisher
403 let client = redis::Client::open(redis_url)?;
404 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
405 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
406
407 // Create subscriber
408 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
409 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
410
411 let tiers = vec![
412 CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
413 CacheTier::new(l2_backend, 2, true, 10, 1.0),
414 ];
415
416 let manager = Self {
417 tiers,
418 streaming_backend: Some(streaming_backend),
419 total_requests: AtomicU64::new(0),
420 l1_hits: AtomicU64::new(0),
421 l2_hits: AtomicU64::new(0),
422 misses: AtomicU64::new(0),
423 promotions: AtomicUsize::new(0),
424 in_flight_requests: Arc::new(DashMap::new()),
425 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
426 invalidation_subscriber: Some(Arc::new(subscriber)),
427 invalidation_stats,
428 };
429
430 // Start subscriber with handler
431 manager.start_invalidation_subscriber();
432
433 info!("Cache Manager initialized with invalidation support");
434
435 Ok(manager)
436 }
437
438 /// Create new cache manager with multi-tier architecture (v0.5.0+)
439 ///
440 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
441 /// Tiers are checked in order (lower `tier_level` = faster/hotter).
442 ///
443 /// # Arguments
444 ///
445 /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
446 /// * `streaming_backend` - Optional streaming backend
447 ///
448 /// # Example
449 ///
450 /// ```rust,ignore
451 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
452 /// use std::sync::Arc;
453 ///
454 /// // L1 + L2 + L3 setup
455 /// let l1 = Arc::new(L1Cache::new()?);
456 /// let l2 = Arc::new(L2Cache::new().await?);
457 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
458 ///
459 /// let tiers = vec![
460 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
461 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
462 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
463 /// ];
464 ///
465 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
466 /// ```
467 /// # Errors
468 ///
469 /// Returns an error if tiers are not sorted by level or if no tiers are provided.
470 pub fn new_with_tiers(
471 tiers: Vec<CacheTier>,
472 streaming_backend: Option<Arc<dyn StreamingBackend>>,
473 ) -> Result<Self> {
474 debug!("Initializing Multi-Tier Cache Manager...");
475 debug!(" Tier count: {}", tiers.len());
476 for tier in &tiers {
477 debug!(
478 " L{}: {} (promotion={}, ttl_scale={})",
479 tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
480 );
481 }
482
483 // Validate tiers are sorted by level
484 for i in 1..tiers.len() {
485 if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
486 && current.tier_level <= prev.tier_level
487 {
488 anyhow::bail!(
489 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
490 current.tier_level,
491 prev.tier_level
492 );
493 }
494 }
495
496 Ok(Self {
497 tiers,
498 streaming_backend,
499 total_requests: AtomicU64::new(0),
500 l1_hits: AtomicU64::new(0),
501 l2_hits: AtomicU64::new(0),
502 misses: AtomicU64::new(0),
503 promotions: AtomicUsize::new(0),
504 in_flight_requests: Arc::new(DashMap::new()),
505 invalidation_publisher: None,
506 invalidation_subscriber: None,
507 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
508 })
509 }
510
    /// Start the invalidation subscriber background task.
    ///
    /// No-op when the manager was built without invalidation support. The
    /// handler applies each Pub/Sub message to every tier, logging (but not
    /// propagating) per-tier failures so one bad backend cannot stall the
    /// subscriber loop.
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            // Tiers are cloned into the handler; CacheTier is Clone and the
            // backends are Arc-shared, so this is cheap.
            let tiers = self.tiers.clone();

            subscriber.start(move |msg| {
                let tiers = tiers.clone();
                async move {
                    for tier in &tiers {
                        match &msg {
                            // Drop a single key from this tier.
                            InvalidationMessage::Remove { key } => {
                                if let Err(e) = tier.backend.remove(key).await {
                                    warn!(
                                        "Failed to remove '{}' from L{}: {}",
                                        key, tier.tier_level, e
                                    );
                                }
                            }
                            // Replace a key's value; a missing TTL defaults
                            // to 5 minutes.
                            InvalidationMessage::Update {
                                key,
                                value,
                                ttl_secs,
                            } => {
                                let ttl = ttl_secs
                                    .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
                                if let Err(e) =
                                    tier.backend.set_with_ttl(key, value.clone(), ttl).await
                                {
                                    warn!(
                                        "Failed to update '{}' in L{}: {}",
                                        key, tier.tier_level, e
                                    );
                                }
                            }
                            // Pattern removal is delegated to the backend's
                            // own pattern semantics.
                            InvalidationMessage::RemovePattern { pattern } => {
                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
                                    warn!(
                                        "Failed to remove pattern '{}' from L{}: {}",
                                        pattern, tier.tier_level, e
                                    );
                                }
                            }
                            // Bulk removal: keys are removed one by one and
                            // failures are logged individually.
                            InvalidationMessage::RemoveBulk { keys } => {
                                for key in keys {
                                    if let Err(e) = tier.backend.remove(key).await {
                                        warn!(
                                            "Failed to remove '{}' from L{}: {}",
                                            key, tier.tier_level, e
                                        );
                                    }
                                }
                            }
                        }
                    }
                    Ok(())
                }
            });

            info!("Invalidation subscriber started across all tiers");
        }
    }
572
573 /// Get value from cache using multi-tier architecture (v0.5.0+)
574 ///
575 /// This method iterates through all configured tiers and automatically promotes
576 /// to upper tiers on cache hit.
577 async fn get_multi_tier(&self, key: &str) -> Result<Option<Bytes>> {
578 // Try each tier sequentially (sorted by tier_level)
579 for (tier_index, tier) in self.tiers.iter().enumerate() {
580 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
581 // Cache hit!
582 tier.record_hit();
583
584 // Promote to all upper tiers (if promotion enabled)
585 if tier.promotion_enabled && tier_index > 0 {
586 // Probabilistic Promotion Check
587 let should_promote = if tier.promotion_frequency <= 1 {
588 true
589 } else {
590 rand::thread_rng().gen_ratio(1, tier.promotion_frequency as u32)
591 };
592
593 if should_promote {
594 let promotion_ttl =
595 ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
596
597 // Promote to all tiers above this one
598 for upper_tier in self.tiers.iter().take(tier_index).rev() {
599 if let Err(e) = upper_tier
600 .set_with_ttl(key, value.clone(), promotion_ttl)
601 .await
602 {
603 warn!(
604 "Failed to promote '{}' from L{} to L{}: {}",
605 key, tier.tier_level, upper_tier.tier_level, e
606 );
607 } else {
608 self.promotions.fetch_add(1, Ordering::Relaxed);
609 debug!(
610 "Promoted '{}' from L{} to L{} (TTL: {:?})",
611 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
612 );
613 }
614 }
615 } else {
616 debug!(
617 "Probabilistic skip promotion for '{}' from L{} (N={})",
618 key, tier.tier_level, tier.promotion_frequency
619 );
620 }
621 }
622
623 return Ok(Some(value));
624 }
625 }
626
627 // Cache miss across all tiers
628 Ok(None)
629 }
630
631 /// Get value from cache (L1 first, then L2 fallback with promotion)
632 ///
633 /// This method now includes built-in Cache Stampede protection when cache misses occur.
634 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
635 /// unnecessary duplicate work on external data sources.
636 ///
637 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
638 ///
639 /// # Arguments
640 /// * `key` - Cache key to retrieve
641 ///
642 /// # Returns
643 /// * `Ok(Some(value))` - Cache hit, value found in any tier
644 /// * `Ok(None)` - Cache miss, value not found in any cache
645 /// * `Err(error)` - Cache operation failed
646 /// # Errors
647 ///
648 /// Returns an error if cache operation fails.
649 ///
650 /// # Panics
651 ///
652 /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
653 pub async fn get(&self, key: &str) -> Result<Option<Bytes>> {
654 self.total_requests.fetch_add(1, Ordering::Relaxed);
655
656 // Fast path for L1 (first tier) - no locking needed
657 if let Some(tier1) = self.tiers.first()
658 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
659 {
660 tier1.record_hit();
661 // Update legacy stats for backward compatibility
662 self.l1_hits.fetch_add(1, Ordering::Relaxed);
663 return Ok(Some(value));
664 }
665
666 // L1 miss - use stampede protection for lower tiers
667 let key_owned = key.to_string();
668 let lock_guard = self
669 .in_flight_requests
670 .entry(key_owned.clone())
671 .or_insert_with(|| broadcast::Sender::new(1))
672 .clone();
673
674 let mut rx = lock_guard.subscribe();
675
676 // If there are other receivers, someone else is computing, wait for it
677 if lock_guard.receiver_count() > 1 {
678 match rx.recv().await {
679 Ok(Ok(value)) => return Ok(Some(value)),
680 Ok(Err(e)) => {
681 return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
682 }
683 Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
684 }
685 }
686
687 // Double-check L1 after acquiring lock (or if we are the first to compute)
688 if let Some(tier1) = self.tiers.first()
689 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
690 {
691 tier1.record_hit();
692 self.l1_hits.fetch_add(1, Ordering::Relaxed);
693 let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
694 return Ok(Some(value));
695 }
696
697 // Check remaining tiers with promotion
698 let result = self.get_multi_tier(key).await?;
699
700 if let Some(val) = result.clone() {
701 // Hit in L2+ tier - update legacy stats
702 if self.tiers.len() >= 2 {
703 self.l2_hits.fetch_add(1, Ordering::Relaxed);
704 }
705
706 // Notify any waiting subscribers
707 let _ = lock_guard.send(Ok(val.clone()));
708
709 // Remove the in-flight entry after computation/retrieval
710 self.in_flight_requests.remove(key);
711
712 Ok(Some(val))
713 } else {
714 self.misses.fetch_add(1, Ordering::Relaxed);
715
716 // Remove the in-flight entry after computation/retrieval
717 self.in_flight_requests.remove(key);
718
719 Ok(None)
720 }
721 }
722
723 /// Set value with specific cache strategy (all tiers)
724 ///
725 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
726 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
727 /// # Errors
728 ///
729 /// Returns an error if cache set operation fails.
730 pub async fn set_with_strategy(
731 &self,
732 key: &str,
733 value: Bytes,
734 strategy: CacheStrategy,
735 ) -> Result<()> {
736 let ttl = strategy.to_duration();
737
738 let mut success_count = 0;
739 let mut last_error = None;
740
741 for tier in &self.tiers {
742 match tier.set_with_ttl(key, value.clone(), ttl).await {
743 Ok(()) => {
744 success_count += 1;
745 }
746 Err(e) => {
747 error!(
748 "L{} cache set failed for key '{}': {}",
749 tier.tier_level, key, e
750 );
751 last_error = Some(e);
752 }
753 }
754 }
755
756 if success_count > 0 {
757 debug!(
758 "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
759 key,
760 success_count,
761 self.tiers.len(),
762 ttl
763 );
764 return Ok(());
765 }
766
767 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'")))
768 }
769
770 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
771 ///
772 /// This method provides comprehensive Cache Stampede protection:
773 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
774 /// 2. Check L2 cache with mutex-based coalescing
775 /// 3. Compute fresh data with protection against concurrent computations
776 ///
777 /// # Arguments
778 /// * `key` - Cache key
779 /// * `strategy` - Cache strategy for TTL and storage behavior
780 /// * `compute_fn` - Async function to compute the value if not in any cache
781 ///
782 /// # Example
783 /// ```ignore
784 /// let api_data = cache_manager.get_or_compute_with(
785 /// "api_response",
786 /// CacheStrategy::RealTime,
787 /// || async {
788 /// fetch_data_from_api().await
789 /// }
790 /// ).await?;
791 /// ```
    /// # Errors
    ///
    /// Returns an error if compute function fails or cache operations fail.
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<Bytes>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try tiers sequentially first
        for (idx, tier) in self.tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                tier.record_hit();
                // Legacy stats only track L1 and L2 individually.
                if tier.tier_level == 1 {
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                } else if tier.tier_level == 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }

                // Promotion to L1 if hit was in a lower tier
                if idx > 0 && tier.promotion_enabled {
                    // Probabilistic Promotion Check: promote with 1/N probability.
                    let should_promote = if tier.promotion_frequency <= 1 {
                        true
                    } else {
                        rand::thread_rng().gen_ratio(1, tier.promotion_frequency as u32)
                    };

                    if should_promote {
                        // Keep the remaining TTL when known, else the strategy's TTL.
                        let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                        if let Some(l1_tier) = self.tiers.first() {
                            // Promotion failure is non-fatal; the value is served anyway.
                            let _ = l1_tier
                                .set_with_ttl(key, value.clone(), promotion_ttl)
                                .await;
                        }
                    }
                }

                return Ok(value);
            }
        }

        // 2. Cache miss across all tiers - use stampede protection.
        // Exactly one caller per key becomes the "computer"; the rest wait on
        // the broadcast channel for its result.
        let (tx, mut rx) = match self.in_flight_requests.entry(key.to_string()) {
            dashmap::mapref::entry::Entry::Occupied(entry) => {
                let tx = entry.get().clone();
                (tx.clone(), tx.subscribe())
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                let (tx, _) = broadcast::channel(1);
                entry.insert(tx.clone());
                (tx.clone(), tx.subscribe())
            }
        };

        if tx.receiver_count() > 1 {
            // Someone else is computing, wait for it
            match rx.recv().await {
                Ok(Ok(value)) => return Ok(value),
                Ok(Err(e)) => {
                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
                }
                Err(_) => {} // Fall through to re-compute
            }
        }

        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
        for tier in &self.tiers {
            if let Some((value, _)) = tier.get_with_ttl(key).await {
                let _ = tx.send(Ok(value.clone()));
                return Ok(value);
            }
        }

        // 4. Miss - compute fresh data
        debug!(
            "Computing fresh data for key: '{}' (Stampede protected)",
            key
        );

        let result = compute_fn().await;

        // Remove from in_flight BEFORE broadcasting so late arrivals start a
        // fresh computation instead of subscribing to a spent channel.
        self.in_flight_requests.remove(key);

        match &result {
            Ok(value) => {
                // Best-effort store to all tiers, then wake the waiters.
                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
                let _ = tx.send(Ok(value.clone()));
            }
            Err(e) => {
                // anyhow::Error is not Clone, so waiters receive an Arc-wrapped copy.
                let _ = tx.send(Err(Arc::new(anyhow::anyhow!("{e}"))));
            }
        }

        result
    }
896
897 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
898 ///
899 /// This method provides the same functionality as `get_or_compute_with()` but with
900 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
901 /// API calls, or any computation that returns structured data.
902 ///
903 /// # Type Safety
904 ///
905 /// - Returns your actual type `T` instead of `serde_json::Value`
906 /// - Compiler enforces Serialize + `DeserializeOwned` bounds
907 /// - No manual JSON conversion needed
908 ///
909 /// # Cache Flow
910 ///
911 /// 1. Check L1 cache → deserialize if found
912 /// 2. Check L2 cache → deserialize + promote to L1 if found
913 /// 3. Execute `compute_fn` → serialize → store in L1+L2
914 /// 4. Full stampede protection (only ONE request computes)
915 ///
916 /// # Arguments
917 ///
918 /// * `key` - Cache key
919 /// * `strategy` - Cache strategy for TTL
920 /// * `compute_fn` - Async function returning `Result<T>`
921 ///
922 /// # Example - Database Query
923 ///
924 /// ```no_run
925 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
926 /// # use std::sync::Arc;
927 /// # use serde::{Serialize, Deserialize};
928 /// # async fn example() -> anyhow::Result<()> {
929 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
930 /// # let l2 = Arc::new(L2Cache::new().await?);
931 /// # let cache_manager = CacheManager::new(l1, l2);
932 ///
933 /// #[derive(Serialize, Deserialize)]
934 /// struct User {
935 /// id: i64,
936 /// name: String,
937 /// }
938 ///
939 /// // Type-safe database caching (example - requires sqlx)
940 /// // let user: User = cache_manager.get_or_compute_typed(
941 /// // "user:123",
942 /// // CacheStrategy::MediumTerm,
943 /// // || async {
944 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
945 /// // .bind(123)
946 /// // .fetch_one(&pool)
947 /// // .await
948 /// // }
949 /// // ).await?;
950 /// # Ok(())
951 /// # }
952 /// ```
953 ///
954 /// # Example - API Call
955 ///
956 /// ```no_run
957 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
958 /// # use std::sync::Arc;
959 /// # use serde::{Serialize, Deserialize};
960 /// # async fn example() -> anyhow::Result<()> {
961 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
962 /// # let l2 = Arc::new(L2Cache::new().await?);
963 /// # let cache_manager = CacheManager::new(l1, l2);
964 /// #[derive(Serialize, Deserialize)]
965 /// struct ApiResponse {
966 /// data: String,
967 /// timestamp: i64,
968 /// }
969 ///
970 /// // API call caching (example - requires reqwest)
971 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
972 /// // "api:endpoint",
973 /// // CacheStrategy::RealTime,
974 /// // || async {
975 /// // reqwest::get("https://api.example.com/data")
976 /// // .await?
977 /// // .json::<ApiResponse>()
978 /// // .await
979 /// // }
980 /// // ).await?;
981 /// # Ok(())
982 /// # }
983 /// ```
984 ///
985 /// # Performance
986 ///
987 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
988 /// - L2 hit: 2-5ms + deserialization + L1 promotion
989 /// - Compute: Your function time + serialization + L1+L2 storage
990 /// - Stampede protection: 99.6% latency reduction under high concurrency
991 ///
992 /// # Errors
993 ///
994 /// Returns error if:
995 /// - Compute function fails
996 /// - Serialization fails (invalid type for JSON)
997 /// - Deserialization fails (cache data doesn't match type T)
998 /// - Cache operations fail (Redis connection issues)
    #[allow(clippy::too_many_lines)]
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 first with zero-cost typed access (if backend supports it)
        // Note: For now we still use bytes path in CacheManager to keep it generic,
        // but backends like Moka use their internal typed cache.
        // We might want a dedicated Tier trait method for typed access later.

        for (idx, tier) in self.tiers.iter().enumerate() {
            if let Some((bytes, ttl)) = tier.get_with_ttl(key).await {
                tier.record_hit();
                // Legacy stats only track L1 and L2 individually.
                if tier.tier_level == 1 {
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                } else if tier.tier_level == 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }

                match serde_json::from_slice::<T>(&bytes) {
                    Ok(value) => {
                        // Promotion: copy the raw bytes up to L1. Unlike the
                        // bytes path this is unconditional (not 1-in-N).
                        if idx > 0 && tier.promotion_enabled {
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            if let Some(l1_tier) = self.tiers.first() {
                                let _ = l1_tier.set_with_ttl(key, bytes, promotion_ttl).await;
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                        return Ok(value);
                    }
                    Err(e) => {
                        // Cached bytes don't decode as T (stale schema or
                        // corrupt entry): skip this tier rather than fail.
                        warn!("Deserialization failed for key '{}': {}", key, e);
                        // Fall through to next tier or compute
                    }
                }
            }
        }

        // 2. Compute with stampede protection (reuse get_or_compute_with logic for Bytes)
        // NOTE(review): get_or_compute_with also increments total_requests, so
        // a typed miss is counted twice in the request stats — confirm intended.
        let compute_fn_wrapped = || async {
            let val = compute_fn().await?;
            // Serialize once; the bytes are what gets stored in every tier.
            Ok(Bytes::from(serde_json::to_vec(&val)?))
        };

        let bytes = self
            .get_or_compute_with(key, strategy, compute_fn_wrapped)
            .await?;

        // Decode whatever the (possibly coalesced) computation produced.
        match serde_json::from_slice::<T>(&bytes) {
            Ok(val) => Ok(val),
            Err(e) => Err(anyhow::anyhow!(
                "Coalesced result deserialization failed: {e}"
            )),
        }
    }
1064
1065 /// Get comprehensive cache statistics
1066 ///
1067 /// In multi-tier mode, aggregates statistics from all tiers.
1068 /// In legacy mode, returns L1 and L2 stats.
1069 #[allow(dead_code)]
1070 pub fn get_stats(&self) -> CacheManagerStats {
1071 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1072 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1073 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1074 let misses = self.misses.load(Ordering::Relaxed);
1075
1076 CacheManagerStats {
1077 total_requests: total_reqs,
1078 l1_hits,
1079 l2_hits,
1080 total_hits: l1_hits + l2_hits,
1081 misses,
1082 hit_rate: if total_reqs > 0 {
1083 #[allow(clippy::cast_precision_loss)]
1084 {
1085 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1086 }
1087 } else {
1088 0.0
1089 },
1090 l1_hit_rate: if total_reqs > 0 {
1091 #[allow(clippy::cast_precision_loss)]
1092 {
1093 (l1_hits as f64 / total_reqs as f64) * 100.0
1094 }
1095 } else {
1096 0.0
1097 },
1098 promotions: self.promotions.load(Ordering::Relaxed),
1099 in_flight_requests: self.in_flight_requests.len(),
1100 }
1101 }
1102
1103 /// Get per-tier statistics (v0.5.0+)
1104 ///
1105 /// Returns statistics for each tier if multi-tier mode is enabled.
1106 /// Returns None if using legacy 2-tier mode.
1107 ///
1108 /// # Example
1109 /// ```rust,ignore
1110 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1111 /// for stats in tier_stats {
1112 /// println!("L{}: {} hits ({})",
1113 /// stats.tier_level,
1114 /// stats.hit_count(),
1115 /// stats.backend_name);
1116 /// }
1117 /// }
1118 /// ```
1119 pub fn get_tier_stats(&self) -> Vec<TierStats> {
1120 self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1121 }
1122}
1123
/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
///
/// Newtype over a shared trait object; every trait method simply forwards
/// to the wrapped backend.
struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1127
// Pure delegation: every `CacheBackend` operation is forwarded verbatim to the
// wrapped backend. The returned `BoxFuture`s are passed through untouched so
// this wrapper adds no extra allocation per call.
impl CacheBackend for ProxyL1ToL2 {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
        self.0.get(key)
    }

    fn set_with_ttl<'a>(
        &'a self,
        key: &'a str,
        value: Bytes,
        ttl: Duration,
    ) -> BoxFuture<'a, Result<()>> {
        self.0.set_with_ttl(key, value, ttl)
    }

    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
        self.0.remove(key)
    }

    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
        self.0.remove_pattern(pattern)
    }

    fn health_check(&self) -> BoxFuture<'_, bool> {
        self.0.health_check()
    }

    // Reports the inner backend's name, not the proxy's, so logs/stats show
    // the real backend.
    fn name(&self) -> &'static str {
        self.0.name()
    }
}
1158
1159impl L2CacheBackend for ProxyL1ToL2 {
1160 fn get_with_ttl<'a>(
1161 &'a self,
1162 key: &'a str,
1163 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1164 Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1165 }
1166}
1167
1168impl CacheManager {
1169 // ===== Redis Streams Methods =====
1170
1171 /// Publish data to Redis Stream
1172 ///
1173 /// # Arguments
1174 /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1175 /// * `fields` - Field-value pairs to publish
1176 /// * `maxlen` - Optional max length for stream trimming
1177 ///
1178 /// # Returns
1179 /// The entry ID generated by Redis
1180 ///
1181 /// # Errors
1182 /// Returns error if streaming backend is not configured
1183 pub async fn publish_to_stream(
1184 &self,
1185 stream_key: &str,
1186 fields: Vec<(String, String)>,
1187 maxlen: Option<usize>,
1188 ) -> Result<String> {
1189 match &self.streaming_backend {
1190 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1191 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1192 }
1193 }
1194
1195 /// Read latest entries from Redis Stream
1196 ///
1197 /// # Arguments
1198 /// * `stream_key` - Name of the stream
1199 /// * `count` - Number of latest entries to retrieve
1200 ///
1201 /// # Returns
1202 /// Vector of (`entry_id`, fields) tuples (newest first)
1203 ///
1204 /// # Errors
1205 /// Returns error if streaming backend is not configured
1206 pub async fn read_stream_latest(
1207 &self,
1208 stream_key: &str,
1209 count: usize,
1210 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1211 match &self.streaming_backend {
1212 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1213 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1214 }
1215 }
1216
1217 /// Read from Redis Stream with optional blocking
1218 ///
1219 /// # Arguments
1220 /// * `stream_key` - Name of the stream
1221 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1222 /// * `count` - Max entries to retrieve
1223 /// * `block_ms` - Optional blocking timeout in ms
1224 ///
1225 /// # Returns
1226 /// Vector of (`entry_id`, fields) tuples
1227 ///
1228 /// # Errors
1229 /// Returns error if streaming backend is not configured
1230 pub async fn read_stream(
1231 &self,
1232 stream_key: &str,
1233 last_id: &str,
1234 count: usize,
1235 block_ms: Option<usize>,
1236 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1237 match &self.streaming_backend {
1238 Some(backend) => {
1239 backend
1240 .stream_read(stream_key, last_id, count, block_ms)
1241 .await
1242 }
1243 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1244 }
1245 }
1246
1247 // ===== Cache Invalidation Methods =====
1248
1249 /// Invalidate a cache key across all instances
1250 ///
1251 /// This removes the key from all cache tiers and broadcasts
1252 /// the invalidation to all other cache instances via Redis Pub/Sub.
1253 ///
1254 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1255 ///
1256 /// # Arguments
1257 /// * `key` - Cache key to invalidate
1258 ///
1259 /// # Example
1260 /// ```rust,no_run
1261 /// # use multi_tier_cache::CacheManager;
1262 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1263 /// // Invalidate user cache after profile update
1264 /// cache_manager.invalidate("user:123").await?;
1265 /// # Ok(())
1266 /// # }
1267 /// ```
1268 /// # Errors
1269 ///
1270 /// Returns an error if invalidation fails.
1271 pub async fn invalidate(&self, key: &str) -> Result<()> {
1272 // Remove from ALL tiers
1273 for tier in &self.tiers {
1274 if let Err(e) = tier.remove(key).await {
1275 warn!(
1276 "Failed to remove '{}' from L{}: {}",
1277 key, tier.tier_level, e
1278 );
1279 }
1280 }
1281
1282 // Broadcast to other instances
1283 if let Some(publisher) = &self.invalidation_publisher {
1284 let mut pub_lock = publisher.lock().await;
1285 let msg = InvalidationMessage::remove(key);
1286 pub_lock.publish(&msg).await?;
1287 self.invalidation_stats
1288 .messages_sent
1289 .fetch_add(1, Ordering::Relaxed);
1290 }
1291
1292 debug!("Invalidated '{}' across all instances", key);
1293 Ok(())
1294 }
1295
1296 /// Update cache value across all instances
1297 ///
1298 /// This updates the key in all cache tiers and broadcasts
1299 /// the update to all other cache instances, avoiding cache misses.
1300 ///
1301 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1302 ///
1303 /// # Arguments
1304 /// * `key` - Cache key to update
1305 /// * `value` - New value
1306 /// * `ttl` - Optional TTL (uses default if None)
1307 ///
1308 /// # Example
1309 /// ```rust,no_run
1310 /// # use multi_tier_cache::CacheManager;
1311 /// # use std::time::Duration;
1312 /// # use bytes::Bytes;
1313 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1314 /// // Update user cache with new data
1315 /// let user_data = Bytes::from("alice");
1316 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1317 /// # Ok(())
1318 /// # }
1319 /// ```
1320 /// # Errors
1321 ///
1322 /// Returns an error if cache update fails.
1323 pub async fn update_cache(&self, key: &str, value: Bytes, ttl: Option<Duration>) -> Result<()> {
1324 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1325
1326 // Update ALL tiers
1327 for tier in &self.tiers {
1328 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1329 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1330 }
1331 }
1332
1333 // Broadcast update to other instances
1334 if let Some(publisher) = &self.invalidation_publisher {
1335 let mut pub_lock = publisher.lock().await;
1336 let msg = InvalidationMessage::update(key, value, Some(ttl));
1337 pub_lock.publish(&msg).await?;
1338 self.invalidation_stats
1339 .messages_sent
1340 .fetch_add(1, Ordering::Relaxed);
1341 }
1342
1343 debug!("Updated '{}' across all instances", key);
1344 Ok(())
1345 }
1346
1347 /// Invalidate all keys matching a pattern
1348 ///
1349 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1350 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1351 ///
1352 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1353 ///
1354 /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1355 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1356 ///
1357 /// # Arguments
1358 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1359 ///
1360 /// # Example
1361 /// ```rust,no_run
1362 /// # use multi_tier_cache::CacheManager;
1363 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1364 /// // Invalidate all user caches
1365 /// cache_manager.invalidate_pattern("user:*").await?;
1366 ///
1367 /// // Invalidate specific user's related caches
1368 /// cache_manager.invalidate_pattern("user:123:*").await?;
1369 /// # Ok(())
1370 /// # }
1371 /// ```
1372 /// # Errors
1373 ///
1374 /// Returns an error if invalidation fails.
1375 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1376 debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1377
1378 // 1. Invalidate in all configured tiers
1379 for tier in &self.tiers {
1380 debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1381 tier.backend.remove_pattern(pattern).await?;
1382 }
1383
1384 // 2. Broadcast invalidation if publisher is configured
1385 if let Some(publisher) = &self.invalidation_publisher {
1386 let msg = InvalidationMessage::remove_pattern(pattern);
1387 publisher.lock().await.publish(&msg).await?;
1388 debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1389 }
1390
1391 Ok(())
1392 }
1393
1394 /// Set value with automatic broadcast to all instances
1395 ///
1396 /// This is a write-through operation that updates the cache and
1397 /// broadcasts the update to all other instances automatically.
1398 ///
1399 /// # Arguments
1400 /// * `key` - Cache key
1401 /// * `value` - Value to cache
1402 /// * `strategy` - Cache strategy (determines TTL)
1403 ///
1404 /// # Example
1405 /// ```rust,no_run
1406 /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1407 /// # use bytes::Bytes;
1408 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1409 /// // Update and broadcast in one call
1410 /// let data = Bytes::from("active");
1411 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1412 /// # Ok(())
1413 /// # }
1414 /// ```
1415 /// # Errors
1416 ///
1417 /// Returns an error if cache set or broadcast fails.
1418 pub async fn set_with_broadcast(
1419 &self,
1420 key: &str,
1421 value: Bytes,
1422 strategy: CacheStrategy,
1423 ) -> Result<()> {
1424 let ttl = strategy.to_duration();
1425
1426 // Set in local caches
1427 self.set_with_strategy(key, value.clone(), strategy).await?;
1428
1429 // Broadcast update if invalidation is enabled
1430 if let Some(publisher) = &self.invalidation_publisher {
1431 let mut pub_lock = publisher.lock().await;
1432 let msg = InvalidationMessage::update(key, value, Some(ttl));
1433 pub_lock.publish(&msg).await?;
1434 self.invalidation_stats
1435 .messages_sent
1436 .fetch_add(1, Ordering::Relaxed);
1437 }
1438
1439 Ok(())
1440 }
1441
1442 /// Get invalidation statistics
1443 ///
1444 /// Returns statistics about invalidation operations if invalidation is enabled.
1445 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1446 if self.invalidation_subscriber.is_some() {
1447 Some(self.invalidation_stats.snapshot())
1448 } else {
1449 None
1450 }
1451 }
1452}
1453
/// Cache Manager statistics
///
/// Snapshot of aggregate counters returned by `CacheManager::get_stats`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Total number of cache lookups observed
    pub total_requests: u64,
    /// Hits served by tier level 1
    pub l1_hits: u64,
    /// Hits served by tier level 2
    pub l2_hits: u64,
    /// Sum of L1 and L2 hits
    pub total_hits: u64,
    /// Requests that hit no tier
    pub misses: u64,
    /// Overall hit percentage (0.0–100.0); 0.0 when no requests yet
    pub hit_rate: f64,
    /// L1-only hit percentage (0.0–100.0); 0.0 when no requests yet
    pub l1_hit_rate: f64,
    /// Number of values promoted from a lower tier into L1
    pub promotions: usize,
    /// Current number of coalesced in-flight computations
    pub in_flight_requests: usize,
}