multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use anyhow::Result;
6use dashmap::DashMap;
7use serde_json;
8use std::future::Future;
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13
14use tracing::{debug, error, info, warn};
15
16use super::invalidation::{
17 AtomicInvalidationStats, InvalidationConfig, InvalidationMessage, InvalidationPublisher,
18 InvalidationStats, InvalidationSubscriber,
19};
20use crate::backends::{L1Cache, L2Cache};
21use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
22
23/// Type alias for the in-flight requests map
24type InFlightMap = DashMap<String, Arc<Mutex<()>>>;
25
26/// RAII cleanup guard for in-flight request tracking
27/// Ensures that entries are removed from `DashMap` even on early return or panic
28struct CleanupGuard<'a> {
29 map: &'a InFlightMap,
30 key: String,
31}
32
33impl Drop for CleanupGuard<'_> {
34 fn drop(&mut self) {
35 self.map.remove(&self.key);
36 }
37}
38
39/// Cache strategies for different data types
40#[derive(Debug, Clone)]
41#[allow(dead_code)]
42pub enum CacheStrategy {
43 /// Real-time data - 10 seconds TTL
44 RealTime,
45 /// Short-term data - 5 minutes TTL
46 ShortTerm,
47 /// Medium-term data - 1 hour TTL
48 MediumTerm,
49 /// Long-term data - 3 hours TTL
50 LongTerm,
51 /// Custom TTL
52 Custom(Duration),
53 /// Default strategy (5 minutes)
54 Default,
55}
56
57impl CacheStrategy {
58 /// Convert strategy to duration
59 #[must_use]
60 pub fn to_duration(&self) -> Duration {
61 match self {
62 Self::RealTime => Duration::from_secs(10),
63 Self::ShortTerm | Self::Default => Duration::from_secs(300), // 5 minutes
64 Self::MediumTerm => Duration::from_secs(3600), // 1 hour
65 Self::LongTerm => Duration::from_secs(10800), // 3 hours
66 Self::Custom(duration) => *duration,
67 }
68 }
69}
70
71/// Statistics for a single cache tier
72#[derive(Debug)]
73pub struct TierStats {
74 /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
75 pub tier_level: usize,
76 /// Number of cache hits at this tier
77 pub hits: AtomicU64,
78 /// Backend name for identification
79 pub backend_name: String,
80}
81
82impl Clone for TierStats {
83 fn clone(&self) -> Self {
84 Self {
85 tier_level: self.tier_level,
86 hits: AtomicU64::new(self.hits.load(Ordering::Relaxed)),
87 backend_name: self.backend_name.clone(),
88 }
89 }
90}
91
92impl TierStats {
93 fn new(tier_level: usize, backend_name: String) -> Self {
94 Self {
95 tier_level,
96 hits: AtomicU64::new(0),
97 backend_name,
98 }
99 }
100
101 /// Get current hit count
102 pub fn hit_count(&self) -> u64 {
103 self.hits.load(Ordering::Relaxed)
104 }
105}
106
107/// A single cache tier in the multi-tier architecture
108pub struct CacheTier {
109 /// Cache backend for this tier
110 backend: Arc<dyn L2CacheBackend>,
111 /// Tier level (1 = hottest/fastest, higher = colder/slower)
112 tier_level: usize,
113 /// Enable automatic promotion to upper tiers on cache hit
114 promotion_enabled: bool,
115 /// TTL scale factor (multiplier for TTL when storing/promoting)
116 ttl_scale: f64,
117 /// Statistics for this tier
118 stats: TierStats,
119}
120
121impl CacheTier {
122 /// Create a new cache tier
123 pub fn new(
124 backend: Arc<dyn L2CacheBackend>,
125 tier_level: usize,
126 promotion_enabled: bool,
127 ttl_scale: f64,
128 ) -> Self {
129 let backend_name = backend.name().to_string();
130 Self {
131 backend,
132 tier_level,
133 promotion_enabled,
134 ttl_scale,
135 stats: TierStats::new(tier_level, backend_name),
136 }
137 }
138
139 /// Get value with TTL from this tier
140 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
141 self.backend.get_with_ttl(key).await
142 }
143
144 /// Set value with TTL in this tier
145 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
146 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
147 self.backend.set_with_ttl(key, value, scaled_ttl).await
148 }
149
150 /// Remove value from this tier
151 async fn remove(&self, key: &str) -> Result<()> {
152 self.backend.remove(key).await
153 }
154
155 /// Record a cache hit for this tier
156 fn record_hit(&self) {
157 self.stats.hits.fetch_add(1, Ordering::Relaxed);
158 }
159}
160
161/// Configuration for a cache tier (used in builder pattern)
162#[derive(Debug, Clone)]
163pub struct TierConfig {
164 /// Tier level (1, 2, 3, 4...)
165 pub tier_level: usize,
166 /// Enable promotion to upper tiers on hit
167 pub promotion_enabled: bool,
168 /// TTL scale factor (1.0 = same as base TTL)
169 pub ttl_scale: f64,
170}
171
172impl TierConfig {
173 /// Create new tier configuration
174 #[must_use]
175 pub fn new(tier_level: usize) -> Self {
176 Self {
177 tier_level,
178 promotion_enabled: true,
179 ttl_scale: 1.0,
180 }
181 }
182
183 /// Configure as L1 (hot tier)
184 #[must_use]
185 pub fn as_l1() -> Self {
186 Self {
187 tier_level: 1,
188 promotion_enabled: false, // L1 is already top tier
189 ttl_scale: 1.0,
190 }
191 }
192
193 /// Configure as L2 (warm tier)
194 #[must_use]
195 pub fn as_l2() -> Self {
196 Self {
197 tier_level: 2,
198 promotion_enabled: true,
199 ttl_scale: 1.0,
200 }
201 }
202
203 /// Configure as L3 (cold tier) with longer TTL
204 #[must_use]
205 pub fn as_l3() -> Self {
206 Self {
207 tier_level: 3,
208 promotion_enabled: true,
209 ttl_scale: 2.0, // Keep data 2x longer
210 }
211 }
212
213 /// Configure as L4 (archive tier) with much longer TTL
214 #[must_use]
215 pub fn as_l4() -> Self {
216 Self {
217 tier_level: 4,
218 promotion_enabled: true,
219 ttl_scale: 8.0, // Keep data 8x longer
220 }
221 }
222
223 /// Set promotion enabled
224 #[must_use]
225 pub fn with_promotion(mut self, enabled: bool) -> Self {
226 self.promotion_enabled = enabled;
227 self
228 }
229
230 /// Set TTL scale factor
231 #[must_use]
232 pub fn with_ttl_scale(mut self, scale: f64) -> Self {
233 self.ttl_scale = scale;
234 self
235 }
236
237 /// Set tier level
238 #[must_use]
239 pub fn with_level(mut self, level: usize) -> Self {
240 self.tier_level = level;
241 self
242 }
243}
244
245/// Proxy wrapper to convert `L2CacheBackend` to `CacheBackend`
246/// (Rust doesn't support automatic trait upcasting for trait objects)
247struct ProxyCacheBackend {
248 backend: Arc<dyn L2CacheBackend>,
249}
250
251#[async_trait::async_trait]
252impl CacheBackend for ProxyCacheBackend {
253 async fn get(&self, key: &str) -> Option<serde_json::Value> {
254 self.backend.get(key).await
255 }
256
257 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
258 self.backend.set_with_ttl(key, value, ttl).await
259 }
260
261 async fn remove(&self, key: &str) -> Result<()> {
262 self.backend.remove(key).await
263 }
264
265 async fn health_check(&self) -> bool {
266 self.backend.health_check().await
267 }
268
269 fn name(&self) -> &'static str {
270 self.backend.name()
271 }
272}
273
274/// Cache Manager - Unified operations across multiple cache tiers
275///
276/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
277/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
278/// legacy L1+L2 behavior for backward compatibility.
279pub struct CacheManager {
280 /// Dynamic multi-tier cache architecture (v0.5.0+)
281 /// If Some, this takes precedence over `l1_cache/l2_cache` fields
282 tiers: Option<Vec<CacheTier>>,
283
284 // ===== Legacy fields (v0.1.0 - v0.4.x) =====
285 // Maintained for backward compatibility
286 /// L1 Cache (trait object for pluggable backends)
287 l1_cache: Arc<dyn CacheBackend>,
288 /// L2 Cache (trait object for pluggable backends)
289 l2_cache: Arc<dyn L2CacheBackend>,
290 /// L2 Cache concrete instance (for invalidation `scan_keys`)
291 l2_cache_concrete: Option<Arc<L2Cache>>,
292
293 /// Optional streaming backend (defaults to L2 if it implements `StreamingBackend`)
294 streaming_backend: Option<Arc<dyn StreamingBackend>>,
295 /// Statistics (`AtomicU64` is already thread-safe, no Arc needed)
296 total_requests: AtomicU64,
297 l1_hits: AtomicU64,
298 l2_hits: AtomicU64,
299 misses: AtomicU64,
300 promotions: AtomicUsize,
301 /// In-flight requests to prevent Cache Stampede on L2/compute operations
302 in_flight_requests: Arc<InFlightMap>,
303 /// Invalidation publisher (for broadcasting invalidation messages)
304 invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
305 /// Invalidation subscriber (for receiving invalidation messages)
306 invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
307 /// Invalidation statistics
308 invalidation_stats: Arc<AtomicInvalidationStats>,
309}
310
311impl CacheManager {
312 /// Create new cache manager with trait objects (pluggable backends)
313 ///
314 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
315 ///
316 /// # Arguments
317 ///
318 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
319 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
320 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
321 ///
322 /// # Example
323 ///
324 /// ```rust,ignore
325 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
326 /// use std::sync::Arc;
327 ///
328 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
329 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
330 ///
331 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
332 /// ```
333 /// # Errors
334 ///
335 /// Returns `Ok` if successful. Currently no error conditions, but kept for future compatibility.
336 pub fn new_with_backends(
337 l1_cache: Arc<dyn CacheBackend>,
338 l2_cache: Arc<dyn L2CacheBackend>,
339 streaming_backend: Option<Arc<dyn StreamingBackend>>,
340 ) -> Result<Self> {
341 debug!("Initializing Cache Manager with custom backends...");
342 debug!(" L1: {}", l1_cache.name());
343 debug!(" L2: {}", l2_cache.name());
344 if streaming_backend.is_some() {
345 debug!(" Streaming: enabled");
346 } else {
347 debug!(" Streaming: disabled");
348 }
349
350 Ok(Self {
351 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
352 l1_cache,
353 l2_cache,
354 l2_cache_concrete: None,
355 streaming_backend,
356 total_requests: AtomicU64::new(0),
357 l1_hits: AtomicU64::new(0),
358 l2_hits: AtomicU64::new(0),
359 misses: AtomicU64::new(0),
360 promotions: AtomicUsize::new(0),
361 in_flight_requests: Arc::new(DashMap::new()),
362 invalidation_publisher: None,
363 invalidation_subscriber: None,
364 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
365 })
366 }
367
368 /// Create new cache manager with default backends (backward compatible)
369 ///
370 /// This is the legacy constructor maintained for backward compatibility.
371 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
372 ///
373 /// # Arguments
374 ///
375 /// * `l1_cache` - Moka L1 cache instance
376 /// * `l2_cache` - Redis L2 cache instance
377 /// # Errors
378 ///
379 /// Returns an error if Redis connection fails.
380 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
381 debug!("Initializing Cache Manager...");
382
383 // Convert concrete types to trait objects
384 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
385 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
386
387 // Create RedisStreams backend for streaming functionality
388 let redis_url =
389 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
390 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
391 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
392
393 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend))
394 }
395
396 /// Create new cache manager with invalidation support
397 ///
398 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
399 ///
400 /// # Arguments
401 ///
402 /// * `l1_cache` - Moka L1 cache instance
403 /// * `l2_cache` - Redis L2 cache instance
404 /// * `redis_url` - Redis connection URL for Pub/Sub
405 /// * `config` - Invalidation configuration
406 ///
407 /// # Example
408 ///
409 /// ```rust,ignore
410 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
411 ///
412 /// let config = InvalidationConfig {
413 /// channel: "my_app:cache:invalidate".to_string(),
414 /// ..Default::default()
415 /// };
416 ///
417 /// let manager = CacheManager::new_with_invalidation(
418 /// l1, l2, "redis://localhost", config
419 /// ).await?;
420 /// ```
421 /// # Errors
422 ///
423 /// Returns an error if Redis connection fails or invalidation setup fails.
424 pub async fn new_with_invalidation(
425 l1_cache: Arc<L1Cache>,
426 l2_cache: Arc<L2Cache>,
427 redis_url: &str,
428 config: InvalidationConfig,
429 ) -> Result<Self> {
430 debug!("Initializing Cache Manager with Invalidation...");
431 debug!(" Pub/Sub channel: {}", config.channel);
432
433 // Convert concrete types to trait objects
434 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
435 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
436
437 // Create RedisStreams backend for streaming functionality
438 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
439 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
440
441 // Create publisher
442 let client = redis::Client::open(redis_url)?;
443 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
444 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
445
446 // Create subscriber
447 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
448 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
449
450 let manager = Self {
451 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
452 l1_cache: l1_backend,
453 l2_cache: l2_backend,
454 l2_cache_concrete: Some(l2_cache),
455 streaming_backend: Some(streaming_backend),
456 total_requests: AtomicU64::new(0),
457 l1_hits: AtomicU64::new(0),
458 l2_hits: AtomicU64::new(0),
459 misses: AtomicU64::new(0),
460 promotions: AtomicUsize::new(0),
461 in_flight_requests: Arc::new(DashMap::new()),
462 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
463 invalidation_subscriber: Some(Arc::new(subscriber)),
464 invalidation_stats,
465 };
466
467 // Start subscriber with handler
468 manager.start_invalidation_subscriber();
469
470 info!("Cache Manager initialized with invalidation support");
471
472 Ok(manager)
473 }
474
475 /// Create new cache manager with multi-tier architecture (v0.5.0+)
476 ///
477 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
478 /// Tiers are checked in order (lower `tier_level` = faster/hotter).
479 ///
480 /// # Arguments
481 ///
482 /// * `tiers` - Vector of configured cache tiers (must be sorted by `tier_level` ascending)
483 /// * `streaming_backend` - Optional streaming backend
484 ///
485 /// # Example
486 ///
487 /// ```rust,ignore
488 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
489 /// use std::sync::Arc;
490 ///
491 /// // L1 + L2 + L3 setup
492 /// let l1 = Arc::new(L1Cache::new()?);
493 /// let l2 = Arc::new(L2Cache::new().await?);
494 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
495 ///
496 /// let tiers = vec![
497 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
498 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
499 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
500 /// ];
501 ///
502 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
503 /// ```
504 /// # Errors
505 ///
506 /// Returns an error if tiers are not sorted by level or if no tiers are provided.
507 pub fn new_with_tiers(
508 tiers: Vec<CacheTier>,
509 streaming_backend: Option<Arc<dyn StreamingBackend>>,
510 ) -> Result<Self> {
511 debug!("Initializing Multi-Tier Cache Manager...");
512 debug!(" Tier count: {}", tiers.len());
513 for tier in &tiers {
514 debug!(
515 " L{}: {} (promotion={}, ttl_scale={})",
516 tier.tier_level, tier.stats.backend_name, tier.promotion_enabled, tier.ttl_scale
517 );
518 }
519
520 // Validate tiers are sorted by level
521 for i in 1..tiers.len() {
522 if let (Some(current), Some(prev)) = (tiers.get(i), tiers.get(i - 1)) {
523 if current.tier_level <= prev.tier_level {
524 anyhow::bail!(
525 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
526 current.tier_level,
527 prev.tier_level
528 );
529 }
530 }
531 }
532
533 // For backward compatibility with legacy code, we need dummy l1/l2 caches
534 // Use first tier as l1, second tier as l2 if available
535 let (l1_cache, l2_cache) = if tiers.len() >= 2 {
536 if let (Some(t0), Some(t1)) = (tiers.first(), tiers.get(1)) {
537 (t0.backend.clone(), t1.backend.clone())
538 } else {
539 // Should be unreachable due to len check
540 anyhow::bail!("Failed to access tiers 0 and 1");
541 }
542 } else if tiers.len() == 1 {
543 // Only one tier - use it for both
544 if let Some(t0) = tiers.first() {
545 let tier = t0.backend.clone();
546 (tier.clone(), tier)
547 } else {
548 anyhow::bail!("Failed to access tier 0");
549 }
550 } else {
551 anyhow::bail!("At least one cache tier is required");
552 };
553
554 // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
555 let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
556 backend: l1_cache.clone(),
557 });
558
559 Ok(Self {
560 tiers: Some(tiers),
561 l1_cache: l1_backend,
562 l2_cache,
563 l2_cache_concrete: None,
564 streaming_backend,
565 total_requests: AtomicU64::new(0),
566 l1_hits: AtomicU64::new(0),
567 l2_hits: AtomicU64::new(0),
568 misses: AtomicU64::new(0),
569 promotions: AtomicUsize::new(0),
570 in_flight_requests: Arc::new(DashMap::new()),
571 invalidation_publisher: None,
572 invalidation_subscriber: None,
573 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
574 })
575 }
576
577 /// Start the invalidation subscriber background task
578 fn start_invalidation_subscriber(&self) {
579 if let Some(subscriber) = &self.invalidation_subscriber {
580 let l1_cache = Arc::clone(&self.l1_cache);
581 let l2_cache_concrete = self.l2_cache_concrete.clone();
582
583 subscriber.start(move |msg| {
584 let l1 = Arc::clone(&l1_cache);
585 let _l2 = l2_cache_concrete.clone();
586
587 async move {
588 match msg {
589 InvalidationMessage::Remove { key } => {
590 // Remove from L1
591 l1.remove(&key).await?;
592 debug!("Invalidation: Removed '{}' from L1", key);
593 }
594 InvalidationMessage::Update {
595 key,
596 value,
597 ttl_secs,
598 } => {
599 // Update L1 with new value
600 let ttl = ttl_secs
601 .map_or_else(|| Duration::from_secs(300), Duration::from_secs);
602 l1.set_with_ttl(&key, value, ttl).await?;
603 debug!("Invalidation: Updated '{}' in L1", key);
604 }
605 InvalidationMessage::RemovePattern { pattern } => {
606 // Remove matching keys from L1
607 if let Err(e) = l1.remove_pattern(&pattern).await {
608 warn!("Failed to remove pattern '{}' from L1: {}", pattern, e);
609 }
610 debug!("Invalidation: Pattern '{}' removed from L1", pattern);
611 }
612 InvalidationMessage::RemoveBulk { keys } => {
613 // Remove multiple keys from L1
614 for key in keys {
615 if let Err(e) = l1.remove(&key).await {
616 warn!("Failed to remove '{}' from L1: {}", key, e);
617 }
618 }
619 debug!("Invalidation: Bulk removed keys from L1");
620 }
621 }
622 Ok(())
623 }
624 });
625
626 info!("Invalidation subscriber started");
627 }
628 }
629
630 /// Get value from cache using multi-tier architecture (v0.5.0+)
631 ///
632 /// This method iterates through all configured tiers and automatically promotes
633 /// to upper tiers on cache hit.
634 async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
635 let Some(tiers) = self.tiers.as_ref() else {
636 panic!("Tiers must be initialized in multi-tier mode")
637 }; // Safe: only called when tiers is Some
638
639 // Try each tier sequentially (sorted by tier_level)
640 for (tier_index, tier) in tiers.iter().enumerate() {
641 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
642 // Cache hit!
643 tier.record_hit();
644
645 // Promote to all upper tiers (if promotion enabled)
646 if tier.promotion_enabled && tier_index > 0 {
647 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
648
649 // Promote to all tiers above this one
650 for upper_tier in tiers.iter().take(tier_index).rev() {
651 if let Err(e) = upper_tier
652 .set_with_ttl(key, value.clone(), promotion_ttl)
653 .await
654 {
655 warn!(
656 "Failed to promote '{}' from L{} to L{}: {}",
657 key, tier.tier_level, upper_tier.tier_level, e
658 );
659 } else {
660 self.promotions.fetch_add(1, Ordering::Relaxed);
661 debug!(
662 "Promoted '{}' from L{} to L{} (TTL: {:?})",
663 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
664 );
665 }
666 }
667 }
668
669 return Ok(Some(value));
670 }
671 }
672
673 // Cache miss across all tiers
674 Ok(None)
675 }
676
677 /// Get value from cache (L1 first, then L2 fallback with promotion)
678 ///
679 /// This method now includes built-in Cache Stampede protection when cache misses occur.
680 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
681 /// unnecessary duplicate work on external data sources.
682 ///
683 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
684 ///
685 /// # Arguments
686 /// * `key` - Cache key to retrieve
687 ///
688 /// # Returns
689 /// * `Ok(Some(value))` - Cache hit, value found in any tier
690 /// * `Ok(None)` - Cache miss, value not found in any cache
691 /// * `Err(error)` - Cache operation failed
692 /// # Errors
693 ///
694 /// Returns an error if cache operation fails.
695 ///
696 /// # Panics
697 ///
698 /// Panics if tiers are not initialized in multi-tier mode (should not happen if constructed correctly).
699 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
700 self.total_requests.fetch_add(1, Ordering::Relaxed);
701
702 // NEW: Multi-tier mode (v0.5.0+)
703 if self.tiers.is_some() {
704 // Fast path for L1 (first tier) - no locking needed
705 if let Some(tier1) = self
706 .tiers
707 .as_ref()
708 .unwrap_or_else(|| panic!("Tiers initialized"))
709 .first()
710 {
711 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
712 tier1.record_hit();
713 // Update legacy stats for backward compatibility
714 self.l1_hits.fetch_add(1, Ordering::Relaxed);
715 return Ok(Some(value));
716 }
717 }
718
719 // L1 miss - use stampede protection for lower tiers
720 let key_owned = key.to_string();
721 let lock_guard = self
722 .in_flight_requests
723 .entry(key_owned.clone())
724 .or_insert_with(|| Arc::new(Mutex::new(())))
725 .clone();
726
727 let _guard = lock_guard.lock().await;
728 let cleanup_guard = CleanupGuard {
729 map: &self.in_flight_requests,
730 key: key_owned.clone(),
731 };
732
733 // Double-check L1 after acquiring lock
734 if let Some(tier1) = self
735 .tiers
736 .as_ref()
737 .unwrap_or_else(|| panic!("Tiers initialized"))
738 .first()
739 {
740 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
741 tier1.record_hit();
742 self.l1_hits.fetch_add(1, Ordering::Relaxed);
743 return Ok(Some(value));
744 }
745 }
746
747 // Check remaining tiers with promotion
748 let result = self.get_multi_tier(key).await?;
749
750 if result.is_some() {
751 // Hit in L2+ tier - update legacy stats
752 if self
753 .tiers
754 .as_ref()
755 .unwrap_or_else(|| panic!("Tiers initialized"))
756 .len()
757 >= 2
758 {
759 self.l2_hits.fetch_add(1, Ordering::Relaxed);
760 }
761 } else {
762 self.misses.fetch_add(1, Ordering::Relaxed);
763 }
764
765 drop(cleanup_guard);
766 return Ok(result);
767 }
768
769 // LEGACY: 2-tier mode (L1 + L2)
770 // Fast path: Try L1 first (no locking needed)
771 if let Some(value) = self.l1_cache.get(key).await {
772 self.l1_hits.fetch_add(1, Ordering::Relaxed);
773 return Ok(Some(value));
774 }
775
776 // L1 miss - implement Cache Stampede protection for L2 lookup
777 let key_owned = key.to_string();
778 let lock_guard = self
779 .in_flight_requests
780 .entry(key_owned.clone())
781 .or_insert_with(|| Arc::new(Mutex::new(())))
782 .clone();
783
784 let _guard = lock_guard.lock().await;
785
786 // RAII cleanup guard - ensures entry is removed even on early return or panic
787 let cleanup_guard = CleanupGuard {
788 map: &self.in_flight_requests,
789 key: key_owned.clone(),
790 };
791
792 // Double-check L1 cache after acquiring lock
793 // (Another concurrent request might have populated it while we were waiting)
794 if let Some(value) = self.l1_cache.get(key).await {
795 self.l1_hits.fetch_add(1, Ordering::Relaxed);
796 // cleanup_guard will auto-remove entry on drop
797 return Ok(Some(value));
798 }
799
800 // Check L2 cache with TTL information
801 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
802 self.l2_hits.fetch_add(1, Ordering::Relaxed);
803
804 // Promote to L1 with same TTL as Redis (or default if no TTL)
805 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
806
807 if self
808 .l1_cache
809 .set_with_ttl(key, value.clone(), promotion_ttl)
810 .await
811 .is_err()
812 {
813 // L1 promotion failed, but we still have the data
814 warn!("Failed to promote key '{}' to L1 cache", key);
815 } else {
816 self.promotions.fetch_add(1, Ordering::Relaxed);
817 debug!(
818 "Promoted '{}' from L2 to L1 with TTL {:?} (via get)",
819 key, promotion_ttl
820 );
821 }
822
823 // cleanup_guard will auto-remove entry on drop
824 return Ok(Some(value));
825 }
826
827 // Both L1 and L2 miss
828 self.misses.fetch_add(1, Ordering::Relaxed);
829
830 // cleanup_guard will auto-remove entry on drop
831 drop(cleanup_guard);
832
833 Ok(None)
834 }
835
836 /// Set value with specific cache strategy (all tiers)
837 ///
838 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
839 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
840 /// # Errors
841 ///
842 /// Returns an error if cache set operation fails.
843 pub async fn set_with_strategy(
844 &self,
845 key: &str,
846 value: serde_json::Value,
847 strategy: CacheStrategy,
848 ) -> Result<()> {
849 let ttl = strategy.to_duration();
850
851 // NEW: Multi-tier mode (v0.5.0+)
852 if let Some(tiers) = &self.tiers {
853 // Store in ALL tiers with their respective TTL scaling
854 let mut success_count = 0;
855 let mut last_error = None;
856
857 for tier in tiers {
858 match tier.set_with_ttl(key, value.clone(), ttl).await {
859 Ok(()) => {
860 success_count += 1;
861 }
862 Err(e) => {
863 error!(
864 "L{} cache set failed for key '{}': {}",
865 tier.tier_level, key, e
866 );
867 last_error = Some(e);
868 }
869 }
870 }
871
872 if success_count > 0 {
873 debug!(
874 "[Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
875 key,
876 success_count,
877 tiers.len(),
878 ttl
879 );
880 return Ok(());
881 }
882 return Err(
883 last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{key}'"))
884 );
885 }
886
887 // LEGACY: 2-tier mode (L1 + L2)
888 // Store in both L1 and L2
889 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
890 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
891
892 // Return success if at least one cache succeeded
893 match (l1_result, l2_result) {
894 (Ok(()), Ok(())) => {
895 // Both succeeded
896 debug!("[L1+L2] Cached '{}' with TTL {:?}", key, ttl);
897 Ok(())
898 }
899 (Ok(()), Err(_)) => {
900 // L1 succeeded, L2 failed
901 warn!("L2 cache set failed for key '{}', continuing with L1", key);
902 debug!("[L1] Cached '{}' with TTL {:?}", key, ttl);
903 Ok(())
904 }
905 (Err(_), Ok(())) => {
906 // L1 failed, L2 succeeded
907 warn!("L1 cache set failed for key '{}', continuing with L2", key);
908 debug!("[L2] Cached '{}' with TTL {:?}", key, ttl);
909 Ok(())
910 }
911 (Err(e1), Err(_e2)) => {
912 // Both failed
913 Err(anyhow::anyhow!(
914 "Both L1 and L2 cache set failed for key '{key}': {e1}"
915 ))
916 }
917 }
918 }
919
920 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
921 ///
922 /// This method provides comprehensive Cache Stampede protection:
923 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
924 /// 2. Check L2 cache with mutex-based coalescing
925 /// 3. Compute fresh data with protection against concurrent computations
926 ///
927 /// # Arguments
928 /// * `key` - Cache key
929 /// * `strategy` - Cache strategy for TTL and storage behavior
930 /// * `compute_fn` - Async function to compute the value if not in any cache
931 ///
932 /// # Example
933 /// ```ignore
934 /// let api_data = cache_manager.get_or_compute_with(
935 /// "api_response",
936 /// CacheStrategy::RealTime,
937 /// || async {
938 /// fetch_data_from_api().await
939 /// }
940 /// ).await?;
941 /// ```
942 #[allow(dead_code)]
943 /// # Errors
944 ///
945 /// Returns an error if compute function fails or cache operations fail.
946 pub async fn get_or_compute_with<F, Fut>(
947 &self,
948 key: &str,
949 strategy: CacheStrategy,
950 compute_fn: F,
951 ) -> Result<serde_json::Value>
952 where
953 F: FnOnce() -> Fut + Send,
954 Fut: Future<Output = Result<serde_json::Value>> + Send,
955 {
956 self.total_requests.fetch_add(1, Ordering::Relaxed);
957
958 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
959 if let Some(value) = self.l1_cache.get(key).await {
960 self.l1_hits.fetch_add(1, Ordering::Relaxed);
961 return Ok(value);
962 }
963
964 // 2. L1 miss - try L2 with Cache Stampede protection
965 let key_owned = key.to_string();
966 let lock_guard = self
967 .in_flight_requests
968 .entry(key_owned.clone())
969 .or_insert_with(|| Arc::new(Mutex::new(())))
970 .clone();
971
972 let _guard = lock_guard.lock().await;
973
974 // RAII cleanup guard - ensures entry is removed even on early return or panic
975 let _cleanup_guard = CleanupGuard {
976 map: &self.in_flight_requests,
977 key: key_owned,
978 };
979
980 // 3. Double-check L1 cache after acquiring lock
981 // (Another request might have populated it while we were waiting)
982 if let Some(value) = self.l1_cache.get(key).await {
983 self.l1_hits.fetch_add(1, Ordering::Relaxed);
984 // _cleanup_guard will auto-remove entry on drop
985 return Ok(value);
986 }
987
988 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
989 if let Some(tiers) = &self.tiers {
990 // Check tiers starting from index 1 (skip L1 since already checked)
991 for tier in tiers.iter().skip(1) {
992 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
993 tier.record_hit();
994
995 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
996
997 // Promote to L1 (first tier)
998 if let Some(l1_tier) = tiers.first() {
999 if let Err(e) = l1_tier
1000 .set_with_ttl(key, value.clone(), promotion_ttl)
1001 .await
1002 {
1003 warn!(
1004 "Failed to promote '{}' from L{} to L1: {}",
1005 key, tier.tier_level, e
1006 );
1007 } else {
1008 self.promotions.fetch_add(1, Ordering::Relaxed);
1009 debug!(
1010 "Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1011 key, tier.tier_level, promotion_ttl
1012 );
1013 }
1014 }
1015
1016 // _cleanup_guard will auto-remove entry on drop
1017 return Ok(value);
1018 }
1019 }
1020 } else {
1021 // LEGACY: Check L2 cache with TTL
1022 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1023 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1024
1025 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1026 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1027
1028 if let Err(e) = self
1029 .l1_cache
1030 .set_with_ttl(key, value.clone(), promotion_ttl)
1031 .await
1032 {
1033 warn!("Failed to promote key '{}' to L1: {}", key, e);
1034 } else {
1035 self.promotions.fetch_add(1, Ordering::Relaxed);
1036 debug!(
1037 "Promoted '{}' from L2 to L1 with TTL {:?}",
1038 key, promotion_ttl
1039 );
1040 }
1041
1042 // _cleanup_guard will auto-remove entry on drop
1043 return Ok(value);
1044 }
1045 }
1046
1047 // 5. Cache miss across all tiers - compute fresh data
1048 debug!(
1049 "Computing fresh data for key: '{}' (Cache Stampede protected)",
1050 key
1051 );
1052 let fresh_data = compute_fn().await?;
1053
1054 // 6. Store in both caches
1055 if let Err(e) = self
1056 .set_with_strategy(key, fresh_data.clone(), strategy)
1057 .await
1058 {
1059 warn!("Failed to cache computed data for key '{}': {}", key, e);
1060 }
1061
1062 // 7. _cleanup_guard will auto-remove entry on drop
1063
1064 Ok(fresh_data)
1065 }
1066
1067 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
1068 ///
1069 /// This method provides the same functionality as `get_or_compute_with()` but with
1070 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
1071 /// API calls, or any computation that returns structured data.
1072 ///
1073 /// # Type Safety
1074 ///
1075 /// - Returns your actual type `T` instead of `serde_json::Value`
1076 /// - Compiler enforces Serialize + `DeserializeOwned` bounds
1077 /// - No manual JSON conversion needed
1078 ///
1079 /// # Cache Flow
1080 ///
1081 /// 1. Check L1 cache → deserialize if found
1082 /// 2. Check L2 cache → deserialize + promote to L1 if found
1083 /// 3. Execute `compute_fn` → serialize → store in L1+L2
1084 /// 4. Full stampede protection (only ONE request computes)
1085 ///
1086 /// # Arguments
1087 ///
1088 /// * `key` - Cache key
1089 /// * `strategy` - Cache strategy for TTL
1090 /// * `compute_fn` - Async function returning `Result<T>`
1091 ///
1092 /// # Example - Database Query
1093 ///
1094 /// ```no_run
1095 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1096 /// # use std::sync::Arc;
1097 /// # use serde::{Serialize, Deserialize};
1098 /// # async fn example() -> anyhow::Result<()> {
1099 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1100 /// # let l2 = Arc::new(L2Cache::new().await?);
1101 /// # let cache_manager = CacheManager::new(l1, l2);
1102 ///
1103 /// #[derive(Serialize, Deserialize)]
1104 /// struct User {
1105 /// id: i64,
1106 /// name: String,
1107 /// }
1108 ///
1109 /// // Type-safe database caching (example - requires sqlx)
1110 /// // let user: User = cache_manager.get_or_compute_typed(
1111 /// // "user:123",
1112 /// // CacheStrategy::MediumTerm,
1113 /// // || async {
1114 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1115 /// // .bind(123)
1116 /// // .fetch_one(&pool)
1117 /// // .await
1118 /// // }
1119 /// // ).await?;
1120 /// # Ok(())
1121 /// # }
1122 /// ```
1123 ///
1124 /// # Example - API Call
1125 ///
1126 /// ```no_run
1127 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache, MokaCacheConfig};
1128 /// # use std::sync::Arc;
1129 /// # use serde::{Serialize, Deserialize};
1130 /// # async fn example() -> anyhow::Result<()> {
1131 /// # let l1 = Arc::new(L1Cache::new(MokaCacheConfig::default())?);
1132 /// # let l2 = Arc::new(L2Cache::new().await?);
1133 /// # let cache_manager = CacheManager::new(l1, l2);
1134 /// #[derive(Serialize, Deserialize)]
1135 /// struct ApiResponse {
1136 /// data: String,
1137 /// timestamp: i64,
1138 /// }
1139 ///
1140 /// // API call caching (example - requires reqwest)
1141 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1142 /// // "api:endpoint",
1143 /// // CacheStrategy::RealTime,
1144 /// // || async {
1145 /// // reqwest::get("https://api.example.com/data")
1146 /// // .await?
1147 /// // .json::<ApiResponse>()
1148 /// // .await
1149 /// // }
1150 /// // ).await?;
1151 /// # Ok(())
1152 /// # }
1153 /// ```
1154 ///
1155 /// # Performance
1156 ///
1157 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1158 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1159 /// - Compute: Your function time + serialization + L1+L2 storage
1160 /// - Stampede protection: 99.6% latency reduction under high concurrency
1161 ///
1162 /// # Errors
1163 ///
1164 /// Returns error if:
1165 /// - Compute function fails
1166 /// - Serialization fails (invalid type for JSON)
1167 /// - Deserialization fails (cache data doesn't match type T)
1168 /// - Cache operations fail (Redis connection issues)
1169 #[allow(clippy::too_many_lines)]
1170 pub async fn get_or_compute_typed<T, F, Fut>(
1171 &self,
1172 key: &str,
1173 strategy: CacheStrategy,
1174 compute_fn: F,
1175 ) -> Result<T>
1176 where
1177 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1178 F: FnOnce() -> Fut + Send,
1179 Fut: Future<Output = Result<T>> + Send,
1180 {
1181 self.total_requests.fetch_add(1, Ordering::Relaxed);
1182
1183 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1184 if let Some(cached_json) = self.l1_cache.get(key).await {
1185 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1186
1187 // Attempt to deserialize from JSON to type T
1188 match serde_json::from_value::<T>(cached_json) {
1189 Ok(typed_value) => {
1190 debug!(
1191 "[L1 HIT] Deserialized '{}' to type {}",
1192 key,
1193 std::any::type_name::<T>()
1194 );
1195 return Ok(typed_value);
1196 }
1197 Err(e) => {
1198 // Deserialization failed - cache data may be stale or corrupt
1199 warn!(
1200 "L1 cache deserialization failed for key '{}': {}. Will recompute.",
1201 key, e
1202 );
1203 // Fall through to recompute
1204 }
1205 }
1206 }
1207
1208 // 2. L1 miss - try L2 with Cache Stampede protection
1209 let key_owned = key.to_string();
1210 let lock_guard = self
1211 .in_flight_requests
1212 .entry(key_owned.clone())
1213 .or_insert_with(|| Arc::new(Mutex::new(())))
1214 .clone();
1215
1216 let _guard = lock_guard.lock().await;
1217
1218 // RAII cleanup guard - ensures entry is removed even on early return or panic
1219 let _cleanup_guard = CleanupGuard {
1220 map: &self.in_flight_requests,
1221 key: key_owned,
1222 };
1223
1224 // 3. Double-check L1 cache after acquiring lock
1225 // (Another request might have populated it while we were waiting)
1226 if let Some(cached_json) = self.l1_cache.get(key).await {
1227 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1228 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1229 debug!("[L1 HIT] Deserialized '{}' after lock acquisition", key);
1230 return Ok(typed_value);
1231 }
1232 }
1233
1234 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1235 if let Some(tiers) = &self.tiers {
1236 // Check tiers starting from index 1 (skip L1 since already checked)
1237 for tier in tiers.iter().skip(1) {
1238 if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1239 tier.record_hit();
1240
1241 // Attempt to deserialize
1242 match serde_json::from_value::<T>(cached_json.clone()) {
1243 Ok(typed_value) => {
1244 debug!(
1245 "[L{} HIT] Deserialized '{}' to type {}",
1246 tier.tier_level,
1247 key,
1248 std::any::type_name::<T>()
1249 );
1250
1251 // Promote to L1 (first tier)
1252 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1253 if let Some(tier) = tiers.first() {
1254 if let Err(e) =
1255 tier.set_with_ttl(key, cached_json, promotion_ttl).await
1256 {
1257 warn!(
1258 "Failed to promote '{}' from L{} to L1: {}",
1259 key, tier.tier_level, e
1260 );
1261 } else {
1262 self.promotions.fetch_add(1, Ordering::Relaxed);
1263 debug!("Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1264 key, tier.tier_level, promotion_ttl);
1265 }
1266 }
1267
1268 return Ok(typed_value);
1269 }
1270 Err(e) => {
1271 warn!("L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1272 tier.tier_level, key, e);
1273 // Continue to next tier
1274 }
1275 }
1276 }
1277 }
1278 } else {
1279 // LEGACY: Check L2 cache with TTL
1280 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1281 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1282
1283 // Attempt to deserialize
1284 match serde_json::from_value::<T>(cached_json.clone()) {
1285 Ok(typed_value) => {
1286 debug!("[L2 HIT] Deserialized '{}' from Redis", key);
1287
1288 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1289 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1290
1291 if let Err(e) = self
1292 .l1_cache
1293 .set_with_ttl(key, cached_json, promotion_ttl)
1294 .await
1295 {
1296 warn!("Failed to promote key '{}' to L1: {}", key, e);
1297 } else {
1298 self.promotions.fetch_add(1, Ordering::Relaxed);
1299 debug!(
1300 "Promoted '{}' from L2 to L1 with TTL {:?}",
1301 key, promotion_ttl
1302 );
1303 }
1304
1305 return Ok(typed_value);
1306 }
1307 Err(e) => {
1308 warn!(
1309 "L2 cache deserialization failed for key '{}': {}. Will recompute.",
1310 key, e
1311 );
1312 // Fall through to recompute
1313 }
1314 }
1315 }
1316 }
1317
1318 // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1319 debug!(
1320 "Computing fresh typed data for key: '{}' (Cache Stampede protected)",
1321 key
1322 );
1323 let typed_value = compute_fn().await?;
1324
1325 // 6. Serialize to JSON for storage
1326 let json_value = serde_json::to_value(&typed_value).map_err(|e| {
1327 anyhow::anyhow!(
1328 "Failed to serialize type {} for caching: {}",
1329 std::any::type_name::<T>(),
1330 e
1331 )
1332 })?;
1333
1334 // 7. Store in both L1 and L2 caches
1335 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1336 warn!(
1337 "Failed to cache computed typed data for key '{}': {}",
1338 key, e
1339 );
1340 } else {
1341 debug!(
1342 "Cached typed value for '{}' (type: {})",
1343 key,
1344 std::any::type_name::<T>()
1345 );
1346 }
1347
1348 // 8. _cleanup_guard will auto-remove entry on drop
1349
1350 Ok(typed_value)
1351 }
1352
1353 /// Get comprehensive cache statistics
1354 ///
1355 /// In multi-tier mode, aggregates statistics from all tiers.
1356 /// In legacy mode, returns L1 and L2 stats.
1357 #[allow(dead_code)]
1358 pub fn get_stats(&self) -> CacheManagerStats {
1359 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1360 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1361 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1362 let misses = self.misses.load(Ordering::Relaxed);
1363
1364 CacheManagerStats {
1365 total_requests: total_reqs,
1366 l1_hits,
1367 l2_hits,
1368 total_hits: l1_hits + l2_hits,
1369 misses,
1370 hit_rate: if total_reqs > 0 {
1371 #[allow(clippy::cast_precision_loss)]
1372 {
1373 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1374 }
1375 } else {
1376 0.0
1377 },
1378 l1_hit_rate: if total_reqs > 0 {
1379 #[allow(clippy::cast_precision_loss)]
1380 {
1381 (l1_hits as f64 / total_reqs as f64) * 100.0
1382 }
1383 } else {
1384 0.0
1385 },
1386 promotions: self.promotions.load(Ordering::Relaxed),
1387 in_flight_requests: self.in_flight_requests.len(),
1388 }
1389 }
1390
1391 /// Get per-tier statistics (v0.5.0+)
1392 ///
1393 /// Returns statistics for each tier if multi-tier mode is enabled.
1394 /// Returns None if using legacy 2-tier mode.
1395 ///
1396 /// # Example
1397 /// ```rust,ignore
1398 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1399 /// for stats in tier_stats {
1400 /// println!("L{}: {} hits ({})",
1401 /// stats.tier_level,
1402 /// stats.hit_count(),
1403 /// stats.backend_name);
1404 /// }
1405 /// }
1406 /// ```
1407 pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1408 self.tiers
1409 .as_ref()
1410 .map(|tiers| tiers.iter().map(|tier| tier.stats.clone()).collect())
1411 }
1412
1413 // ===== Redis Streams Methods =====
1414
1415 /// Publish data to Redis Stream
1416 ///
1417 /// # Arguments
1418 /// * `stream_key` - Name of the stream (e.g., "`events_stream`")
1419 /// * `fields` - Field-value pairs to publish
1420 /// * `maxlen` - Optional max length for stream trimming
1421 ///
1422 /// # Returns
1423 /// The entry ID generated by Redis
1424 ///
1425 /// # Errors
1426 /// Returns error if streaming backend is not configured
1427 pub async fn publish_to_stream(
1428 &self,
1429 stream_key: &str,
1430 fields: Vec<(String, String)>,
1431 maxlen: Option<usize>,
1432 ) -> Result<String> {
1433 match &self.streaming_backend {
1434 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1435 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1436 }
1437 }
1438
1439 /// Read latest entries from Redis Stream
1440 ///
1441 /// # Arguments
1442 /// * `stream_key` - Name of the stream
1443 /// * `count` - Number of latest entries to retrieve
1444 ///
1445 /// # Returns
1446 /// Vector of (`entry_id`, fields) tuples (newest first)
1447 ///
1448 /// # Errors
1449 /// Returns error if streaming backend is not configured
1450 pub async fn read_stream_latest(
1451 &self,
1452 stream_key: &str,
1453 count: usize,
1454 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1455 match &self.streaming_backend {
1456 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1457 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1458 }
1459 }
1460
1461 /// Read from Redis Stream with optional blocking
1462 ///
1463 /// # Arguments
1464 /// * `stream_key` - Name of the stream
1465 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1466 /// * `count` - Max entries to retrieve
1467 /// * `block_ms` - Optional blocking timeout in ms
1468 ///
1469 /// # Returns
1470 /// Vector of (`entry_id`, fields) tuples
1471 ///
1472 /// # Errors
1473 /// Returns error if streaming backend is not configured
1474 pub async fn read_stream(
1475 &self,
1476 stream_key: &str,
1477 last_id: &str,
1478 count: usize,
1479 block_ms: Option<usize>,
1480 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1481 match &self.streaming_backend {
1482 Some(backend) => {
1483 backend
1484 .stream_read(stream_key, last_id, count, block_ms)
1485 .await
1486 }
1487 None => Err(anyhow::anyhow!("Streaming backend not configured")),
1488 }
1489 }
1490
1491 // ===== Cache Invalidation Methods =====
1492
1493 /// Invalidate a cache key across all instances
1494 ///
1495 /// This removes the key from all cache tiers and broadcasts
1496 /// the invalidation to all other cache instances via Redis Pub/Sub.
1497 ///
1498 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1499 ///
1500 /// # Arguments
1501 /// * `key` - Cache key to invalidate
1502 ///
1503 /// # Example
1504 /// ```rust,no_run
1505 /// # use multi_tier_cache::CacheManager;
1506 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1507 /// // Invalidate user cache after profile update
1508 /// cache_manager.invalidate("user:123").await?;
1509 /// # Ok(())
1510 /// # }
1511 /// ```
1512 /// # Errors
1513 ///
1514 /// Returns an error if invalidation fails.
1515 pub async fn invalidate(&self, key: &str) -> Result<()> {
1516 // NEW: Multi-tier mode (v0.5.0+)
1517 if let Some(tiers) = &self.tiers {
1518 // Remove from ALL tiers
1519 for tier in tiers {
1520 if let Err(e) = tier.remove(key).await {
1521 warn!(
1522 "Failed to remove '{}' from L{}: {}",
1523 key, tier.tier_level, e
1524 );
1525 }
1526 }
1527 } else {
1528 // LEGACY: 2-tier mode
1529 self.l1_cache.remove(key).await?;
1530 self.l2_cache.remove(key).await?;
1531 }
1532
1533 // Broadcast to other instances
1534 if let Some(publisher) = &self.invalidation_publisher {
1535 let mut pub_lock = publisher.lock().await;
1536 let msg = InvalidationMessage::remove(key);
1537 pub_lock.publish(&msg).await?;
1538 self.invalidation_stats
1539 .messages_sent
1540 .fetch_add(1, Ordering::Relaxed);
1541 }
1542
1543 debug!("Invalidated '{}' across all instances", key);
1544 Ok(())
1545 }
1546
1547 /// Update cache value across all instances
1548 ///
1549 /// This updates the key in all cache tiers and broadcasts
1550 /// the update to all other cache instances, avoiding cache misses.
1551 ///
1552 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1553 ///
1554 /// # Arguments
1555 /// * `key` - Cache key to update
1556 /// * `value` - New value
1557 /// * `ttl` - Optional TTL (uses default if None)
1558 ///
1559 /// # Example
1560 /// ```rust,no_run
1561 /// # use multi_tier_cache::CacheManager;
1562 /// # use std::time::Duration;
1563 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1564 /// // Update user cache with new data
1565 /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1566 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1567 /// # Ok(())
1568 /// # }
1569 /// ```
1570 /// # Errors
1571 ///
1572 /// Returns an error if cache update fails.
1573 pub async fn update_cache(
1574 &self,
1575 key: &str,
1576 value: serde_json::Value,
1577 ttl: Option<Duration>,
1578 ) -> Result<()> {
1579 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1580
1581 // NEW: Multi-tier mode (v0.5.0+)
1582 if let Some(tiers) = &self.tiers {
1583 // Update ALL tiers with their respective TTL scaling
1584 for tier in tiers {
1585 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1586 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1587 }
1588 }
1589 } else {
1590 // LEGACY: 2-tier mode
1591 self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1592 self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1593 }
1594
1595 // Broadcast update to other instances
1596 if let Some(publisher) = &self.invalidation_publisher {
1597 let mut pub_lock = publisher.lock().await;
1598 let msg = InvalidationMessage::update(key, value, Some(ttl));
1599 pub_lock.publish(&msg).await?;
1600 self.invalidation_stats
1601 .messages_sent
1602 .fetch_add(1, Ordering::Relaxed);
1603 }
1604
1605 debug!("Updated '{}' across all instances", key);
1606 Ok(())
1607 }
1608
1609 /// Invalidate all keys matching a pattern
1610 ///
1611 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1612 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1613 ///
1614 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1615 ///
1616 /// **Note**: Pattern scanning requires a concrete `L2Cache` instance with `scan_keys()`.
1617 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1618 ///
1619 /// # Arguments
1620 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1621 ///
1622 /// # Example
1623 /// ```rust,no_run
1624 /// # use multi_tier_cache::CacheManager;
1625 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1626 /// // Invalidate all user caches
1627 /// cache_manager.invalidate_pattern("user:*").await?;
1628 ///
1629 /// // Invalidate specific user's related caches
1630 /// cache_manager.invalidate_pattern("user:123:*").await?;
1631 /// # Ok(())
1632 /// # }
1633 /// ```
1634 /// # Errors
1635 ///
1636 /// Returns an error if invalidation fails.
1637 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1638 // Scan L2 for matching keys
1639 // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1640 let keys = if let Some(l2) = &self.l2_cache_concrete {
1641 l2.scan_keys(pattern).await?
1642 } else {
1643 return Err(anyhow::anyhow!(
1644 "Pattern invalidation requires concrete L2Cache instance"
1645 ));
1646 };
1647
1648 if keys.is_empty() {
1649 debug!("No keys found matching pattern '{}'", pattern);
1650 return Ok(());
1651 }
1652
1653 // NEW: Multi-tier mode (v0.5.0+)
1654 if let Some(tiers) = &self.tiers {
1655 // Remove from ALL tiers
1656 for key in &keys {
1657 for tier in tiers {
1658 if let Err(e) = tier.remove(key).await {
1659 warn!(
1660 "Failed to remove '{}' from L{}: {}",
1661 key, tier.tier_level, e
1662 );
1663 }
1664 }
1665 }
1666 } else {
1667 // LEGACY: 2-tier mode - Remove from L2 in bulk
1668 if let Some(l2) = &self.l2_cache_concrete {
1669 l2.remove_bulk(&keys).await?;
1670 }
1671 }
1672
1673 // Broadcast pattern invalidation
1674 if let Some(publisher) = &self.invalidation_publisher {
1675 let mut pub_lock = publisher.lock().await;
1676 let msg = InvalidationMessage::remove_bulk(keys.clone());
1677 pub_lock.publish(&msg).await?;
1678 self.invalidation_stats
1679 .messages_sent
1680 .fetch_add(1, Ordering::Relaxed);
1681 }
1682
1683 debug!(
1684 "Invalidated {} keys matching pattern '{}'",
1685 keys.len(),
1686 pattern
1687 );
1688 Ok(())
1689 }
1690
1691 /// Set value with automatic broadcast to all instances
1692 ///
1693 /// This is a write-through operation that updates the cache and
1694 /// broadcasts the update to all other instances automatically.
1695 ///
1696 /// # Arguments
1697 /// * `key` - Cache key
1698 /// * `value` - Value to cache
1699 /// * `strategy` - Cache strategy (determines TTL)
1700 ///
1701 /// # Example
1702 /// ```rust,no_run
1703 /// # use multi_tier_cache::{CacheManager, CacheStrategy};
1704 /// # async fn example(cache_manager: &CacheManager) -> anyhow::Result<()> {
1705 /// // Update and broadcast in one call
1706 /// let data = serde_json::json!({"status": "active"});
1707 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1708 /// # Ok(())
1709 /// # }
1710 /// ```
1711 /// # Errors
1712 ///
1713 /// Returns an error if cache set or broadcast fails.
1714 pub async fn set_with_broadcast(
1715 &self,
1716 key: &str,
1717 value: serde_json::Value,
1718 strategy: CacheStrategy,
1719 ) -> Result<()> {
1720 let ttl = strategy.to_duration();
1721
1722 // Set in local caches
1723 self.set_with_strategy(key, value.clone(), strategy).await?;
1724
1725 // Broadcast update if invalidation is enabled
1726 if let Some(publisher) = &self.invalidation_publisher {
1727 let mut pub_lock = publisher.lock().await;
1728 let msg = InvalidationMessage::update(key, value, Some(ttl));
1729 pub_lock.publish(&msg).await?;
1730 self.invalidation_stats
1731 .messages_sent
1732 .fetch_add(1, Ordering::Relaxed);
1733 }
1734
1735 Ok(())
1736 }
1737
1738 /// Get invalidation statistics
1739 ///
1740 /// Returns statistics about invalidation operations if invalidation is enabled.
1741 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1742 if self.invalidation_subscriber.is_some() {
1743 Some(self.invalidation_stats.snapshot())
1744 } else {
1745 None
1746 }
1747 }
1748}
1749
1750/// Cache Manager statistics
1751#[allow(dead_code)]
1752#[derive(Debug, Clone)]
1753pub struct CacheManagerStats {
1754 pub total_requests: u64,
1755 pub l1_hits: u64,
1756 pub l2_hits: u64,
1757 pub total_hits: u64,
1758 pub misses: u64,
1759 pub hit_rate: f64,
1760 pub l1_hit_rate: f64,
1761 pub promotions: usize,
1762 pub in_flight_requests: usize,
1763}