multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use anyhow::Result;
6use dashmap::DashMap;
7use serde_json;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, broadcast};
13
14use tracing::{debug, error, info, warn};
15
16use crate::invalidation::{
17 AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
18 InvalidationStats, InvalidationSubscriber,
19};
20use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
21use crate::{L1Cache, L2Cache};
22use bytes::Bytes;
23use futures_util::future::BoxFuture;
24
/// Type alias for the in-flight requests map used for cache-stampede protection.
/// Stores a broadcast sender for each active key computation; waiters subscribe
/// and receive either the computed bytes or a shared error.
type InFlightMap = DashMap<String, broadcast::Sender<Result<Bytes, Arc<anyhow::Error>>>>;
28
/// Caching strategies for different data types; each maps to a TTL policy.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Resolve this strategy to its concrete time-to-live duration.
    #[must_use]
    pub fn to_duration(&self) -> Duration {
        // Named constants make the policy table self-describing.
        const REAL_TIME_SECS: u64 = 10;
        const SHORT_TERM_SECS: u64 = 5 * 60;
        const MEDIUM_TERM_SECS: u64 = 60 * 60;
        const LONG_TERM_SECS: u64 = 3 * 60 * 60;

        match *self {
            Self::RealTime => Duration::from_secs(REAL_TIME_SECS),
            Self::ShortTerm | Self::Default => Duration::from_secs(SHORT_TERM_SECS),
            Self::MediumTerm => Duration::from_secs(MEDIUM_TERM_SECS),
            Self::LongTerm => Duration::from_secs(LONG_TERM_SECS),
            // `Duration` is `Copy`, so binding by value out of `*self` is free.
            Self::Custom(duration) => duration,
        }
    }
}
60
/// Statistics for a single cache tier
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Number of cache hits at this tier
    pub hits: AtomicU64,
    /// Backend name for identification
    pub backend_name: String,
}

impl Clone for TierStats {
    // Manual impl because `AtomicU64` is not `Clone`; the clone carries a
    // snapshot of the counter at clone time.
    fn clone(&self) -> Self {
        Self {
            tier_level: self.tier_level,
            hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
            backend_name: self.backend_name.clone(),
        }
    }
}

impl TierStats {
    /// Create a fresh stats record for `tier_level` with a zeroed hit counter.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::new(0),
            backend_name,
        }
    }

    /// Get current hit count
    // `#[must_use]` added for consistency with the other pure getters in this
    // file (e.g. `CacheStrategy::to_duration`).
    #[must_use]
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
96
97/// A single cache tier in the multi-tier architecture
98#[derive(Clone)]
99pub struct CacheTier {
100 /// The backend for this tier
101 pub backend: Arc<dyn L2CacheBackend>,
102 /// Tier level (1 for fastest, increases for slower/cheaper tiers)
103 pub tier_level: usize,
104 /// Whether to promote keys FROM lower tiers TO this tier
105 pub promotion_enabled: bool,
106 /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
107 pub ttl_scale: f64,
108 /// Statistics for this tier
109 pub stats: TierStats,
110}
111
112impl CacheTier {
113 /// Create a new cache tier
114 pub fn new(
115 backend: Arc<dyn L2CacheBackend>,
116 tier_level: usize,
117 promotion_enabled: bool,
118 ttl_scale: f64,
119 ) -> Self {
120 let backend_name = backend.name().to_string();
121 Self {
122 backend,
123 tier_level,
124 promotion_enabled,
125 ttl_scale,
126 stats: TierStats::new(tier_level, backend_name),
127 }
128 }
129
130 /// Get value with TTL from this tier
131 async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
132 self.backend.get_with_ttl(key).await
133 }
134
135 /// Set value with TTL in this tier
136 async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> Result<()> {
137 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
138 self.backend.set_with_ttl(key, value, scaled_ttl).await
139 }
140
141 /// Remove value from this tier
142 async fn remove(&self, key: &str) -> Result<()> {
143 self.backend.remove(key).await
144 }
145
146 /// Record a cache hit for this tier
147 fn record_hit(&self) {
148 self.stats.hits.fetch_add(1, Ordering::Relaxed);
149 }
150}
151
/// Configuration for a cache tier (used in builder pattern)
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Enable promotion to upper tiers on hit
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Create new tier configuration
    #[must_use]
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Configure as L1 (hot tier)
    #[must_use]
    pub fn as_l1() -> Self {
        // L1 is already the top tier, so there is nowhere to promote to.
        Self::new(1).with_promotion(false)
    }

    /// Configure as L2 (warm tier)
    #[must_use]
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Configure as L3 (cold tier) with longer TTL
    #[must_use]
    pub fn as_l3() -> Self {
        // Colder storage keeps entries twice as long.
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Configure as L4 (archive tier) with much longer TTL
    #[must_use]
    pub fn as_l4() -> Self {
        // Archive storage keeps entries 8x longer.
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Set promotion enabled
    #[must_use]
    pub fn with_promotion(mut self, enabled: bool) -> Self {
        self.promotion_enabled = enabled;
        self
    }

    /// Set TTL scale factor
    #[must_use]
    pub fn with_ttl_scale(mut self, scale: f64) -> Self {
        self.ttl_scale = scale;
        self
    }

    /// Set tier level
    #[must_use]
    pub fn with_level(mut self, level: usize) -> Self {
        self.tier_level = level;
        self
    }
}
235
/// Unified entry point coordinating all cache tiers, cache-stampede protection,
/// cross-instance invalidation, and statistics.
pub struct CacheManager {
    /// Ordered list of cache tiers (L1, L2, L3, ...)
    tiers: Vec<CacheTier>,

    /// Optional streaming backend
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Total requests observed across `get` / `get_or_compute_*`
    total_requests: AtomicU64,
    /// Hits served by the first (L1) tier — kept for backward-compatible stats
    l1_hits: AtomicU64,
    /// Hits served by lower tiers — kept for backward-compatible stats
    l2_hits: AtomicU64,
    /// Requests that missed every tier
    misses: AtomicU64,
    /// Values written to an upper tier after a lower-tier hit
    promotions: AtomicUsize,
    /// In-flight requests map (Broadcaster integration will replace this in Step 4)
    in_flight_requests: Arc<InFlightMap>,
    /// Invalidation publisher
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
257
258impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    /// # Errors
    ///
    /// Currently always returns `Ok`; the `Result` is kept for future compatibility.
    pub fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with L1+L2 backends...");

        // The L1 backend only implements `CacheBackend`; wrap it in a proxy so
        // both tiers share the `L2CacheBackend` interface `CacheTier` expects.
        // L1 gets no promotion (nothing above it); L2 promotes up on hits.
        let tiers = vec![
            CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1.0),
            CacheTier::new(l2_cache, 2, true, 1.0),
        ];

        Ok(Self {
            tiers,
            streaming_backend,
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }
309
    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// # Errors
    ///
    /// Returns an error if Redis connection fails.
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        debug!("Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality.
        // The connection target comes from REDIS_URL, defaulting to localhost.
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
    }
337
    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    /// # Errors
    ///
    /// Returns an error if Redis connection fails or invalidation setup fails.
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        debug!("Initializing Cache Manager with Invalidation...");
        debug!(" Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();

        // Create RedisStreams backend for streaming functionality
        let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
        let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);

        // Create publisher over its own Redis connection
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber (opens its own connection from the URL)
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        // Same 2-tier layout as `new_with_backends`: L1 proxied, L2 promoting.
        let tiers = vec![
            CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1.0),
            CacheTier::new(l2_backend, 2, true, 1.0),
        ];

        let manager = Self {
            tiers,
            streaming_backend: Some(streaming_backend),
            total_requests: AtomicU64::new(0),
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            promotions: AtomicUsize::new(0),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        info!("Cache Manager initialized with invalidation support");

        Ok(manager)
    }
418
419 /// Create new cache manager with multi-tier architecture (v0.5.0+)
420 ///
421 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
422 /// Tiers are checked in order (lower `tier_level` = faster/hotter).
423 ///
424 /// # Arguments
425 ///
426 /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
427 /// * `streaming_backend` - Optional streaming backend
428 ///
429 /// # Example
430 ///
431 /// ```rust,ignore
432 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
433 /// use std::sync::Arc;
434 ///
435 /// // L1 + L2 + L3 setup
436 /// let l1 = Arc::new(L1Cache::new()?);
437 /// let l2 = Arc::new(L2Cache::new().await?);
438 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
439 ///
440 /// let tiers = vec![
441 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
442 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
443 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
444 /// ];
445 ///
446 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
447 /// ```
448 /// # Errors
449 ///
450 /// Returns an error if tiers are not sorted by level or if no tiers are provided.
451 pub fn new_with_tiers(
452 tiers: Vec<CacheTier>,
453 streaming_backend: Option<Arc<dyn StreamingBackend>>,
454 ) -> Result<Self> {
455 debug!("Initializing Multi-Tier Cache Manager...");
456 debug!(" Tier count: {}", tiers.len());
457 for tier in &tiers {
458 debug!(
459 " L{}: {} (promotion={}, ttl_scale={})",
460 tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
461 );
462 }
463
464 // Validate tiers are sorted by level
465 for i in 1..tiers.len() {
466 if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
467 && current.tier_level <= prev.tier_level
468 {
469 anyhow::bail!(
470 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
471 current.tier_level,
472 prev.tier_level
473 );
474 }
475 }
476
477 Ok(Self {
478 tiers,
479 streaming_backend,
480 total_requests: AtomicU64::new(0),
481 l1_hits: AtomicU64::new(0),
482 l2_hits: AtomicU64::new(0),
483 misses: AtomicU64::new(0),
484 promotions: AtomicUsize::new(0),
485 in_flight_requests: Arc::new(DashMap::new()),
486 invalidation_publisher: None,
487 invalidation_subscriber: None,
488 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
489 })
490 }
491
    /// Start the invalidation subscriber background task
    ///
    /// The registered handler applies every received `InvalidationMessage` to
    /// ALL tiers, so each instance converges on the same cache contents.
    /// Per-tier failures are logged and skipped; the handler never aborts early.
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            // Clone the tier list so the handler closure owns its own copy.
            let tiers = self.tiers.clone();

            subscriber.start(move |msg| {
                // Re-clone per invocation so each spawned future is self-contained.
                let tiers = tiers.clone();
                async move {
                    for tier in &tiers {
                        match &msg {
                            InvalidationMessage::Remove { key } => {
                                if let Err(e) = tier.backend.remove(key).await {
                                    warn!(
                                        "Failed to remove '{}' from L{}: {}",
                                        key, tier.tier_level, e
                                    );
                                }
                            }
                            InvalidationMessage::Update {
                                key,
                                value,
                                ttl_secs,
                            } => {
                                // Messages without a TTL fall back to 5 minutes.
                                let ttl = ttl_secs
                                    .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
                                if let Err(e) =
                                    tier.backend.set_with_ttl(key, value.clone(), ttl).await
                                {
                                    warn!(
                                        "Failed to update '{}' in L{}: {}",
                                        key, tier.tier_level, e
                                    );
                                }
                            }
                            InvalidationMessage::RemovePattern { pattern } => {
                                if let Err(e) = tier.backend.remove_pattern(pattern).await {
                                    warn!(
                                        "Failed to remove pattern '{}' from L{}: {}",
                                        pattern, tier.tier_level, e
                                    );
                                }
                            }
                            InvalidationMessage::RemoveBulk { keys } => {
                                for key in keys {
                                    if let Err(e) = tier.backend.remove(key).await {
                                        warn!(
                                            "Failed to remove '{}' from L{}: {}",
                                            key, tier.tier_level, e
                                        );
                                    }
                                }
                            }
                        }
                    }
                    Ok(())
                }
            });

            info!("Invalidation subscriber started across all tiers");
        }
    }
553
    /// Get value from cache using multi-tier architecture (v0.5.0+)
    ///
    /// This method iterates through all configured tiers and automatically promotes
    /// to upper tiers on cache hit. Promotion failures are logged but never fail
    /// the read — the value was already retrieved from the tier that had it.
    async fn get_multi_tier(&self, key: &str) -> Result<Option<Bytes>> {
        // Try each tier sequentially (sorted by tier_level)
        for (tier_index, tier) in self.tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                // Cache hit!
                tier.record_hit();

                // Promote to all upper tiers (if promotion enabled)
                if tier.promotion_enabled && tier_index > 0 {
                    // Reuse the remaining TTL when the backend reports it;
                    // otherwise fall back to the default strategy (5 minutes).
                    let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

                    // Promote to all tiers above this one. `.rev()` walks from
                    // the tier just above the hit down to L1, so L1 is written last.
                    for upper_tier in self.tiers.iter().take(tier_index).rev() {
                        if let Err(e) = upper_tier
                            .set_with_ttl(key, value.clone(), promotion_ttl)
                            .await
                        {
                            warn!(
                                "Failed to promote '{}' from L{} to L{}: {}",
                                key, tier.tier_level, upper_tier.tier_level, e
                            );
                        } else {
                            self.promotions.fetch_add(1, Ordering::Relaxed);
                            debug!(
                                "Promoted '{}' from L{} to L{} (TTL: {:?})",
                                key, tier.tier_level, upper_tier.tier_level, promotion_ttl
                            );
                        }
                    }
                }

                return Ok(Some(value));
            }
        }

        // Cache miss across all tiers
        Ok(None)
    }
596
597 /// Get value from cache (L1 first, then L2 fallback with promotion)
598 ///
599 /// This method now includes built-in Cache Stampede protection when cache misses occur.
600 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
601 /// unnecessary duplicate work on external data sources.
602 ///
603 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
604 ///
605 /// # Arguments
606 /// * `key` - Cache key to retrieve
607 ///
608 /// # Returns
609 /// * `Ok(Some(value))` - Cache hit, value found in any tier
610 /// * `Ok(None)` - Cache miss, value not found in any cache
611 /// * `Err(error)` - Cache operation failed
612 /// # Errors
613 ///
614 /// Returns an error if cache operation fails.
615 ///
616 /// # Panics
617 ///
618 /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
619 pub async fn get(&self, key: &str) -> Result<Option<Bytes>> {
620 self.total_requests.fetch_add(1, Ordering::Relaxed);
621
622 // Fast path for L1 (first tier) - no locking needed
623 if let Some(tier1) = self.tiers.first()
624 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
625 {
626 tier1.record_hit();
627 // Update legacy stats for backward compatibility
628 self.l1_hits.fetch_add(1, Ordering::Relaxed);
629 return Ok(Some(value));
630 }
631
632 // L1 miss - use stampede protection for lower tiers
633 let key_owned = key.to_string();
634 let lock_guard = self
635 .in_flight_requests
636 .entry(key_owned.clone())
637 .or_insert_with(|| broadcast::Sender::new(1))
638 .clone();
639
640 let mut rx = lock_guard.subscribe();
641
642 // If there are other receivers, someone else is computing, wait for it
643 if lock_guard.receiver_count() > 1 {
644 match rx.recv().await {
645 Ok(Ok(value)) => return Ok(Some(value)),
646 Ok(Err(e)) => {
647 return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
648 }
649 Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
650 }
651 }
652
653 // Double-check L1 after acquiring lock (or if we are the first to compute)
654 if let Some(tier1) = self.tiers.first()
655 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
656 {
657 tier1.record_hit();
658 self.l1_hits.fetch_add(1, Ordering::Relaxed);
659 let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
660 return Ok(Some(value));
661 }
662
663 // Check remaining tiers with promotion
664 let result = self.get_multi_tier(key).await?;
665
666 if let Some(val) = result.clone() {
667 // Hit in L2+ tier - update legacy stats
668 if self.tiers.len() >= 2 {
669 self.l2_hits.fetch_add(1, Ordering::Relaxed);
670 }
671
672 // Notify any waiting subscribers
673 let _ = lock_guard.send(Ok(val.clone()));
674
675 // Remove the in-flight entry after computation/retrieval
676 self.in_flight_requests.remove(key);
677
678 Ok(Some(val))
679 } else {
680 self.misses.fetch_add(1, Ordering::Relaxed);
681
682 // Remove the in-flight entry after computation/retrieval
683 self.in_flight_requests.remove(key);
684
685 Ok(None)
686 }
687 }
688
689 /// Set value with specific cache strategy (all tiers)
690 ///
691 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
692 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
693 /// # Errors
694 ///
695 /// Returns an error if cache set operation fails.
696 pub async fn set_with_strategy(
697 &self,
698 key: &str,
699 value: Bytes,
700 strategy: CacheStrategy,
701 ) -> Result<()> {
702 let ttl = strategy.to_duration();
703
704 let mut success_count = 0;
705 let mut last_error = None;
706
707 for tier in &self.tiers {
708 match tier.set_with_ttl(key, value.clone(), ttl).await {
709 Ok(()) => {
710 success_count += 1;
711 }
712 Err(e) => {
713 error!(
714 "L{} cache set failed for key '{}': {}",
715 tier.tier_level, key, e
716 );
717 last_error = Some(e);
718 }
719 }
720 }
721
722 if success_count > 0 {
723 debug!(
724 "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
725 key,
726 success_count,
727 self.tiers.len(),
728 ttl
729 );
730 return Ok(());
731 }
732
733 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'")))
734 }
735
    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    /// # Errors
    ///
    /// Returns an error if compute function fails or cache operations fail.
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<Bytes>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try tiers sequentially first
        for (idx, tier) in self.tiers.iter().enumerate() {
            if let Some((value, ttl)) = tier.get_with_ttl(key).await {
                tier.record_hit();
                // Legacy per-level counters only exist for levels 1 and 2.
                if tier.tier_level == 1 {
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                } else if tier.tier_level == 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }

                // Promotion to L1 if hit was in a lower tier
                if idx > 0 && tier.promotion_enabled {
                    let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                    if let Some(l1_tier) = self.tiers.first() {
                        // Best-effort: a failed promotion must not fail the read.
                        let _ = l1_tier
                            .set_with_ttl(key, value.clone(), promotion_ttl)
                            .await;
                    }
                }

                return Ok(value);
            }
        }

        // 2. Cache miss across all tiers - use stampede protection.
        // The entry API makes "get existing sender or create one" atomic.
        let (tx, mut rx) = match self.in_flight_requests.entry(key.to_string()) {
            dashmap::mapref::entry::Entry::Occupied(entry) => {
                let tx = entry.get().clone();
                (tx.clone(), tx.subscribe())
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                let (tx, _) = broadcast::channel(1);
                entry.insert(tx.clone());
                (tx.clone(), tx.subscribe())
            }
        };

        if tx.receiver_count() > 1 {
            // Someone else is computing, wait for it
            match rx.recv().await {
                Ok(Ok(value)) => return Ok(value),
                Ok(Err(e)) => {
                    return Err(anyhow::anyhow!("Computation failed in another thread: {e}"));
                }
                Err(_) => {} // Fall through to re-compute
            }
        }

        // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
        for tier in &self.tiers {
            if let Some((value, _)) = tier.get_with_ttl(key).await {
                // NOTE(review): this early return leaves the in-flight entry in
                // the map (it is only removed on the compute path below) —
                // consider removing it here as well; confirm intended behavior.
                let _ = tx.send(Ok(value.clone()));
                return Ok(value);
            }
        }

        // 4. Miss - compute fresh data
        debug!(
            "Computing fresh data for key: '{}' (Stampede protected)",
            key
        );

        let result = compute_fn().await;

        // Remove from in_flight BEFORE broadcasting
        self.in_flight_requests.remove(key);

        match &result {
            Ok(value) => {
                // Store across tiers (best-effort) before waking waiters.
                let _ = self.set_with_strategy(key, value.clone(), strategy).await;
                let _ = tx.send(Ok(value.clone()));
            }
            Err(e) => {
                // anyhow::Error is not Clone, so waiters get a formatted copy
                // wrapped in an Arc.
                let _ = tx.send(Err(Arc::new(anyhow::anyhow!("{e}"))));
            }
        }

        result
    }
853
    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces Serialize + `DeserializeOwned` bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute `compute_fn` → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2);
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2);
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: Your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type T)
    /// - Cache operations fail (Redis connection issues)
    #[allow(clippy::too_many_lines)]
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 first with zero-cost typed access (if backend supports it)
        // Note: For now we still use bytes path in CacheManager to keep it generic,
        // but backends like Moka use their internal typed cache.
        // We might want a dedicated Tier trait method for typed access later.

        for (idx, tier) in self.tiers.iter().enumerate() {
            if let Some((bytes, ttl)) = tier.get_with_ttl(key).await {
                tier.record_hit();
                // Legacy per-level counters only exist for levels 1 and 2.
                if tier.tier_level == 1 {
                    self.l1_hits.fetch_add(1, Ordering::Relaxed);
                } else if tier.tier_level == 2 {
                    self.l2_hits.fetch_add(1, Ordering::Relaxed);
                }

                match serde_json::from_slice::<T>(&bytes) {
                    Ok(value) => {
                        // Promotion reuses the raw bytes — no re-serialization.
                        if idx > 0 && tier.promotion_enabled {
                            let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
                            if let Some(l1_tier) = self.tiers.first() {
                                let _ = l1_tier.set_with_ttl(key, bytes, promotion_ttl).await;
                                self.promotions.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                        return Ok(value);
                    }
                    Err(e) => {
                        // Corrupt or type-mismatched payload: treat as a miss
                        // for this tier rather than failing the whole call.
                        warn!("Deserialization failed for key '{}': {}", key, e);
                        // Fall through to next tier or compute
                    }
                }
            }
        }

        // 2. Compute with stampede protection (reuse get_or_compute_with logic for Bytes)
        let compute_fn_wrapped = || async {
            let val = compute_fn().await?;
            Ok(Bytes::from(serde_json::to_vec(&val)?))
        };

        let bytes = self
            .get_or_compute_with(key, strategy, compute_fn_wrapped)
            .await?;

        match serde_json::from_slice::<T>(&bytes) {
            Ok(val) => Ok(val),
            Err(e) => Err(anyhow::anyhow!(
                "Coalesced result deserialization failed: {e}"
            )),
        }
    }
1021
1022 /// Get comprehensive cache statistics
1023 ///
1024 /// In multi-tier mode, aggregates statistics from all tiers.
1025 /// In legacy mode, returns L1 and L2 stats.
1026 #[allow(dead_code)]
1027 pub fn get_stats(&self) -> CacheManagerStats {
1028 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1029 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1030 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1031 let misses = self.misses.load(Ordering::Relaxed);
1032
1033 CacheManagerStats {
1034 total_requests: total_reqs,
1035 l1_hits,
1036 l2_hits,
1037 total_hits: l1_hits + l2_hits,
1038 misses,
1039 hit_rate: if total_reqs > 0 {
1040 #[allow(clippy::cast_precision_loss)]
1041 {
1042 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1043 }
1044 } else {
1045 0.0
1046 },
1047 l1_hit_rate: if total_reqs > 0 {
1048 #[allow(clippy::cast_precision_loss)]
1049 {
1050 (l1_hits as f64 / total_reqs as f64) * 100.0
1051 }
1052 } else {
1053 0.0
1054 },
1055 promotions: self.promotions.load(Ordering::Relaxed),
1056 in_flight_requests: self.in_flight_requests.len(),
1057 }
1058 }
1059
1060 /// Get per-tier statistics (v0.5.0+)
1061 ///
1062 /// Returns statistics for each tier if multi-tier mode is enabled.
1063 /// Returns None if using legacy 2-tier mode.
1064 ///
1065 /// # Example
1066 /// ```rust,ignore
1067 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1068 /// for stats in tier_stats {
1069 /// println!("L{}: {} hits ({})",
1070 /// stats.tier_level,
1071 /// stats.hit_count(),
1072 /// stats.backend_name);
1073 /// }
1074 /// }
1075 /// ```
1076 pub fn get_tier_stats(&self) -> Vec<TierStats> {
1077 self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1078 }
1079}
1080
/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
///
/// The wrapped backend exposes no TTL introspection, so the `L2CacheBackend`
/// impl below reports `None` for the remaining TTL on every hit.
struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1084
1085impl CacheBackend for ProxyL1ToL2 {
1086 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
1087 self.0.get(key)
1088 }
1089
1090 fn set_with_ttl<'a>(
1091 &'a self,
1092 key: &'a str,
1093 value: Bytes,
1094 ttl: Duration,
1095 ) -> BoxFuture<'a, Result<()>> {
1096 self.0.set_with_ttl(key, value, ttl)
1097 }
1098
1099 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
1100 self.0.remove(key)
1101 }
1102
1103 fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
1104 self.0.remove_pattern(pattern)
1105 }
1106
1107 fn health_check(&self) -> BoxFuture<'_, bool> {
1108 self.0.health_check()
1109 }
1110
1111 fn name(&self) -> &'static str {
1112 self.0.name()
1113 }
1114}
1115
1116impl L2CacheBackend for ProxyL1ToL2 {
1117 fn get_with_ttl<'a>(
1118 &'a self,
1119 key: &'a str,
1120 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1121 Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1122 }
1123}
1124
1125impl CacheManager {
1126 // ===== Redis Streams Methods =====
1127
1128 /// Publish data to Redis Stream
1129 ///
1130 /// # Arguments
1131 /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1132 /// * `fields` - Field-value pairs to publish
1133 /// * `maxlen` - Optional max length for stream trimming
1134 ///
1135 /// # Returns
1136 /// The entry ID generated by Redis
1137 ///
1138 /// # Errors
1139 /// Returns error if streaming backend is not configured
1140 pub async fn publish_to_stream(
1141 &self,
1142 stream_key: &str,
1143 fields: Vec<(String, String)>,
1144 maxlen: Option<usize>,
1145 ) -> Result<String> {
1146 match &self.streaming_backend {
1147 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1148 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1149 }
1150 }
1151
1152 /// Read latest entries from Redis Stream
1153 ///
1154 /// # Arguments
1155 /// * `stream_key` - Name of the stream
1156 /// * `count` - Number of latest entries to retrieve
1157 ///
1158 /// # Returns
1159 /// Vector of (`entry_id`, fields) tuples (newest first)
1160 ///
1161 /// # Errors
1162 /// Returns error if streaming backend is not configured
1163 pub async fn read_stream_latest(
1164 &self,
1165 stream_key: &str,
1166 count: usize,
1167 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1168 match &self.streaming_backend {
1169 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1170 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1171 }
1172 }
1173
1174 /// Read from Redis Stream with optional blocking
1175 ///
1176 /// # Arguments
1177 /// * `stream_key` - Name of the stream
1178 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1179 /// * `count` - Max entries to retrieve
1180 /// * `block_ms` - Optional blocking timeout in ms
1181 ///
1182 /// # Returns
1183 /// Vector of (`entry_id`, fields) tuples
1184 ///
1185 /// # Errors
1186 /// Returns error if streaming backend is not configured
1187 pub async fn read_stream(
1188 &self,
1189 stream_key: &str,
1190 last_id: &str,
1191 count: usize,
1192 block_ms: Option<usize>,
1193 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1194 match &self.streaming_backend {
1195 Some(backend) => {
1196 backend
1197 .stream_read(stream_key, last_id, count, block_ms)
1198 .await
1199 }
1200 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1201 }
1202 }
1203
1204 // ===== Cache Invalidation Methods =====
1205
1206 /// Invalidate a cache key across all instances
1207 ///
1208 /// This removes the key from all cache tiers and broadcasts
1209 /// the invalidation to all other cache instances via Redis Pub/Sub.
1210 ///
1211 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1212 ///
1213 /// # Arguments
1214 /// * `key` - Cache key to invalidate
1215 ///
1216 /// # Example
1217 /// ```rust,no_run
1218 /// # use multi_tier_cache::CacheManager;
1219 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1220 /// // Invalidate user cache after profile update
1221 /// cache_manager.invalidate("user:123").await?;
1222 /// # Ok(())
1223 /// # }
1224 /// ```
1225 /// # Errors
1226 ///
1227 /// Returns an error if invalidation fails.
1228 pub async fn invalidate(&self, key: &str) -> Result<()> {
1229 // Remove from ALL tiers
1230 for tier in &self.tiers {
1231 if let Err(e) = tier.remove(key).await {
1232 warn!(
1233 "Failed to remove '{}' from L{}: {}",
1234 key, tier.tier_level, e
1235 );
1236 }
1237 }
1238
1239 // Broadcast to other instances
1240 if let Some(publisher) = &self.invalidation_publisher {
1241 let mut pub_lock = publisher.lock().await;
1242 let msg = InvalidationMessage::remove(key);
1243 pub_lock.publish(&msg).await?;
1244 self.invalidation_stats
1245 .messages_sent
1246 .fetch_add(1, Ordering::Relaxed);
1247 }
1248
1249 debug!("Invalidated '{}' across all instances", key);
1250 Ok(())
1251 }
1252
1253 /// Update cache value across all instances
1254 ///
1255 /// This updates the key in all cache tiers and broadcasts
1256 /// the update to all other cache instances, avoiding cache misses.
1257 ///
1258 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1259 ///
1260 /// # Arguments
1261 /// * `key` - Cache key to update
1262 /// * `value` - New value
1263 /// * `ttl` - Optional TTL (uses default if None)
1264 ///
1265 /// # Example
1266 /// ```rust,no_run
1267 /// # use multi_tier_cache::CacheManager;
1268 /// # use std::time::Duration;
1269 /// # use bytes::Bytes;
1270 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1271 /// // Update user cache with new data
1272 /// let user_data = Bytes::from("alice");
1273 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1274 /// # Ok(())
1275 /// # }
1276 /// ```
1277 /// # Errors
1278 ///
1279 /// Returns an error if cache update fails.
1280 pub async fn update_cache(&self, key: &str, value: Bytes, ttl: Option<Duration>) -> Result<()> {
1281 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1282
1283 // Update ALL tiers
1284 for tier in &self.tiers {
1285 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1286 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1287 }
1288 }
1289
1290 // Broadcast update to other instances
1291 if let Some(publisher) = &self.invalidation_publisher {
1292 let mut pub_lock = publisher.lock().await;
1293 let msg = InvalidationMessage::update(key, value, Some(ttl));
1294 pub_lock.publish(&msg).await?;
1295 self.invalidation_stats
1296 .messages_sent
1297 .fetch_add(1, Ordering::Relaxed);
1298 }
1299
1300 debug!("Updated '{}' across all instances", key);
1301 Ok(())
1302 }
1303
1304 /// Invalidate all keys matching a pattern
1305 ///
1306 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1307 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1308 ///
1309 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1310 ///
1311 /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1312 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1313 ///
1314 /// # Arguments
1315 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1316 ///
1317 /// # Example
1318 /// ```rust,no_run
1319 /// # use multi_tier_cache::CacheManager;
1320 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1321 /// // Invalidate all user caches
1322 /// cache_manager.invalidate_pattern("user:*").await?;
1323 ///
1324 /// // Invalidate specific user's related caches
1325 /// cache_manager.invalidate_pattern("user:123:*").await?;
1326 /// # Ok(())
1327 /// # }
1328 /// ```
1329 /// # Errors
1330 ///
1331 /// Returns an error if invalidation fails.
1332 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1333 debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1334
1335 // 1. Invalidate in all configured tiers
1336 for tier in &self.tiers {
1337 debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1338 tier.backend.remove_pattern(pattern).await?;
1339 }
1340
1341 // 2. Broadcast invalidation if publisher is configured
1342 if let Some(publisher) = &self.invalidation_publisher {
1343 let msg = InvalidationMessage::remove_pattern(pattern);
1344 publisher.lock().await.publish(&msg).await?;
1345 debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1346 }
1347
1348 Ok(())
1349 }
1350
1351 /// Set value with automatic broadcast to all instances
1352 ///
1353 /// This is a write-through operation that updates the cache and
1354 /// broadcasts the update to all other instances automatically.
1355 ///
1356 /// # Arguments
1357 /// * `key` - Cache key
1358 /// * `value` - Value to cache
1359 /// * `strategy` - Cache strategy (determines TTL)
1360 ///
1361 /// # Example
1362 /// ```rust,no_run
1363 /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1364 /// # use bytes::Bytes;
1365 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1366 /// // Update and broadcast in one call
1367 /// let data = Bytes::from("active");
1368 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1369 /// # Ok(())
1370 /// # }
1371 /// ```
1372 /// # Errors
1373 ///
1374 /// Returns an error if cache set or broadcast fails.
1375 pub async fn set_with_broadcast(
1376 &self,
1377 key: &str,
1378 value: Bytes,
1379 strategy: CacheStrategy,
1380 ) -> Result<()> {
1381 let ttl = strategy.to_duration();
1382
1383 // Set in local caches
1384 self.set_with_strategy(key, value.clone(), strategy).await?;
1385
1386 // Broadcast update if invalidation is enabled
1387 if let Some(publisher) = &self.invalidation_publisher {
1388 let mut pub_lock = publisher.lock().await;
1389 let msg = InvalidationMessage::update(key, value, Some(ttl));
1390 pub_lock.publish(&msg).await?;
1391 self.invalidation_stats
1392 .messages_sent
1393 .fetch_add(1, Ordering::Relaxed);
1394 }
1395
1396 Ok(())
1397 }
1398
1399 /// Get invalidation statistics
1400 ///
1401 /// Returns statistics about invalidation operations if invalidation is enabled.
1402 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1403 if self.invalidation_subscriber.is_some() {
1404 Some(self.invalidation_stats.snapshot())
1405 } else {
1406 None
1407 }
1408 }
1409}
1410
/// Cache Manager statistics
///
/// Point-in-time snapshot produced by `CacheManager::get_stats()`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Total number of cache requests observed
    pub total_requests: u64,
    /// Hits served by the L1 tier
    pub l1_hits: u64,
    /// Hits served by the L2 tier
    pub l2_hits: u64,
    /// Sum of L1 and L2 hits
    pub total_hits: u64,
    /// Requests that missed every tier
    pub misses: u64,
    /// Overall hit rate as a percentage (0.0 when no requests recorded)
    pub hit_rate: f64,
    /// L1-only hit rate as a percentage (0.0 when no requests recorded)
    pub l1_hit_rate: f64,
    /// Number of values promoted from a lower tier into L1
    pub promotions: usize,
    /// Number of key computations currently in flight (stampede protection)
    pub in_flight_requests: usize,
}