multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use crate::error::CacheResult;
6use dashmap::DashMap;
7use rand::Rng;
8use std::future::Future;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::time::Duration;
12#[cfg(feature = "redis")]
13#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
14use tokio::sync::Mutex;
15use tokio::sync::broadcast;
16use tracing::{debug, error, info, warn};
17
18#[cfg(feature = "moka")]
19#[cfg_attr(docsrs, doc(cfg(feature = "moka")))]
20use crate::L1Cache;
21#[cfg(feature = "redis")]
22#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
23use crate::L2Cache;
24#[cfg(feature = "redis")]
25#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
26use crate::invalidation::{
27 AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
28 InvalidationSubscriber,
29};
30use crate::serialization::{CacheSerializer, JsonSerializer};
31use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
32use bytes::Bytes;
33use futures_util::future::BoxFuture;
34
35///// Type alias for the in-flight requests map
36/// Stores a broadcast sender for each active key computation
37type InFlightMap = DashMap<String, broadcast::Sender<CacheResult<Bytes>>>;
38
39/// Cache strategies for different data types
40#[derive(Debug, Clone)]
41#[allow(dead_code)]
42pub enum CacheStrategy {
43 /// Real-time data - 10 seconds TTL
44 RealTime,
45 /// Short-term data - 5 minutes TTL
46 ShortTerm,
47 /// Medium-term data - 1 hour TTL
48 MediumTerm,
49 /// Long-term data - 3 hours TTL
50 LongTerm,
51 /// Custom TTL
52 Custom(Duration),
53 /// Default strategy (5 minutes)
54 Default,
55}
56
57impl CacheStrategy {
58 /// Convert strategy to duration
59 #[must_use]
60 pub fn to_duration(&self) -> Duration {
61 match self {
62 Self::RealTime => Duration::from_secs(10),
63 Self::ShortTerm | Self::Default => Duration::from_secs(300), // 5 minutes
64 Self::MediumTerm => Duration::from_secs(3600), // 1 hour
65 Self::LongTerm => Duration::from_secs(10800), // 3 hours
66 Self::Custom(duration) => *duration,
67 }
68 }
69}
70
71/// Statistics for a single cache tier
72#[derive(Debug)]
73pub struct TierStats {
74 /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
75 pub tier_level: usize,
76 /// Number of cache hits at this tier
77 pub hits: AtomicU64,
78 /// Backend name for identification
79 pub backend_name: String,
80}
81
82impl Clone for TierStats {
83 fn clone(&self) -> Self {
84 Self {
85 tier_level: self.tier_level,
86 hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
87 backend_name: self.backend_name.clone(),
88 }
89 }
90}
91
92impl TierStats {
93 fn new(tier_level: usize, backend_name: String) -> Self {
94 Self {
95 tier_level,
96 hits: AtomicU64::new(0),
97 backend_name,
98 }
99 }
100
101 /// Get current hit count
102 pub fn hit_count(&self) -> u64 {
103 self.hits.load(Ordering::Relaxed)
104 }
105}
106
107/// A single cache tier in the multi-tier architecture
108#[derive(Clone)]
109pub struct CacheTier {
110 /// The backend for this tier
111 pub backend: Arc<dyn L2CacheBackend>,
112 /// Tier level (1 for fastest, increases for slower/cheaper tiers)
113 pub tier_level: usize,
114 /// Whether to promote keys FROM lower tiers TO this tier
115 pub promotion_enabled: bool,
116 /// Promotion frequency (N) - promote with 1/N probability
117 pub promotion_frequency: usize,
118 /// TTL multiplier for this tier (e.g., L2 might store for 2x L1 TTL)
119 pub ttl_scale: f64,
120 /// Statistics for this tier
121 pub stats: TierStats,
122}
123
124impl CacheTier {
125 /// Create a new cache tier
126 pub fn new(
127 backend: Arc<dyn L2CacheBackend>,
128 tier_level: usize,
129 promotion_enabled: bool,
130 promotion_frequency: usize,
131 ttl_scale: f64,
132 ) -> Self {
133 let backend_name = backend.name().to_string();
134 Self {
135 backend,
136 tier_level,
137 promotion_enabled,
138 promotion_frequency,
139 ttl_scale,
140 stats: TierStats::new(tier_level, backend_name),
141 }
142 }
143
144 /// Get value with TTL from this tier
145 async fn get_with_ttl(&self, key: &str) -> Option<(Bytes, Option<Duration>)> {
146 self.backend.get_with_ttl(key).await
147 }
148
149 /// Set value with TTL in this tier
150 async fn set_with_ttl(&self, key: &str, value: Bytes, ttl: Duration) -> CacheResult<()> {
151 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
152 self.backend.set_with_ttl(key, value, scaled_ttl).await
153 }
154
155 /// Remove value from this tier
156 async fn remove(&self, key: &str) -> CacheResult<()> {
157 self.backend.remove(key).await
158 }
159
160 /// Record a cache hit for this tier
161 fn record_hit(&self) {
162 self.stats.hits.fetch_add(1, Ordering::Relaxed);
163 }
164}
165
166/// Configuration for a cache tier (used in builder pattern)
167#[derive(Debug, Clone)]
168pub struct TierConfig {
169 /// Tier level (1, 2, 3, 4...)
170 pub tier_level: usize,
171 /// Enable promotion to upper tiers on hit
172 pub promotion_enabled: bool,
173 /// Promotion frequency (N) - promote with 1/N probability (default 10)
174 pub promotion_frequency: usize,
175 /// TTL scale factor (1.0 = same as base TTL)
176 pub ttl_scale: f64,
177}
178
179impl TierConfig {
180 /// Create new tier configuration
181 #[must_use]
182 pub fn new(tier_level: usize) -> Self {
183 Self {
184 tier_level,
185 promotion_enabled: true,
186 promotion_frequency: 10,
187 ttl_scale: 1.0,
188 }
189 }
190
191 /// Configure as L1 (hot tier)
192 #[must_use]
193 pub fn as_l1() -> Self {
194 Self {
195 tier_level: 1,
196 promotion_enabled: false, // L1 is already top tier
197 promotion_frequency: 1, // Doesn't matter but use 1
198 ttl_scale: 1.0,
199 }
200 }
201
202 /// Configure as L2 (warm tier)
203 #[must_use]
204 pub fn as_l2() -> Self {
205 Self {
206 tier_level: 2,
207 promotion_enabled: true,
208 promotion_frequency: 10,
209 ttl_scale: 1.0,
210 }
211 }
212
213 /// Configure as L3 (cold tier) with longer TTL
214 #[must_use]
215 pub fn as_l3() -> Self {
216 Self {
217 tier_level: 3,
218 promotion_enabled: true,
219 promotion_frequency: 10,
220 ttl_scale: 2.0, // Keep data 2x longer
221 }
222 }
223
224 /// Configure as L4 (archive tier) with much longer TTL
225 #[must_use]
226 pub fn as_l4() -> Self {
227 Self {
228 tier_level: 4,
229 promotion_enabled: true,
230 promotion_frequency: 10,
231 ttl_scale: 8.0, // Keep data 8x longer
232 }
233 }
234
235 /// Set promotion enabled
236 #[must_use]
237 pub fn with_promotion(mut self, enabled: bool) -> Self {
238 self.promotion_enabled = enabled;
239 self
240 }
241
242 /// Set promotion frequency (N)
243 #[must_use]
244 pub fn with_promotion_frequency(mut self, n: usize) -> Self {
245 self.promotion_frequency = n;
246 self
247 }
248
249 /// Set TTL scale factor
250 #[must_use]
251 pub fn with_ttl_scale(mut self, scale: f64) -> Self {
252 self.ttl_scale = scale;
253 self
254 }
255
256 /// Set tier level
257 #[must_use]
258 pub fn with_level(mut self, level: usize) -> Self {
259 self.tier_level = level;
260 self
261 }
262}
263
264pub struct CacheManager {
265 /// Ordered list of cache tiers (L1, L2, L3, ...)
266 tiers: Vec<CacheTier>,
267
268 /// Optional streaming backend
269 streaming_backend: Option<Arc<dyn StreamingBackend>>,
270 /// Statistics
271 total_requests: AtomicU64,
272 l1_hits: AtomicU64,
273 l2_hits: AtomicU64,
274 misses: AtomicU64,
275 /// In-flight requests map (Broadcaster integration will replace this in Step 4)
276 in_flight_requests: Arc<InFlightMap>,
277 /// Pluggable serializer
278 serializer: Arc<CacheSerializer>,
279 /// Invalidation publisher
280 #[cfg(feature = "redis")]
281 #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
282 invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
283 /// Invalidation subscriber
284 #[cfg(feature = "redis")]
285 #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
286 invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
287 /// Invalidation statistics
288 #[cfg(feature = "redis")]
289 #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
290 invalidation_stats: Arc<AtomicInvalidationStats>,
291 /// Number of promotions performed
292 promotions: AtomicUsize,
293}
294
295impl CacheManager {
296 /// Create new cache manager with trait objects (pluggable backends)
297 ///
298 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
299 ///
300 /// # Arguments
301 ///
302 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
303 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
304 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
305 ///
306 /// # Example
307 ///
308 /// ```rust,ignore
309 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
310 /// use std::sync::Arc;
311 ///
312 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
313 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
314 ///
315 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
316 /// ```
317 /// # Errors
318 ///
319 /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
320 pub fn new_with_backends(
321 l1_cache: Arc<dyn CacheBackend>,
322 l2_cache: Arc<dyn L2CacheBackend>,
323 streaming_backend: Option<Arc<dyn StreamingBackend>>,
324 ) -> CacheResult<Self> {
325 debug!("Initializing Cache Manager with L1+L2 backends...");
326
327 let tiers = vec![
328 CacheTier::new(Arc::new(ProxyL1ToL2(l1_cache)), 1, false, 1, 1.0),
329 CacheTier::new(l2_cache, 2, true, 10, 1.0),
330 ];
331
332 Ok(Self {
333 tiers,
334 streaming_backend,
335 total_requests: AtomicU64::new(0),
336 l1_hits: AtomicU64::new(0),
337 l2_hits: AtomicU64::new(0),
338 misses: AtomicU64::new(0),
339 promotions: AtomicUsize::new(0),
340 in_flight_requests: Arc::new(DashMap::new()),
341 serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
342 #[cfg(feature = "redis")]
343 invalidation_publisher: None,
344 #[cfg(feature = "redis")]
345 invalidation_subscriber: None,
346 #[cfg(feature = "redis")]
347 #[cfg(feature = "redis")]
348 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
349 })
350 }
351
352 /// Create new cache manager with default backends (backward compatible)
353 ///
354 /// This is the legacy constructor maintained for backward compatibility.
355 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
356 ///
357 /// # Arguments
358 ///
359 /// * `l1_cache` - Moka L1 cache instance
360 /// * `l2_cache` - Redis L2 cache instance
361 /// # Errors
362 ///
363 /// Returns an error if Redis connection fails.
364 #[cfg(all(feature = "moka", feature = "redis"))]
365 #[cfg_attr(docsrs, doc(cfg(all(feature = "moka", feature = "redis"))))]
366 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> CacheResult<Self> {
367 debug!("Initializing Cache Manager...");
368
369 // Convert concrete types to trait objects
370 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
371 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
372
373 // Create RedisStreams backend for streaming functionality
374 let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
375 let redis_url =
376 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
377 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
378 Some(Arc::new(redis_streams))
379 };
380
381 Self::new_with_backends(l1_backend, l2_backend, streaming_backend)
382 }
383
384 /// Create new cache manager with invalidation support
385 ///
386 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
387 ///
388 /// # Arguments
389 ///
390 /// * `l1_cache` - Moka L1 cache instance
391 /// * `l2_cache` - Redis L2 cache instance
392 /// * `redis_url` - Redis connection URL for Pub/Sub
393 /// * `config` - Invalidation configuration
394 ///
395 /// # Example
396 ///
397 /// ```rust,ignore
398 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
399 ///
400 /// let config = InvalidationConfig {
401 /// channel: "my_app:cache:invalidate".to_string(),
402 /// ..Default::default()
403 /// };
404 ///
405 /// let manager = CacheManager::new_with_invalidation(
406 /// l1, l2, "redis://localhost", config
407 /// ).await?;
408 /// ```
409 /// # Errors
410 ///
411 /// Returns an error if Redis connection fails or invalidation setup fails.
412 #[cfg(all(feature = "moka", feature = "redis"))]
413 #[cfg_attr(docsrs, doc(cfg(all(feature = "moka", feature = "redis"))))]
414 pub async fn new_with_invalidation(
415 l1_cache: Arc<L1Cache>,
416 l2_cache: Arc<L2Cache>,
417 redis_url: &str,
418 config: InvalidationConfig,
419 ) -> CacheResult<Self> {
420 debug!("Initializing Cache Manager with Invalidation...");
421 debug!(" Pub/Sub channel: {}", config.channel);
422
423 // Convert concrete types to trait objects
424 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
425 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
426
427 // Create RedisStreams backend for streaming functionality
428 let streaming_backend: Option<Arc<dyn StreamingBackend>> = {
429 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
430 Some(Arc::new(redis_streams))
431 };
432
433 // Create publisher
434 let (invalidation_publisher, invalidation_subscriber) = {
435 let client = redis::Client::open(redis_url)?;
436 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
437 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
438
439 // Create subscriber
440 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
441 (
442 Some(Arc::new(Mutex::new(publisher))),
443 Some(Arc::new(subscriber)),
444 )
445 };
446
447 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
448
449 let tiers = vec![
450 CacheTier::new(Arc::new(ProxyL1ToL2(l1_backend)), 1, false, 1, 1.0),
451 CacheTier::new(l2_backend, 2, true, 10, 1.0),
452 ];
453
454 let manager = Self {
455 tiers,
456 streaming_backend,
457 total_requests: AtomicU64::new(0),
458 l1_hits: AtomicU64::new(0),
459 l2_hits: AtomicU64::new(0),
460 misses: AtomicU64::new(0),
461 promotions: AtomicUsize::new(0),
462 in_flight_requests: Arc::new(DashMap::new()),
463 serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
464 invalidation_publisher,
465 invalidation_subscriber,
466 invalidation_stats,
467 };
468
469 // Start subscriber with handler
470 manager.start_invalidation_subscriber();
471
472 info!("Cache Manager initialized with invalidation support");
473
474 Ok(manager)
475 }
476
477 /// Create new cache manager with multi-tier architecture (v0.5.0+)
478 ///
479 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
480 /// Tiers are checked in order (lower `tier_level` = faster/hotter).
481 ///
482 /// # Arguments
483 ///
484 /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
485 /// * `streaming_backend` - Optional streaming backend
486 ///
487 /// # Example
488 ///
489 /// ```rust,ignore
490 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
491 /// use std::sync::Arc;
492 ///
493 /// // L1 + L2 + L3 setup
494 /// let l1 = Arc::new(L1Cache::new()?);
495 /// let l2 = Arc::new(L2Cache::new().await?);
496 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
497 ///
498 /// let tiers = vec![
499 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
500 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
501 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
502 /// ];
503 ///
504 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
505 /// ```
506 /// # Errors
507 ///
508 /// Returns an error if tiers are not sorted by level or if no tiers are provided.
509 pub fn new_with_tiers(
510 tiers: Vec<CacheTier>,
511 streaming_backend: Option<Arc<dyn StreamingBackend>>,
512 ) -> CacheResult<Self> {
513 debug!("Initializing Multi-Tier Cache Manager...");
514 debug!(" Tier count: {}", tiers.len());
515 for tier in &tiers {
516 debug!(
517 " L{}: {} (promotion={}, ttl_scale={})",
518 tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
519 );
520 }
521
522 // Validate tiers are sorted by level
523 for i in 1..tiers.len() {
524 if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1))
525 && current.tier_level <= prev.tier_level
526 {
527 return Err(crate::error::CacheError::ConfigError(format!(
528 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
529 current.tier_level, prev.tier_level
530 )));
531 }
532 }
533
534 Ok(Self {
535 tiers,
536 streaming_backend,
537 total_requests: AtomicU64::new(0),
538 l1_hits: AtomicU64::new(0),
539 l2_hits: AtomicU64::new(0),
540 misses: AtomicU64::new(0),
541 promotions: AtomicUsize::new(0),
542 in_flight_requests: Arc::new(DashMap::new()),
543 serializer: Arc::new(CacheSerializer::Json(JsonSerializer)),
544 #[cfg(feature = "redis")]
545 invalidation_publisher: None,
546 #[cfg(feature = "redis")]
547 invalidation_subscriber: None,
548 #[cfg(feature = "redis")]
549 #[cfg(feature = "redis")]
550 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
551 })
552 }
553
554 /// Set a custom serializer for the cache manager
555 pub fn set_serializer(&mut self, serializer: CacheSerializer) {
556 debug!(name = %serializer.name(), "Switching cache serializer");
557 self.serializer = Arc::new(serializer);
558 }
559
560 /// Start the invalidation subscriber background task
561 #[cfg(feature = "redis")]
562 #[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
563 fn start_invalidation_subscriber(&self) {
564 #[cfg(feature = "redis")]
565 if let Some(subscriber) = &self.invalidation_subscriber {
566 let tiers = self.tiers.clone();
567
568 subscriber.start(move |msg: crate::invalidation::InvalidationMessage| {
569 let tiers = tiers.clone();
570 async move {
571 for tier in &tiers {
572 match &msg {
573 InvalidationMessage::Remove { key } => {
574 tier.backend.remove(key).await.ok();
575 }
576 InvalidationMessage::Update {
577 key,
578 value,
579 ttl_secs,
580 } => {
581 if let Some(secs) = ttl_secs {
582 tier.backend
583 .set_with_ttl(
584 key,
585 value.clone(),
586 Duration::from_secs(*secs),
587 )
588 .await
589 .ok();
590 } else {
591 tier.backend.set(key, value.clone()).await.ok();
592 }
593 }
594 InvalidationMessage::RemovePattern { pattern } => {
595 if let Err(e) = tier.backend.remove_pattern(pattern).await {
596 warn!(
597 "Failed to remove pattern '{}' from L{}: {}",
598 pattern, tier.tier_level, e
599 );
600 }
601 }
602 InvalidationMessage::RemoveBulk { keys } => {
603 for key in keys {
604 if let Err(e) = tier.backend.remove(key).await {
605 warn!(
606 "Failed to remove '{}' from L{}: {}",
607 key, tier.tier_level, e
608 );
609 }
610 }
611 }
612 }
613 }
614 Ok(())
615 }
616 });
617
618 info!("Invalidation subscriber started across all tiers");
619 }
620 }
621
622 /// Get value from cache using multi-tier architecture (v0.5.0+)
623 ///
624 /// This method iterates through all configured tiers and automatically promotes
625 /// to upper tiers on cache hit.
626 async fn get_multi_tier(&self, key: &str) -> CacheResult<Option<Bytes>> {
627 // Try each tier sequentially (sorted by tier_level)
628 for (tier_index, tier) in self.tiers.iter().enumerate() {
629 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
630 // Cache hit!
631 tier.record_hit();
632
633 // Promote to all upper tiers (if promotion enabled)
634 if tier.promotion_enabled && tier_index > 0 {
635 // Probabilistic Promotion Check
636 let should_promote = if tier.promotion_frequency <= 1 {
637 true
638 } else {
639 rand::thread_rng().gen_ratio(
640 1,
641 u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
642 )
643 };
644
645 if should_promote {
646 let promotion_ttl =
647 ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
648
649 // Promote to all tiers above this one
650 for upper_tier in self.tiers.iter().take(tier_index).rev() {
651 if let Err(e) = upper_tier
652 .set_with_ttl(key, value.clone(), promotion_ttl)
653 .await
654 {
655 warn!(
656 "Failed to promote '{}' from L{} to L{}: {}",
657 key, tier.tier_level, upper_tier.tier_level, e
658 );
659 } else {
660 self.promotions.fetch_add(1, Ordering::Relaxed);
661 debug!(
662 "Promoted '{}' from L{} to L{} (TTL: {:?})",
663 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
664 );
665 }
666 }
667 } else {
668 debug!(
669 "Probabilistic skip promotion for '{}' from L{} (N={})",
670 key, tier.tier_level, tier.promotion_frequency
671 );
672 }
673 }
674
675 return Ok(Some(value));
676 }
677 }
678
679 // Cache miss across all tiers
680 Ok(None)
681 }
682
683 /// Get value from cache (L1 first, then L2 fallback with promotion)
684 ///
685 /// This method now includes built-in Cache Stampede protection when cache misses occur.
686 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
687 /// unnecessary duplicate work on external data sources.
688 ///
689 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
690 ///
691 /// # Arguments
692 /// * `key` - Cache key to retrieve
693 ///
694 /// # Returns
695 /// * `Ok(Some(value))` - Cache hit, value found in any tier
696 /// * `Ok(None)` - Cache miss, value not found in any cache
697 /// * `Err(error)` - Cache operation failed
698 /// # Errors
699 ///
700 /// Returns an error if cache operation fails.
701 ///
702 /// # Panics
703 ///
704 /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
705 pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
706 self.total_requests.fetch_add(1, Ordering::Relaxed);
707
708 // Fast path for L1 (first tier) - no locking needed
709 if let Some(tier1) = self.tiers.first()
710 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
711 {
712 tier1.record_hit();
713 // Update legacy stats for backward compatibility
714 self.l1_hits.fetch_add(1, Ordering::Relaxed);
715 return Ok(Some(value));
716 }
717
718 // L1 miss - use stampede protection for lower tiers
719 let key_owned = key.to_string();
720 let lock_guard = self
721 .in_flight_requests
722 .entry(key_owned.clone())
723 .or_insert_with(|| broadcast::Sender::new(1))
724 .clone();
725
726 let mut rx = lock_guard.subscribe();
727
728 // If there are other receivers, someone else is computing, wait for it
729 if lock_guard.receiver_count() > 1 {
730 match rx.recv().await {
731 Ok(Ok(value)) => return Ok(Some(value)),
732 Ok(Err(e)) => {
733 return Err(crate::error::CacheError::BackendError(format!(
734 "Computation failed in another thread: {e}"
735 )));
736 }
737 Err(_) => {} // Fall through to re-compute if sender dropped or channel empty
738 }
739 }
740
741 // Double-check L1 after acquiring lock (or if we are the first to compute)
742 if let Some(tier1) = self.tiers.first()
743 && let Some((value, _ttl)) = tier1.get_with_ttl(key).await
744 {
745 tier1.record_hit();
746 self.l1_hits.fetch_add(1, Ordering::Relaxed);
747 let _ = lock_guard.send(Ok(value.clone())); // Notify any waiting subscribers
748 return Ok(Some(value));
749 }
750
751 // Check remaining tiers with promotion
752 let result = self.get_multi_tier(key).await?;
753
754 if let Some(val) = result.clone() {
755 // Hit in L2+ tier - update legacy stats
756 if self.tiers.len() >= 2 {
757 self.l2_hits.fetch_add(1, Ordering::Relaxed);
758 }
759
760 // Notify any waiting subscribers
761 let _ = lock_guard.send(Ok(val.clone()));
762
763 // Remove the in-flight entry after computation/retrieval
764 self.in_flight_requests.remove(key);
765
766 Ok(Some(val))
767 } else {
768 self.misses.fetch_add(1, Ordering::Relaxed);
769
770 // Remove the in-flight entry after computation/retrieval
771 self.in_flight_requests.remove(key);
772
773 Ok(None)
774 }
775 }
776
777 /// Get a value from cache and deserialize it (Type-Safe Version)
778 ///
779 /// # Errors
780 ///
781 /// Returns a `SerializationError` if deserialization fails, or a `BackendError` if the cache retrieval fails.
782 pub async fn get_typed<T>(&self, key: &str) -> CacheResult<Option<T>>
783 where
784 T: serde::de::DeserializeOwned,
785 {
786 if let Some(bytes) = self.get(key).await? {
787 return Ok(Some(self.serializer.deserialize::<T>(&bytes)?));
788 }
789 Ok(None)
790 }
791
792 /// Set value with specific cache strategy (all tiers)
793 ///
794 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
795 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
796 /// # Errors
797 ///
798 /// Returns an error if cache set operation fails.
799 pub async fn set_with_strategy(
800 &self,
801 key: &str,
802 value: Bytes,
803 strategy: CacheStrategy,
804 ) -> CacheResult<()> {
805 let ttl = strategy.to_duration();
806
807 let mut success_count = 0;
808 let mut last_error = None;
809
810 for tier in &self.tiers {
811 match tier.set_with_ttl(key, value.clone(), ttl).await {
812 Ok(()) => {
813 success_count += 1;
814 }
815 Err(e) => {
816 error!(
817 "L{} cache set failed for key '{}': {}",
818 tier.tier_level, key, e
819 );
820 last_error = Some(e);
821 }
822 }
823 }
824
825 if success_count > 0 {
826 debug!(
827 "[Cache] Stored '{}' in {}/{} tiers (base TTL: {:?})",
828 key,
829 success_count,
830 self.tiers.len(),
831 ttl
832 );
833 return Ok(());
834 }
835
836 Err(last_error.unwrap_or_else(|| {
837 crate::error::CacheError::InternalError("All tiers failed".to_string())
838 }))
839 }
840
841 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
842 ///
843 /// This method provides comprehensive Cache Stampede protection:
844 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
845 /// 2. Check L2 cache with mutex-based coalescing
846 /// 3. Compute fresh data with protection against concurrent computations
847 ///
848 /// # Arguments
849 /// * `key` - Cache key
850 /// * `strategy` - Cache strategy for TTL and storage behavior
851 /// * `compute_fn` - Async function to compute the value if not in any cache
852 ///
853 /// # Example
854 /// ```ignore
855 /// let api_data = cache_manager.get_or_compute_with(
856 /// "api_response",
857 /// CacheStrategy::RealTime,
858 /// || async {
859 /// fetch_data_from_api().await
860 /// }
861 /// ).await?;
862 /// ```
863 #[allow(dead_code)]
864 /// # Errors
865 ///
866 /// Returns an error if compute function fails or cache operations fail.
867 pub async fn get_or_compute_with<F, Fut>(
868 &self,
869 key: &str,
870 strategy: CacheStrategy,
871 compute_fn: F,
872 ) -> CacheResult<Bytes>
873 where
874 F: FnOnce() -> Fut + Send,
875 Fut: Future<Output = CacheResult<Bytes>> + Send,
876 {
877 self.total_requests.fetch_add(1, Ordering::Relaxed);
878
879 // 1. Try tiers sequentially first
880 for (idx, tier) in self.tiers.iter().enumerate() {
881 if let Some((value, _ttl)) = tier.get_with_ttl(key).await {
882 tier.record_hit();
883 if tier.tier_level == 1 {
884 self.l1_hits.fetch_add(1, Ordering::Relaxed);
885 } else if tier.tier_level == 2 {
886 self.l2_hits.fetch_add(1, Ordering::Relaxed);
887 }
888
889 // Promotion to L1 if hit was in a lower tier
890 if idx > 0 && tier.promotion_enabled {
891 // Probabilistic Promotion Check
892 let should_promote = if tier.promotion_frequency <= 1 {
893 true
894 } else {
895 rand::thread_rng().gen_ratio(
896 1,
897 u32::try_from(tier.promotion_frequency).unwrap_or(u32::MAX),
898 )
899 };
900
901 if should_promote && let Some(l1_tier) = self.tiers.first() {
902 let _ = l1_tier
903 .set_with_ttl(key, value.clone(), strategy.to_duration())
904 .await;
905 self.promotions.fetch_add(1, Ordering::Relaxed);
906 }
907 }
908 return Ok(value);
909 }
910 }
911
912 // 2. Cache miss across all tiers - use stampede protection
913 let (tx, mut rx): (
914 tokio::sync::broadcast::Sender<CacheResult<Bytes>>,
915 tokio::sync::broadcast::Receiver<CacheResult<Bytes>>,
916 ) = match self.in_flight_requests.entry(key.to_string()) {
917 dashmap::mapref::entry::Entry::Occupied(entry) => {
918 let tx = entry.get().clone();
919 (tx.clone(), tx.subscribe())
920 }
921 dashmap::mapref::entry::Entry::Vacant(entry) => {
922 let (tx, _) = tokio::sync::broadcast::channel(1);
923 entry.insert(tx.clone());
924 (tx.clone(), tx.subscribe())
925 }
926 };
927
928 if tx.receiver_count() > 1 {
929 // Someone else is computing, wait for it
930 match rx.recv().await {
931 Ok(Ok(value)) => return Ok(value),
932 Ok(Err(e)) => {
933 return Err(crate::error::CacheError::BackendError(format!(
934 "Computation failed in another thread: {e}"
935 )));
936 }
937 Err(_) => {} // Fall through to re-compute
938 }
939 }
940
941 // 3. Re-check cache after receiving/creating broadcaster (double-check pattern)
942 for tier in &self.tiers {
943 if let Some((value, _)) = tier.get_with_ttl(key).await {
944 let _ = tx.send(Ok(value.clone()));
945 return Ok(value);
946 }
947 }
948
949 // 4. Miss - compute fresh data
950 debug!(
951 "Computing fresh data for key: '{}' (Stampede protected)",
952 key
953 );
954
955 let result = compute_fn().await;
956
957 // Remove from in_flight BEFORE broadcasting
958 self.in_flight_requests.remove(key);
959
960 match &result {
961 Ok(value) => {
962 let _ = self.set_with_strategy(key, value.clone(), strategy).await;
963 let _ = tx.send(Ok(value.clone()));
964 }
965 Err(e) => {
966 let _ = tx.send(Err(e.clone()));
967 }
968 }
969
970 result
971 }
972
973 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
974 ///
975 /// This method provides the same functionality as `get_or_compute_with()` but with
976 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
977 /// API calls, or any computation that returns structured data.
978 ///
979 /// # Type Safety
980 ///
981 /// - Returns your actual type `T` instead of `serde_json::Value`
982 /// - Compiler enforces Serialize + `DeserializeOwned` bounds
983 /// - No manual JSON conversion needed
984 ///
985 /// # Cache Flow
986 ///
987 /// 1. Check L1 cache → deserialize if found
988 /// 2. Check L2 cache → deserialize + promote to L1 if found
989 /// 3. Execute `compute_fn` → serialize → store in L1+L2
990 /// 4. Full stampede protection (only ONE request computes)
991 ///
992 /// # Arguments
993 ///
994 /// * `key` - Cache key
995 /// * `strategy` - Cache strategy for TTL
996 /// * `compute_fn` - Async function returning `Result<T>`
997 ///
998 /// # Example - Database Query
999 ///
1000 /// ```no_run
1001 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1002 /// # use std::sync::Arc;
1003 /// # use serde::{Serialize, Deserialize};
1004 /// # async fn example() -> anyhow::Result<()> {
1005 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1006 /// # let l2 = Arc::new(L2Cache::new().await?);
1007 /// # let cache_manager = CacheManager::new(l1, l2);
1008 ///
1009 /// #[derive(Serialize, Deserialize)]
1010 /// struct User {
1011 /// id: i64,
1012 /// name: String,
1013 /// }
1014 ///
1015 /// // Type-safe database caching (example - requires sqlx)
1016 /// // let user: User = cache_manager.get_or_compute_typed(
1017 /// // "user:123",
1018 /// // CacheStrategy::MediumTerm,
1019 /// // || async {
1020 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1021 /// // .bind(123)
1022 /// // .fetch_one(&pool)
1023 /// // .await
1024 /// // }
1025 /// // ).await?;
1026 /// # Ok(())
1027 /// # }
1028 /// ```
1029 ///
1030 /// # Example - API Call
1031 ///
1032 /// ```no_run
1033 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1034 /// # use std::sync::Arc;
1035 /// # use serde::{Serialize, Deserialize};
1036 /// # async fn example() -> anyhow::Result<()> {
1037 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1038 /// # let l2 = Arc::new(L2Cache::new().await?);
1039 /// # let cache_manager = CacheManager::new(l1, l2);
1040 /// #[derive(Serialize, Deserialize)]
1041 /// struct ApiResponse {
1042 /// data: String,
1043 /// timestamp: i64,
1044 /// }
1045 ///
1046 /// // API call caching (example - requires reqwest)
1047 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1048 /// // "api:endpoint",
1049 /// // CacheStrategy::RealTime,
1050 /// // || async {
1051 /// // reqwest::get("https://api.example.com/data")
1052 /// // .await?
1053 /// // .json::<ApiResponse>()
1054 /// // .await
1055 /// // }
1056 /// // ).await?;
1057 /// # Ok(())
1058 /// # }
1059 /// ```
1060 ///
1061 /// # Performance
1062 ///
1063 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1064 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1065 /// - Compute: Your function time + serialization + L1+L2 storage
1066 /// - Stampede protection: 99.6% latency reduction under high concurrency
1067 ///
1068 /// # Errors
1069 ///
1070 /// Returns error if:
1071 /// - Compute function fails
1072 /// - Serialization fails (invalid type for JSON)
1073 /// - Deserialization fails (cache data doesn't match type T)
1074 /// - Cache operations fail (Redis connection issues)
1075 #[allow(clippy::too_many_lines)]
1076 pub async fn get_or_compute_typed<T, F, Fut>(
1077 &self,
1078 key: &str,
1079 strategy: CacheStrategy,
1080 compute_fn: F,
1081 ) -> CacheResult<T>
1082 where
1083 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
1084 F: FnOnce() -> Fut + Send,
1085 Fut: Future<Output = CacheResult<T>> + Send,
1086 {
1087 // 1. Try to get typed from cache first
1088 if let Some(value) = self.get_typed::<T>(key).await? {
1089 return Ok(value);
1090 }
1091
1092 // 2. Use get_or_compute_with to handle stampede protection
1093 let serializer = self.serializer.clone();
1094 let bytes_result = self
1095 .get_or_compute_with(key, strategy, || async move {
1096 let val = compute_fn().await?;
1097 serializer.serialize(&val)
1098 })
1099 .await?;
1100
1101 // 3. Deserialize result
1102 self.serializer.deserialize::<T>(&bytes_result)
1103 }
1104
1105 /// Get comprehensive cache statistics
1106 ///
1107 /// In multi-tier mode, aggregates statistics from all tiers.
1108 /// In legacy mode, returns L1 and L2 stats.
1109 #[allow(dead_code)]
1110 pub fn get_stats(&self) -> CacheManagerStats {
1111 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1112 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1113 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1114 let misses = self.misses.load(Ordering::Relaxed);
1115
1116 CacheManagerStats {
1117 total_requests: total_reqs,
1118 l1_hits,
1119 l2_hits,
1120 total_hits: l1_hits + l2_hits,
1121 misses,
1122 hit_rate: if total_reqs > 0 {
1123 #[allow(clippy::cast_precision_loss)]
1124 {
1125 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1126 }
1127 } else {
1128 0.0
1129 },
1130 l1_hit_rate: if total_reqs > 0 {
1131 #[allow(clippy::cast_precision_loss)]
1132 {
1133 (l1_hits as f64 / total_reqs as f64) * 100.0
1134 }
1135 } else {
1136 0.0
1137 },
1138 promotions: self.promotions.load(Ordering::Relaxed),
1139 in_flight_requests: self.in_flight_requests.len(),
1140 }
1141 }
1142
1143 /// Get per-tier statistics (v0.5.0+)
1144 ///
1145 /// Returns statistics for each tier if multi-tier mode is enabled.
1146 /// Returns None if using legacy 2-tier mode.
1147 ///
1148 /// # Example
1149 /// ```rust,ignore
1150 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1151 /// for stats in tier_stats {
1152 /// println!("L{}: {} hits ({})",
1153 /// stats.tier_level,
1154 /// stats.hit_count(),
1155 /// stats.backend_name);
1156 /// }
1157 /// }
1158 /// ```
1159 pub fn get_tier_stats(&self) -> Vec<TierStats> {
1160 self.tiers.iter().map(|tier| tier.stats.clone()).collect()
1161 }
1162}
1163
1164/// Proxy wrapper to allow using `CacheBackend` where `DynL2CacheBackend` is expected
1165/// (Internal helper for `new_with_backends` to wrap L1 `CacheBackend` into `DynL2CacheBackend`)
1166struct ProxyL1ToL2(Arc<dyn CacheBackend>);
1167
1168impl CacheBackend for ProxyL1ToL2 {
1169 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
1170 self.0.get(key)
1171 }
1172
1173 fn set_with_ttl<'a>(
1174 &'a self,
1175 key: &'a str,
1176 value: Bytes,
1177 ttl: Duration,
1178 ) -> BoxFuture<'a, CacheResult<()>> {
1179 self.0.set_with_ttl(key, value, ttl)
1180 }
1181
1182 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
1183 self.0.remove(key)
1184 }
1185
1186 fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
1187 self.0.remove_pattern(pattern)
1188 }
1189
1190 fn health_check(&self) -> BoxFuture<'_, bool> {
1191 self.0.health_check()
1192 }
1193
1194 fn name(&self) -> &'static str {
1195 self.0.name()
1196 }
1197}
1198
1199impl L2CacheBackend for ProxyL1ToL2 {
1200 fn get_with_ttl<'a>(
1201 &'a self,
1202 key: &'a str,
1203 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
1204 Box::pin(async move { self.0.get(key).await.map(|v| (v, None)) })
1205 }
1206}
1207
1208impl CacheManager {
1209 // ===== Redis Streams Methods =====
1210
1211 /// Publish data to Redis Stream
1212 ///
1213 /// # Arguments
1214 /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1215 /// * `fields` - Field-value pairs to publish
1216 /// * `maxlen` - Optional max length for stream trimming
1217 ///
1218 /// # Returns
1219 /// The entry ID generated by Redis
1220 ///
1221 /// # Errors
1222 /// Returns error if streaming backend is not configured
1223 pub async fn publish_to_stream(
1224 &self,
1225 stream_key: &str,
1226 fields: Vec<(String, String)>,
1227 maxlen: Option<usize>,
1228 ) -> CacheResult<String> {
1229 match &self.streaming_backend {
1230 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1231 None => Err(crate::error::CacheError::ConfigError(
1232 "Streaming backend not configured".to_string(),
1233 )),
1234 }
1235 }
1236
1237 /// Read latest entries from Redis Stream
1238 ///
1239 /// # Arguments
1240 /// * `stream_key` - Name of the stream
1241 /// * `count` - Number of latest entries to retrieve
1242 ///
1243 /// # Returns
1244 /// Vector of (`entry_id`, fields) tuples (newest first)
1245 ///
1246 /// # Errors
1247 /// Returns error if streaming backend is not configured
1248 pub async fn read_stream_latest(
1249 &self,
1250 stream_key: &str,
1251 count: usize,
1252 ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1253 match &self.streaming_backend {
1254 Some(backend) => backend
1255 .stream_read_latest(stream_key, count)
1256 .await,
1257 None => Err(crate::error::CacheError::ConfigError(
1258 "Streaming backend not configured".to_string(),
1259 )),
1260 }
1261 }
1262
1263 /// Read from Redis Stream with optional blocking
1264 ///
1265 /// # Arguments
1266 /// * `stream_key` - Name of the stream
1267 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1268 /// * `count` - Max entries to retrieve
1269 /// * `block_ms` - Optional blocking timeout in ms
1270 ///
1271 /// # Returns
1272 /// Vector of (`entry_id`, fields) tuples
1273 ///
1274 /// # Errors
1275 /// Returns error if streaming backend is not configured
1276 pub async fn read_stream(
1277 &self,
1278 stream_key: &str,
1279 last_id: &str,
1280 count: usize,
1281 block_ms: Option<usize>,
1282 ) -> CacheResult<Vec<(String, Vec<(String, String)>)>> {
1283 match &self.streaming_backend {
1284 Some(backend) => {
1285 backend
1286 .stream_read(stream_key, last_id, count, block_ms)
1287 .await
1288 }
1289 None => Err(crate::error::CacheError::ConfigError(
1290 "Streaming backend not configured".to_string(),
1291 )),
1292 }
1293 }
1294
1295 // ===== Cache Invalidation Methods =====
1296
1297 /// Invalidate a cache key across all instances
1298 ///
1299 /// This removes the key from all cache tiers and broadcasts
1300 /// the invalidation to all other cache instances via Redis Pub/Sub.
1301 ///
1302 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1303 ///
1304 /// # Arguments
1305 /// * `key` - Cache key to invalidate
1306 ///
1307 /// # Example
1308 /// ```rust,no_run
1309 /// # use multi_tier_cache::CacheManager;
1310 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1311 /// // Invalidate user cache after profile update
1312 /// cache_manager.invalidate("user:123").await?;
1313 /// # Ok(())
1314 /// # }
1315 /// ```
1316 /// # Errors
1317 ///
1318 /// Returns an error if invalidation fails.
1319 pub async fn invalidate(&self, key: &str) -> CacheResult<()> {
1320 // Remove from ALL tiers
1321 for tier in &self.tiers {
1322 if let Err(e) = tier.remove(key).await {
1323 warn!(
1324 "Failed to remove '{}' from L{}: {}",
1325 key, tier.tier_level, e
1326 );
1327 }
1328 }
1329
1330 // Broadcast to other instances
1331 #[cfg(feature = "redis")]
1332 {
1333 if let Some(publisher) = &self.invalidation_publisher {
1334 let mut pub_lock: tokio::sync::MutexGuard<
1335 '_,
1336 crate::invalidation::InvalidationPublisher,
1337 > = publisher.lock().await;
1338 let msg = InvalidationMessage::remove(key);
1339 pub_lock.publish(&msg).await?;
1340 self.invalidation_stats
1341 .messages_sent
1342 .fetch_add(1, Ordering::Relaxed);
1343 }
1344 }
1345
1346 debug!("Invalidated '{}' across all instances", key);
1347 Ok(())
1348 }
1349
1350 /// Update cache value across all instances
1351 ///
1352 /// This updates the key in all cache tiers and broadcasts
1353 /// the update to all other cache instances, avoiding cache misses.
1354 ///
1355 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1356 ///
1357 /// # Arguments
1358 /// * `key` - Cache key to update
1359 /// * `value` - New value
1360 /// * `ttl` - Optional TTL (uses default if None)
1361 ///
1362 /// # Example
1363 /// ```rust,no_run
1364 /// # use multi_tier_cache::CacheManager;
1365 /// # use std::time::Duration;
1366 /// # use bytes::Bytes;
1367 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1368 /// // Update user cache with new data
1369 /// let user_data = Bytes::from("alice");
1370 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1371 /// # Ok(())
1372 /// # }
1373 /// ```
1374 /// # Errors
1375 ///
1376 /// Returns an error if cache update fails.
1377 pub async fn update_cache(
1378 &self,
1379 key: &str,
1380 value: Bytes,
1381 ttl: Option<Duration>,
1382 ) -> CacheResult<()> {
1383 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1384
1385 // Update ALL tiers
1386 for tier in &self.tiers {
1387 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1388 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1389 }
1390 }
1391
1392 // Broadcast update to other instances
1393 #[cfg(feature = "redis")]
1394 if let Some(publisher) = &self.invalidation_publisher {
1395 let mut pub_lock = publisher.lock().await;
1396 let msg = InvalidationMessage::update(key, value, Some(ttl));
1397 pub_lock.publish(&msg).await?;
1398 self.invalidation_stats
1399 .messages_sent
1400 .fetch_add(1, Ordering::Relaxed);
1401 }
1402
1403 debug!("Updated '{}' across all instances", key);
1404 Ok(())
1405 }
1406
1407 /// Invalidate all keys matching a pattern
1408 ///
1409 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1410 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1411 ///
1412 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1413 ///
1414 /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1415 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1416 ///
1417 /// # Arguments
1418 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1419 ///
1420 /// # Example
1421 /// ```rust,no_run
1422 /// # use multi_tier_cache::CacheManager;
1423 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1424 /// // Invalidate all user caches
1425 /// cache_manager.invalidate_pattern("user:*").await?;
1426 ///
1427 /// // Invalidate specific user's related caches
1428 /// cache_manager.invalidate_pattern("user:123:*").await?;
1429 /// # Ok(())
1430 /// # }
1431 /// ```
1432 /// # Errors
1433 ///
1434 /// Returns an error if invalidation fails.
1435 pub async fn invalidate_pattern(&self, pattern: &str) -> CacheResult<()> {
1436 debug!(pattern = %pattern, "Invalidating pattern across all tiers");
1437
1438 // 1. Invalidate in all configured tiers
1439 for tier in &self.tiers {
1440 debug!(tier = %tier.tier_level, "Invalidating pattern in tier");
1441 tier.backend.remove_pattern(pattern).await?;
1442 }
1443
1444 // 2. Broadcast invalidation if publisher is configured
1445 #[cfg(feature = "redis")]
1446 {
1447 if let Some(publisher) = &self.invalidation_publisher {
1448 let msg = InvalidationMessage::remove_pattern(pattern);
1449 publisher.lock().await.publish(&msg).await?;
1450 debug!(pattern = %pattern, "Broadcasted pattern invalidation");
1451 }
1452 }
1453
1454 Ok(())
1455 }
1456
1457 /// Set value with automatic broadcast to all instances
1458 ///
1459 /// This is a write-through operation that updates the cache and
1460 /// broadcasts the update to all other instances automatically.
1461 ///
1462 /// # Arguments
1463 /// * `key` - Cache key
1464 /// * `value` - Value to cache
1465 /// * `strategy` - Cache strategy (determines TTL)
1466 ///
1467 /// # Example
1468 /// ```rust,no_run
1469 /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1470 /// # use bytes::Bytes;
1471 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1472 /// // Update and broadcast in one call
1473 /// let data = Bytes::from("active");
1474 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1475 /// # Ok(())
1476 /// # }
1477 /// ```
1478 /// # Errors
1479 ///
1480 /// Returns an error if cache set or broadcast fails.
1481 pub async fn set_with_broadcast(
1482 &self,
1483 key: &str,
1484 value: Bytes,
1485 strategy: CacheStrategy,
1486 ) -> CacheResult<()> {
1487 #[cfg(feature = "redis")] let ttl = strategy.to_duration();
1488
1489 // Set in local caches
1490 self.set_with_strategy(key, value.clone(), strategy).await?;
1491
1492 // Broadcast update if invalidation is enabled
1493 #[cfg(feature = "redis")]
1494 if let Some(publisher) = &self.invalidation_publisher {
1495 let mut pub_lock = publisher.lock().await;
1496 let msg = InvalidationMessage::update(key, value, Some(ttl));
1497 pub_lock.publish(&msg).await?;
1498 self.invalidation_stats
1499 .messages_sent
1500 .fetch_add(1, Ordering::Relaxed);
1501 }
1502
1503 Ok(())
1504 }
1505
1506 /// Get invalidation statistics
1507 ///
1508 /// Returns statistics about invalidation operations if invalidation is enabled.
1509 #[cfg(feature = "redis")]
1510 pub fn invalidation_stats(&self) -> Option<crate::invalidation::InvalidationStats> {
1511 #[cfg(feature = "redis")]
1512 {
1513 Some(self.invalidation_stats.snapshot())
1514 }
1515 #[cfg(not(feature = "redis"))]
1516 {
1517 None
1518 }
1519 }
1520}
1521
1522/// Cache Manager statistics
1523#[allow(dead_code)]
1524#[derive(Debug, Clone)]
1525pub struct CacheManagerStats {
1526 pub total_requests: u64,
1527 pub l1_hits: u64,
1528 pub l2_hits: u64,
1529 pub total_hits: u64,
1530 pub misses: u64,
1531 pub hit_rate: f64,
1532 pub l1_hit_rate: f64,
1533 pub promotions: usize,
1534 pub in_flight_requests: usize,
1535}