multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use crate::backends::{L1Cache, L2Cache};
15use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
16use super::invalidation::{
17 InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
18 InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
19};
20
21/// RAII cleanup guard for in-flight request tracking
22/// Ensures that entries are removed from DashMap even on early return or panic
23struct CleanupGuard<'a> {
24 map: &'a DashMap<String, Arc<Mutex<()>>>,
25 key: String,
26}
27
28impl<'a> Drop for CleanupGuard<'a> {
29 fn drop(&mut self) {
30 self.map.remove(&self.key);
31 }
32}
33
/// Named TTL presets used when storing values in the cache.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data: 10 seconds.
    RealTime,
    /// Short-term data: 5 minutes.
    ShortTerm,
    /// Medium-term data: 1 hour.
    MediumTerm,
    /// Long-term data: 3 hours.
    LongTerm,
    /// Caller-supplied TTL, used as-is.
    Custom(Duration),
    /// Default strategy: 5 minutes (same TTL as `ShortTerm`).
    Default,
}

impl CacheStrategy {
    /// Returns the TTL this strategy stands for.
    pub fn to_duration(&self) -> Duration {
        let seconds: u64 = match *self {
            // A custom strategy already carries its own duration.
            Self::Custom(ttl) => return ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * 60,
            Self::MediumTerm => 60 * 60,
            Self::LongTerm => 3 * 60 * 60,
        };
        Duration::from_secs(seconds)
    }
}
65
/// Hit statistics for one cache tier.
#[derive(Debug)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, ...).
    pub tier_level: usize,
    /// Number of cache hits recorded for this tier.
    pub hits: AtomicU64,
    /// Name of the backend serving this tier, for identification.
    pub backend_name: String,
}

impl Clone for TierStats {
    /// Produces an independent copy whose counter starts at the value read
    /// from `self` at the time of the call; later hits are not shared.
    fn clone(&self) -> Self {
        let mut snapshot = Self::new(self.tier_level, self.backend_name.clone());
        *snapshot.hits.get_mut() = self.hit_count();
        snapshot
    }
}

impl TierStats {
    /// Creates statistics for a tier with a zeroed hit counter.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            tier_level,
            hits: AtomicU64::default(),
            backend_name,
        }
    }

    /// Returns the current number of recorded hits.
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
101
102/// A single cache tier in the multi-tier architecture
103pub struct CacheTier {
104 /// Cache backend for this tier
105 backend: Arc<dyn L2CacheBackend>,
106 /// Tier level (1 = hottest/fastest, higher = colder/slower)
107 tier_level: usize,
108 /// Enable automatic promotion to upper tiers on cache hit
109 promotion_enabled: bool,
110 /// TTL scale factor (multiplier for TTL when storing/promoting)
111 ttl_scale: f64,
112 /// Statistics for this tier
113 stats: TierStats,
114}
115
116impl CacheTier {
117 /// Create a new cache tier
118 pub fn new(
119 backend: Arc<dyn L2CacheBackend>,
120 tier_level: usize,
121 promotion_enabled: bool,
122 ttl_scale: f64,
123 ) -> Self {
124 let backend_name = backend.name().to_string();
125 Self {
126 backend,
127 tier_level,
128 promotion_enabled,
129 ttl_scale,
130 stats: TierStats::new(tier_level, backend_name),
131 }
132 }
133
134 /// Get value with TTL from this tier
135 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
136 self.backend.get_with_ttl(key).await
137 }
138
139 /// Set value with TTL in this tier
140 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
141 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
142 self.backend.set_with_ttl(key, value, scaled_ttl).await
143 }
144
145 /// Remove value from this tier
146 async fn remove(&self, key: &str) -> Result<()> {
147 self.backend.remove(key).await
148 }
149
150 /// Record a cache hit for this tier
151 fn record_hit(&self) {
152 self.stats.hits.fetch_add(1, Ordering::Relaxed);
153 }
154}
155
/// Builder-style configuration for a cache tier.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Whether hits in this tier are promoted to upper tiers
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Creates a configuration for `tier_level` with promotion enabled and a
    /// TTL scale of 1.0.
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Preset for L1 (hot tier): promotion disabled because L1 is the top tier.
    pub fn as_l1() -> Self {
        Self::new(1).with_promotion(false)
    }

    /// Preset for L2 (warm tier): promotion enabled, unscaled TTL.
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Preset for L3 (cold tier): promotion enabled, data kept 2x longer.
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Preset for L4 (archive tier): promotion enabled, data kept 8x longer.
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Returns the configuration with promotion switched on or off.
    pub fn with_promotion(self, enabled: bool) -> Self {
        Self {
            promotion_enabled: enabled,
            ..self
        }
    }

    /// Returns the configuration with a different TTL scale factor.
    pub fn with_ttl_scale(self, scale: f64) -> Self {
        Self {
            ttl_scale: scale,
            ..self
        }
    }

    /// Returns the configuration with a different tier level.
    pub fn with_level(self, level: usize) -> Self {
        Self {
            tier_level: level,
            ..self
        }
    }
}
231
232/// Proxy wrapper to convert L2CacheBackend to CacheBackend
233/// (Rust doesn't support automatic trait upcasting for trait objects)
234struct ProxyCacheBackend {
235 backend: Arc<dyn L2CacheBackend>,
236}
237
238#[async_trait::async_trait]
239impl CacheBackend for ProxyCacheBackend {
240 async fn get(&self, key: &str) -> Option<serde_json::Value> {
241 self.backend.get(key).await
242 }
243
244 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
245 self.backend.set_with_ttl(key, value, ttl).await
246 }
247
248 async fn remove(&self, key: &str) -> Result<()> {
249 self.backend.remove(key).await
250 }
251
252 async fn health_check(&self) -> bool {
253 self.backend.health_check().await
254 }
255
256 fn name(&self) -> &str {
257 self.backend.name()
258 }
259}
260
/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
/// legacy L1+L2 behavior for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, `get` and `set_with_strategy` operate on these tiers instead of
    /// the legacy l1_cache/l2_cache fields. Only `new_with_tiers` sets it.
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility
    /// L1 Cache (trait object for pluggable backends).
    /// In multi-tier mode this is a proxy around the first tier's backend.
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends).
    /// In multi-tier mode this is the second tier's backend, or the first
    /// tier's backend when only one tier is configured.
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance. Only `new_with_invalidation` sets it; the
    /// invalidation handler clones it but does not currently use it.
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend. `new` and `new_with_invalidation` create a
    /// `RedisStreams` backend; the other constructors store whatever the
    /// caller passes (None disables streaming).
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics (AtomicU64 is already thread-safe, no Arc needed)
    total_requests: AtomicU64,
    /// Hits served by L1 (the first tier in multi-tier mode).
    l1_hits: AtomicU64,
    /// Hits served by L2 (any lower tier in multi-tier mode).
    l2_hits: AtomicU64,
    /// Lookups that missed every cache.
    misses: AtomicU64,
    /// Successful copies of a value into an upper tier.
    promotions: AtomicUsize,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations.
    /// Maps a key to the async mutex that serializes its slow-path lookups.
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
297
298impl CacheManager {
299 /// Create new cache manager with trait objects (pluggable backends)
300 ///
301 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
302 ///
303 /// # Arguments
304 ///
305 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
306 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
307 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
308 ///
309 /// # Example
310 ///
311 /// ```rust,ignore
312 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
313 /// use std::sync::Arc;
314 ///
315 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
316 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
317 ///
318 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
319 /// ```
320 pub async fn new_with_backends(
321 l1_cache: Arc<dyn CacheBackend>,
322 l2_cache: Arc<dyn L2CacheBackend>,
323 streaming_backend: Option<Arc<dyn StreamingBackend>>,
324 ) -> Result<Self> {
325 println!(" 🎯 Initializing Cache Manager with custom backends...");
326 println!(" L1: {}", l1_cache.name());
327 println!(" L2: {}", l2_cache.name());
328 if streaming_backend.is_some() {
329 println!(" Streaming: enabled");
330 } else {
331 println!(" Streaming: disabled");
332 }
333
334 Ok(Self {
335 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
336 l1_cache,
337 l2_cache,
338 l2_cache_concrete: None,
339 streaming_backend,
340 total_requests: AtomicU64::new(0),
341 l1_hits: AtomicU64::new(0),
342 l2_hits: AtomicU64::new(0),
343 misses: AtomicU64::new(0),
344 promotions: AtomicUsize::new(0),
345 in_flight_requests: Arc::new(DashMap::new()),
346 invalidation_publisher: None,
347 invalidation_subscriber: None,
348 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
349 })
350 }
351
352 /// Create new cache manager with default backends (backward compatible)
353 ///
354 /// This is the legacy constructor maintained for backward compatibility.
355 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
356 ///
357 /// # Arguments
358 ///
359 /// * `l1_cache` - Moka L1 cache instance
360 /// * `l2_cache` - Redis L2 cache instance
361 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
362 println!(" 🎯 Initializing Cache Manager...");
363
364 // Convert concrete types to trait objects
365 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
366 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
367
368 // Create RedisStreams backend for streaming functionality
369 let redis_url = std::env::var("REDIS_URL")
370 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
371 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
372 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
373
374 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
375 }
376
377 /// Create new cache manager with invalidation support
378 ///
379 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
380 ///
381 /// # Arguments
382 ///
383 /// * `l1_cache` - Moka L1 cache instance
384 /// * `l2_cache` - Redis L2 cache instance
385 /// * `redis_url` - Redis connection URL for Pub/Sub
386 /// * `config` - Invalidation configuration
387 ///
388 /// # Example
389 ///
390 /// ```rust,ignore
391 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
392 ///
393 /// let config = InvalidationConfig {
394 /// channel: "my_app:cache:invalidate".to_string(),
395 /// ..Default::default()
396 /// };
397 ///
398 /// let manager = CacheManager::new_with_invalidation(
399 /// l1, l2, "redis://localhost", config
400 /// ).await?;
401 /// ```
402 pub async fn new_with_invalidation(
403 l1_cache: Arc<L1Cache>,
404 l2_cache: Arc<L2Cache>,
405 redis_url: &str,
406 config: InvalidationConfig,
407 ) -> Result<Self> {
408 println!(" 🎯 Initializing Cache Manager with Invalidation...");
409 println!(" Pub/Sub channel: {}", config.channel);
410
411 // Convert concrete types to trait objects
412 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
413 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
414
415 // Create RedisStreams backend for streaming functionality
416 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
417 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
418
419 // Create publisher
420 let client = redis::Client::open(redis_url)?;
421 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
422 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
423
424 // Create subscriber
425 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
426 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
427
428 let manager = Self {
429 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
430 l1_cache: l1_backend,
431 l2_cache: l2_backend,
432 l2_cache_concrete: Some(l2_cache),
433 streaming_backend: Some(streaming_backend),
434 total_requests: AtomicU64::new(0),
435 l1_hits: AtomicU64::new(0),
436 l2_hits: AtomicU64::new(0),
437 misses: AtomicU64::new(0),
438 promotions: AtomicUsize::new(0),
439 in_flight_requests: Arc::new(DashMap::new()),
440 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
441 invalidation_subscriber: Some(Arc::new(subscriber)),
442 invalidation_stats,
443 };
444
445 // Start subscriber with handler
446 manager.start_invalidation_subscriber();
447
448 println!(" ✅ Cache Manager initialized with invalidation support");
449
450 Ok(manager)
451 }
452
453 /// Create new cache manager with multi-tier architecture (v0.5.0+)
454 ///
455 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
456 /// Tiers are checked in order (lower tier_level = faster/hotter).
457 ///
458 /// # Arguments
459 ///
460 /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
461 /// * `streaming_backend` - Optional streaming backend
462 ///
463 /// # Example
464 ///
465 /// ```rust,ignore
466 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
467 /// use std::sync::Arc;
468 ///
469 /// // L1 + L2 + L3 setup
470 /// let l1 = Arc::new(L1Cache::new().await?);
471 /// let l2 = Arc::new(L2Cache::new().await?);
472 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
473 ///
474 /// let tiers = vec![
475 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
476 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
477 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
478 /// ];
479 ///
480 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
481 /// ```
482 pub async fn new_with_tiers(
483 tiers: Vec<CacheTier>,
484 streaming_backend: Option<Arc<dyn StreamingBackend>>,
485 ) -> Result<Self> {
486 println!(" 🎯 Initializing Multi-Tier Cache Manager...");
487 println!(" Tier count: {}", tiers.len());
488 for tier in &tiers {
489 println!(
490 " L{}: {} (promotion={}, ttl_scale={})",
491 tier.tier_level,
492 tier.stats.backend_name,
493 tier.promotion_enabled,
494 tier.ttl_scale
495 );
496 }
497
498 // Validate tiers are sorted by level
499 for i in 1..tiers.len() {
500 if tiers[i].tier_level <= tiers[i - 1].tier_level {
501 anyhow::bail!(
502 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
503 tiers[i].tier_level,
504 tiers[i - 1].tier_level
505 );
506 }
507 }
508
509 // For backward compatibility with legacy code, we need dummy l1/l2 caches
510 // Use first tier as l1, second tier as l2 if available
511 let (l1_cache, l2_cache) = if tiers.len() >= 2 {
512 (tiers[0].backend.clone(), tiers[1].backend.clone())
513 } else if tiers.len() == 1 {
514 // Only one tier - use it for both
515 let tier = tiers[0].backend.clone();
516 (tier.clone(), tier)
517 } else {
518 anyhow::bail!("At least one cache tier is required");
519 };
520
521 // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
522 let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
523 backend: l1_cache.clone(),
524 });
525
526 Ok(Self {
527 tiers: Some(tiers),
528 l1_cache: l1_backend,
529 l2_cache,
530 l2_cache_concrete: None,
531 streaming_backend,
532 total_requests: AtomicU64::new(0),
533 l1_hits: AtomicU64::new(0),
534 l2_hits: AtomicU64::new(0),
535 misses: AtomicU64::new(0),
536 promotions: AtomicUsize::new(0),
537 in_flight_requests: Arc::new(DashMap::new()),
538 invalidation_publisher: None,
539 invalidation_subscriber: None,
540 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
541 })
542 }
543
544 /// Start the invalidation subscriber background task
545 fn start_invalidation_subscriber(&self) {
546 if let Some(subscriber) = &self.invalidation_subscriber {
547 let l1_cache = Arc::clone(&self.l1_cache);
548 let l2_cache_concrete = self.l2_cache_concrete.clone();
549
550 subscriber.start(move |msg| {
551 let l1 = Arc::clone(&l1_cache);
552 let _l2 = l2_cache_concrete.clone();
553
554 async move {
555 match msg {
556 InvalidationMessage::Remove { key } => {
557 // Remove from L1
558 l1.remove(&key).await?;
559 println!("🗑️ [Invalidation] Removed '{}' from L1", key);
560 }
561 InvalidationMessage::Update { key, value, ttl_secs } => {
562 // Update L1 with new value
563 let ttl = ttl_secs
564 .map(Duration::from_secs)
565 .unwrap_or_else(|| Duration::from_secs(300));
566 l1.set_with_ttl(&key, value, ttl).await?;
567 println!("🔄 [Invalidation] Updated '{}' in L1", key);
568 }
569 InvalidationMessage::RemovePattern { pattern } => {
570 // For pattern-based invalidation, we can't easily iterate L1 cache
571 // So we just log it. The pattern invalidation is mainly for L2.
572 // L1 entries will naturally expire via TTL.
573 println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
574 }
575 InvalidationMessage::RemoveBulk { keys } => {
576 // Remove multiple keys from L1
577 for key in keys {
578 if let Err(e) = l1.remove(&key).await {
579 eprintln!("⚠️ Failed to remove '{}' from L1: {}", key, e);
580 }
581 }
582 println!("🗑️ [Invalidation] Bulk removed keys from L1");
583 }
584 }
585 Ok(())
586 }
587 });
588
589 println!("📡 Invalidation subscriber started");
590 }
591 }
592
593 /// Get value from cache using multi-tier architecture (v0.5.0+)
594 ///
595 /// This method iterates through all configured tiers and automatically promotes
596 /// to upper tiers on cache hit.
597 async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
598 let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some
599
600 // Try each tier sequentially (sorted by tier_level)
601 for (tier_index, tier) in tiers.iter().enumerate() {
602 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
603 // Cache hit!
604 tier.record_hit();
605
606 // Promote to all upper tiers (if promotion enabled)
607 if tier.promotion_enabled && tier_index > 0 {
608 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
609
610 // Promote to all tiers above this one
611 for upper_tier in tiers[..tier_index].iter().rev() {
612 if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
613 eprintln!(
614 "⚠️ Failed to promote '{}' from L{} to L{}: {}",
615 key, tier.tier_level, upper_tier.tier_level, e
616 );
617 } else {
618 self.promotions.fetch_add(1, Ordering::Relaxed);
619 println!(
620 "⬆️ Promoted '{}' from L{} to L{} (TTL: {:?})",
621 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
622 );
623 }
624 }
625 }
626
627 return Ok(Some(value));
628 }
629 }
630
631 // Cache miss across all tiers
632 Ok(None)
633 }
634
635 /// Get value from cache (L1 first, then L2 fallback with promotion)
636 ///
637 /// This method now includes built-in Cache Stampede protection when cache misses occur.
638 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
639 /// unnecessary duplicate work on external data sources.
640 ///
641 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
642 ///
643 /// # Arguments
644 /// * `key` - Cache key to retrieve
645 ///
646 /// # Returns
647 /// * `Ok(Some(value))` - Cache hit, value found in any tier
648 /// * `Ok(None)` - Cache miss, value not found in any cache
649 /// * `Err(error)` - Cache operation failed
650 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
651 self.total_requests.fetch_add(1, Ordering::Relaxed);
652
653 // NEW: Multi-tier mode (v0.5.0+)
654 if self.tiers.is_some() {
655 // Fast path for L1 (first tier) - no locking needed
656 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
657 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
658 tier1.record_hit();
659 // Update legacy stats for backward compatibility
660 self.l1_hits.fetch_add(1, Ordering::Relaxed);
661 return Ok(Some(value));
662 }
663 }
664
665 // L1 miss - use stampede protection for lower tiers
666 let key_owned = key.to_string();
667 let lock_guard = self.in_flight_requests
668 .entry(key_owned.clone())
669 .or_insert_with(|| Arc::new(Mutex::new(())))
670 .clone();
671
672 let _guard = lock_guard.lock().await;
673 let cleanup_guard = CleanupGuard {
674 map: &self.in_flight_requests,
675 key: key_owned.clone(),
676 };
677
678 // Double-check L1 after acquiring lock
679 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
680 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
681 tier1.record_hit();
682 self.l1_hits.fetch_add(1, Ordering::Relaxed);
683 return Ok(Some(value));
684 }
685 }
686
687 // Check remaining tiers with promotion
688 let result = self.get_multi_tier(key).await?;
689
690 if result.is_some() {
691 // Hit in L2+ tier - update legacy stats
692 if self.tiers.as_ref().unwrap().len() >= 2 {
693 self.l2_hits.fetch_add(1, Ordering::Relaxed);
694 }
695 } else {
696 self.misses.fetch_add(1, Ordering::Relaxed);
697 }
698
699 drop(cleanup_guard);
700 return Ok(result);
701 }
702
703 // LEGACY: 2-tier mode (L1 + L2)
704 // Fast path: Try L1 first (no locking needed)
705 if let Some(value) = self.l1_cache.get(key).await {
706 self.l1_hits.fetch_add(1, Ordering::Relaxed);
707 return Ok(Some(value));
708 }
709
710 // L1 miss - implement Cache Stampede protection for L2 lookup
711 let key_owned = key.to_string();
712 let lock_guard = self.in_flight_requests
713 .entry(key_owned.clone())
714 .or_insert_with(|| Arc::new(Mutex::new(())))
715 .clone();
716
717 let _guard = lock_guard.lock().await;
718
719 // RAII cleanup guard - ensures entry is removed even on early return or panic
720 let cleanup_guard = CleanupGuard {
721 map: &self.in_flight_requests,
722 key: key_owned.clone(),
723 };
724
725 // Double-check L1 cache after acquiring lock
726 // (Another concurrent request might have populated it while we were waiting)
727 if let Some(value) = self.l1_cache.get(key).await {
728 self.l1_hits.fetch_add(1, Ordering::Relaxed);
729 // cleanup_guard will auto-remove entry on drop
730 return Ok(Some(value));
731 }
732
733 // Check L2 cache with TTL information
734 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
735 self.l2_hits.fetch_add(1, Ordering::Relaxed);
736
737 // Promote to L1 with same TTL as Redis (or default if no TTL)
738 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
739
740 if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
741 // L1 promotion failed, but we still have the data
742 eprintln!("⚠️ Failed to promote key '{}' to L1 cache", key);
743 } else {
744 self.promotions.fetch_add(1, Ordering::Relaxed);
745 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
746 }
747
748 // cleanup_guard will auto-remove entry on drop
749 return Ok(Some(value));
750 }
751
752 // Both L1 and L2 miss
753 self.misses.fetch_add(1, Ordering::Relaxed);
754
755 // cleanup_guard will auto-remove entry on drop
756 drop(cleanup_guard);
757
758 Ok(None)
759 }
760
// NOTE: The text below documents `get_with_fallback`, which has no definition
// at this point in the file. It is kept as plain `//` comments so that rustdoc
// does not attach it to the next item (`set_with_strategy`).
//
// Get value from cache with fallback computation (enhanced backward compatibility)
//
// This is a convenience method that combines `get()` with optional computation.
// If the value is not found in cache, it will execute the compute function
// and cache the result automatically.
//
// # Arguments
// * `key` - Cache key
// * `compute_fn` - Optional function to compute value if not in cache
// * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
//
// # Returns
// * `Ok(Some(value))` - Value found in cache or computed successfully
// * `Ok(None)` - Value not in cache and no compute function provided
// * `Err(error)` - Cache operation or computation failed
//
// # Example
// ```ignore
// // Simple cache get (existing behavior)
// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
//
// // Get with computation fallback (new enhanced behavior)
// let api_data = cache_manager.get_with_fallback(
//     "api_response",
//     Some(|| async { fetch_data_from_api().await }),
//     Some(CacheStrategy::RealTime)
// ).await?;
// ```
789
790 /// Set value with specific cache strategy (all tiers)
791 ///
792 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
793 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
794 pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
795 let ttl = strategy.to_duration();
796
797 // NEW: Multi-tier mode (v0.5.0+)
798 if let Some(tiers) = &self.tiers {
799 // Store in ALL tiers with their respective TTL scaling
800 let mut success_count = 0;
801 let mut last_error = None;
802
803 for tier in tiers {
804 match tier.set_with_ttl(key, value.clone(), ttl).await {
805 Ok(_) => {
806 success_count += 1;
807 }
808 Err(e) => {
809 eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
810 last_error = Some(e);
811 }
812 }
813 }
814
815 if success_count > 0 {
816 println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
817 key, success_count, tiers.len(), ttl);
818 return Ok(());
819 } else {
820 return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
821 }
822 }
823
824 // LEGACY: 2-tier mode (L1 + L2)
825 // Store in both L1 and L2
826 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
827 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
828
829 // Return success if at least one cache succeeded
830 match (l1_result, l2_result) {
831 (Ok(_), Ok(_)) => {
832 // Both succeeded
833 println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
834 Ok(())
835 }
836 (Ok(_), Err(_)) => {
837 // L1 succeeded, L2 failed
838 eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
839 println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
840 Ok(())
841 }
842 (Err(_), Ok(_)) => {
843 // L1 failed, L2 succeeded
844 eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
845 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
846 Ok(())
847 }
848 (Err(e1), Err(_e2)) => {
849 // Both failed
850 Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
851 }
852 }
853 }
854
855 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
856 ///
857 /// This method provides comprehensive Cache Stampede protection:
858 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
859 /// 2. Check L2 cache with mutex-based coalescing
860 /// 3. Compute fresh data with protection against concurrent computations
861 ///
862 /// # Arguments
863 /// * `key` - Cache key
864 /// * `strategy` - Cache strategy for TTL and storage behavior
865 /// * `compute_fn` - Async function to compute the value if not in any cache
866 ///
867 /// # Example
868 /// ```ignore
869 /// let api_data = cache_manager.get_or_compute_with(
870 /// "api_response",
871 /// CacheStrategy::RealTime,
872 /// || async {
873 /// fetch_data_from_api().await
874 /// }
875 /// ).await?;
876 /// ```
877 #[allow(dead_code)]
878 pub async fn get_or_compute_with<F, Fut>(
879 &self,
880 key: &str,
881 strategy: CacheStrategy,
882 compute_fn: F,
883 ) -> Result<serde_json::Value>
884 where
885 F: FnOnce() -> Fut + Send,
886 Fut: Future<Output = Result<serde_json::Value>> + Send,
887 {
888 self.total_requests.fetch_add(1, Ordering::Relaxed);
889
890 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
891 if let Some(value) = self.l1_cache.get(key).await {
892 self.l1_hits.fetch_add(1, Ordering::Relaxed);
893 return Ok(value);
894 }
895
896 // 2. L1 miss - try L2 with Cache Stampede protection
897 let key_owned = key.to_string();
898 let lock_guard = self.in_flight_requests
899 .entry(key_owned.clone())
900 .or_insert_with(|| Arc::new(Mutex::new(())))
901 .clone();
902
903 let _guard = lock_guard.lock().await;
904
905 // RAII cleanup guard - ensures entry is removed even on early return or panic
906 let _cleanup_guard = CleanupGuard {
907 map: &self.in_flight_requests,
908 key: key_owned,
909 };
910
911 // 3. Double-check L1 cache after acquiring lock
912 // (Another request might have populated it while we were waiting)
913 if let Some(value) = self.l1_cache.get(key).await {
914 self.l1_hits.fetch_add(1, Ordering::Relaxed);
915 // _cleanup_guard will auto-remove entry on drop
916 return Ok(value);
917 }
918
919 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
920 if let Some(tiers) = &self.tiers {
921 // Check tiers starting from index 1 (skip L1 since already checked)
922 for tier in tiers.iter().skip(1) {
923 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
924 tier.record_hit();
925
926 // Promote to L1 (first tier)
927 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
928 if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
929 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
930 } else {
931 self.promotions.fetch_add(1, Ordering::Relaxed);
932 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
933 key, tier.tier_level, promotion_ttl);
934 }
935
936 // _cleanup_guard will auto-remove entry on drop
937 return Ok(value);
938 }
939 }
940 } else {
941 // LEGACY: Check L2 cache with TTL
942 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
943 self.l2_hits.fetch_add(1, Ordering::Relaxed);
944
945 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
946 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
947
948 if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
949 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
950 } else {
951 self.promotions.fetch_add(1, Ordering::Relaxed);
952 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
953 }
954
955 // _cleanup_guard will auto-remove entry on drop
956 return Ok(value);
957 }
958 }
959
960 // 5. Cache miss across all tiers - compute fresh data
961 println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
962 let fresh_data = compute_fn().await?;
963
964 // 6. Store in both caches
965 if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
966 eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
967 }
968
969 // 7. _cleanup_guard will auto-remove entry on drop
970
971 Ok(fresh_data)
972 }
973
974 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
975 ///
976 /// This method provides the same functionality as `get_or_compute_with()` but with
977 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
978 /// API calls, or any computation that returns structured data.
979 ///
980 /// # Type Safety
981 ///
982 /// - Returns your actual type `T` instead of `serde_json::Value`
983 /// - Compiler enforces Serialize + DeserializeOwned bounds
984 /// - No manual JSON conversion needed
985 ///
986 /// # Cache Flow
987 ///
988 /// 1. Check L1 cache → deserialize if found
989 /// 2. Check L2 cache → deserialize + promote to L1 if found
990 /// 3. Execute compute_fn → serialize → store in L1+L2
991 /// 4. Full stampede protection (only ONE request computes)
992 ///
993 /// # Arguments
994 ///
995 /// * `key` - Cache key
996 /// * `strategy` - Cache strategy for TTL
997 /// * `compute_fn` - Async function returning `Result<T>`
998 ///
999 /// # Example - Database Query
1000 ///
1001 /// ```no_run
1002 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1003 /// # use std::sync::Arc;
1004 /// # use serde::{Serialize, Deserialize};
1005 /// # async fn example() -> anyhow::Result<()> {
1006 /// # let l1 = Arc::new(L1Cache::new().await?);
1007 /// # let l2 = Arc::new(L2Cache::new().await?);
1008 /// # let cache_manager = CacheManager::new(l1, l2);
1009 ///
1010 /// #[derive(Serialize, Deserialize)]
1011 /// struct User {
1012 /// id: i64,
1013 /// name: String,
1014 /// }
1015 ///
1016 /// // Type-safe database caching (example - requires sqlx)
1017 /// // let user: User = cache_manager.get_or_compute_typed(
1018 /// // "user:123",
1019 /// // CacheStrategy::MediumTerm,
1020 /// // || async {
1021 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1022 /// // .bind(123)
1023 /// // .fetch_one(&pool)
1024 /// // .await
1025 /// // }
1026 /// // ).await?;
1027 /// # Ok(())
1028 /// # }
1029 /// ```
1030 ///
1031 /// # Example - API Call
1032 ///
1033 /// ```no_run
1034 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1035 /// # use std::sync::Arc;
1036 /// # use serde::{Serialize, Deserialize};
1037 /// # async fn example() -> anyhow::Result<()> {
1038 /// # let l1 = Arc::new(L1Cache::new().await?);
1039 /// # let l2 = Arc::new(L2Cache::new().await?);
1040 /// # let cache_manager = CacheManager::new(l1, l2);
1041 /// #[derive(Serialize, Deserialize)]
1042 /// struct ApiResponse {
1043 /// data: String,
1044 /// timestamp: i64,
1045 /// }
1046 ///
1047 /// // API call caching (example - requires reqwest)
1048 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1049 /// // "api:endpoint",
1050 /// // CacheStrategy::RealTime,
1051 /// // || async {
1052 /// // reqwest::get("https://api.example.com/data")
1053 /// // .await?
1054 /// // .json::<ApiResponse>()
1055 /// // .await
1056 /// // }
1057 /// // ).await?;
1058 /// # Ok(())
1059 /// # }
1060 /// ```
1061 ///
1062 /// # Performance
1063 ///
1064 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1065 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1066 /// - Compute: Your function time + serialization + L1+L2 storage
1067 /// - Stampede protection: 99.6% latency reduction under high concurrency
1068 ///
1069 /// # Errors
1070 ///
1071 /// Returns error if:
1072 /// - Compute function fails
1073 /// - Serialization fails (invalid type for JSON)
1074 /// - Deserialization fails (cache data doesn't match type T)
1075 /// - Cache operations fail (Redis connection issues)
1076 pub async fn get_or_compute_typed<T, F, Fut>(
1077 &self,
1078 key: &str,
1079 strategy: CacheStrategy,
1080 compute_fn: F,
1081 ) -> Result<T>
1082 where
1083 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1084 F: FnOnce() -> Fut + Send,
1085 Fut: Future<Output = Result<T>> + Send,
1086 {
1087 self.total_requests.fetch_add(1, Ordering::Relaxed);
1088
1089 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1090 if let Some(cached_json) = self.l1_cache.get(key).await {
1091 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1092
1093 // Attempt to deserialize from JSON to type T
1094 match serde_json::from_value::<T>(cached_json) {
1095 Ok(typed_value) => {
1096 println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
1097 return Ok(typed_value);
1098 }
1099 Err(e) => {
1100 // Deserialization failed - cache data may be stale or corrupt
1101 eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1102 // Fall through to recompute
1103 }
1104 }
1105 }
1106
1107 // 2. L1 miss - try L2 with Cache Stampede protection
1108 let key_owned = key.to_string();
1109 let lock_guard = self.in_flight_requests
1110 .entry(key_owned.clone())
1111 .or_insert_with(|| Arc::new(Mutex::new(())))
1112 .clone();
1113
1114 let _guard = lock_guard.lock().await;
1115
1116 // RAII cleanup guard - ensures entry is removed even on early return or panic
1117 let _cleanup_guard = CleanupGuard {
1118 map: &self.in_flight_requests,
1119 key: key_owned,
1120 };
1121
1122 // 3. Double-check L1 cache after acquiring lock
1123 // (Another request might have populated it while we were waiting)
1124 if let Some(cached_json) = self.l1_cache.get(key).await {
1125 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1126 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1127 println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
1128 return Ok(typed_value);
1129 }
1130 }
1131
1132 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1133 if let Some(tiers) = &self.tiers {
1134 // Check tiers starting from index 1 (skip L1 since already checked)
1135 for tier in tiers.iter().skip(1) {
1136 if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1137 tier.record_hit();
1138
1139 // Attempt to deserialize
1140 match serde_json::from_value::<T>(cached_json.clone()) {
1141 Ok(typed_value) => {
1142 println!("✅ [L{} HIT] Deserialized '{}' to type {}",
1143 tier.tier_level, key, std::any::type_name::<T>());
1144
1145 // Promote to L1 (first tier)
1146 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1147 if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
1148 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
1149 key, tier.tier_level, e);
1150 } else {
1151 self.promotions.fetch_add(1, Ordering::Relaxed);
1152 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1153 key, tier.tier_level, promotion_ttl);
1154 }
1155
1156 return Ok(typed_value);
1157 }
1158 Err(e) => {
1159 eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1160 tier.tier_level, key, e);
1161 // Continue to next tier
1162 }
1163 }
1164 }
1165 }
1166 } else {
1167 // LEGACY: Check L2 cache with TTL
1168 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1169 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1170
1171 // Attempt to deserialize
1172 match serde_json::from_value::<T>(cached_json.clone()) {
1173 Ok(typed_value) => {
1174 println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
1175
1176 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1177 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1178
1179 if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
1180 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
1181 } else {
1182 self.promotions.fetch_add(1, Ordering::Relaxed);
1183 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
1184 }
1185
1186 return Ok(typed_value);
1187 }
1188 Err(e) => {
1189 eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1190 // Fall through to recompute
1191 }
1192 }
1193 }
1194 }
1195
1196 // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1197 println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
1198 let typed_value = compute_fn().await?;
1199
1200 // 6. Serialize to JSON for storage
1201 let json_value = serde_json::to_value(&typed_value)
1202 .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
1203
1204 // 7. Store in both L1 and L2 caches
1205 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1206 eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
1207 } else {
1208 println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
1209 }
1210
1211 // 8. _cleanup_guard will auto-remove entry on drop
1212
1213 Ok(typed_value)
1214 }
1215
1216 /// Get comprehensive cache statistics
1217 ///
1218 /// In multi-tier mode, aggregates statistics from all tiers.
1219 /// In legacy mode, returns L1 and L2 stats.
1220 #[allow(dead_code)]
1221 pub fn get_stats(&self) -> CacheManagerStats {
1222 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1223 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1224 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1225 let misses = self.misses.load(Ordering::Relaxed);
1226
1227 CacheManagerStats {
1228 total_requests: total_reqs,
1229 l1_hits,
1230 l2_hits,
1231 total_hits: l1_hits + l2_hits,
1232 misses,
1233 hit_rate: if total_reqs > 0 {
1234 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1235 } else { 0.0 },
1236 l1_hit_rate: if total_reqs > 0 {
1237 (l1_hits as f64 / total_reqs as f64) * 100.0
1238 } else { 0.0 },
1239 promotions: self.promotions.load(Ordering::Relaxed),
1240 in_flight_requests: self.in_flight_requests.len(),
1241 }
1242 }
1243
1244 /// Get per-tier statistics (v0.5.0+)
1245 ///
1246 /// Returns statistics for each tier if multi-tier mode is enabled.
1247 /// Returns None if using legacy 2-tier mode.
1248 ///
1249 /// # Example
1250 /// ```rust,ignore
1251 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1252 /// for stats in tier_stats {
1253 /// println!("L{}: {} hits ({})",
1254 /// stats.tier_level,
1255 /// stats.hit_count(),
1256 /// stats.backend_name);
1257 /// }
1258 /// }
1259 /// ```
1260 pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1261 self.tiers.as_ref().map(|tiers| {
1262 tiers.iter().map(|tier| tier.stats.clone()).collect()
1263 })
1264 }
1265
1266 // ===== Redis Streams Methods =====
1267
1268 /// Publish data to Redis Stream
1269 ///
1270 /// # Arguments
1271 /// * `stream_key` - Name of the stream (e.g., "events_stream")
1272 /// * `fields` - Field-value pairs to publish
1273 /// * `maxlen` - Optional max length for stream trimming
1274 ///
1275 /// # Returns
1276 /// The entry ID generated by Redis
1277 ///
1278 /// # Errors
1279 /// Returns error if streaming backend is not configured
1280 pub async fn publish_to_stream(
1281 &self,
1282 stream_key: &str,
1283 fields: Vec<(String, String)>,
1284 maxlen: Option<usize>
1285 ) -> Result<String> {
1286 match &self.streaming_backend {
1287 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1288 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1289 }
1290 }
1291
1292 /// Read latest entries from Redis Stream
1293 ///
1294 /// # Arguments
1295 /// * `stream_key` - Name of the stream
1296 /// * `count` - Number of latest entries to retrieve
1297 ///
1298 /// # Returns
1299 /// Vector of (entry_id, fields) tuples (newest first)
1300 ///
1301 /// # Errors
1302 /// Returns error if streaming backend is not configured
1303 pub async fn read_stream_latest(
1304 &self,
1305 stream_key: &str,
1306 count: usize
1307 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1308 match &self.streaming_backend {
1309 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1310 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1311 }
1312 }
1313
1314 /// Read from Redis Stream with optional blocking
1315 ///
1316 /// # Arguments
1317 /// * `stream_key` - Name of the stream
1318 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1319 /// * `count` - Max entries to retrieve
1320 /// * `block_ms` - Optional blocking timeout in ms
1321 ///
1322 /// # Returns
1323 /// Vector of (entry_id, fields) tuples
1324 ///
1325 /// # Errors
1326 /// Returns error if streaming backend is not configured
1327 pub async fn read_stream(
1328 &self,
1329 stream_key: &str,
1330 last_id: &str,
1331 count: usize,
1332 block_ms: Option<usize>
1333 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1334 match &self.streaming_backend {
1335 Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1336 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1337 }
1338 }
1339
1340 // ===== Cache Invalidation Methods =====
1341
1342 /// Invalidate a cache key across all instances
1343 ///
1344 /// This removes the key from all cache tiers and broadcasts
1345 /// the invalidation to all other cache instances via Redis Pub/Sub.
1346 ///
1347 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1348 ///
1349 /// # Arguments
1350 /// * `key` - Cache key to invalidate
1351 ///
1352 /// # Example
1353 /// ```rust,ignore
1354 /// // Invalidate user cache after profile update
1355 /// cache_manager.invalidate("user:123").await?;
1356 /// ```
1357 pub async fn invalidate(&self, key: &str) -> Result<()> {
1358 // NEW: Multi-tier mode (v0.5.0+)
1359 if let Some(tiers) = &self.tiers {
1360 // Remove from ALL tiers
1361 for tier in tiers {
1362 if let Err(e) = tier.remove(key).await {
1363 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1364 }
1365 }
1366 } else {
1367 // LEGACY: 2-tier mode
1368 self.l1_cache.remove(key).await?;
1369 self.l2_cache.remove(key).await?;
1370 }
1371
1372 // Broadcast to other instances
1373 if let Some(publisher) = &self.invalidation_publisher {
1374 let mut pub_lock = publisher.lock().await;
1375 let msg = InvalidationMessage::remove(key);
1376 pub_lock.publish(&msg).await?;
1377 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1378 }
1379
1380 println!("🗑️ Invalidated '{}' across all instances", key);
1381 Ok(())
1382 }
1383
1384 /// Update cache value across all instances
1385 ///
1386 /// This updates the key in all cache tiers and broadcasts
1387 /// the update to all other cache instances, avoiding cache misses.
1388 ///
1389 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1390 ///
1391 /// # Arguments
1392 /// * `key` - Cache key to update
1393 /// * `value` - New value
1394 /// * `ttl` - Optional TTL (uses default if None)
1395 ///
1396 /// # Example
1397 /// ```rust,ignore
1398 /// // Update user cache with new data
1399 /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1400 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1401 /// ```
1402 pub async fn update_cache(
1403 &self,
1404 key: &str,
1405 value: serde_json::Value,
1406 ttl: Option<Duration>,
1407 ) -> Result<()> {
1408 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1409
1410 // NEW: Multi-tier mode (v0.5.0+)
1411 if let Some(tiers) = &self.tiers {
1412 // Update ALL tiers with their respective TTL scaling
1413 for tier in tiers {
1414 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1415 eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1416 }
1417 }
1418 } else {
1419 // LEGACY: 2-tier mode
1420 self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1421 self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1422 }
1423
1424 // Broadcast update to other instances
1425 if let Some(publisher) = &self.invalidation_publisher {
1426 let mut pub_lock = publisher.lock().await;
1427 let msg = InvalidationMessage::update(key, value, Some(ttl));
1428 pub_lock.publish(&msg).await?;
1429 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1430 }
1431
1432 println!("🔄 Updated '{}' across all instances", key);
1433 Ok(())
1434 }
1435
1436 /// Invalidate all keys matching a pattern
1437 ///
1438 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1439 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1440 ///
1441 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1442 ///
1443 /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1444 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1445 ///
1446 /// # Arguments
1447 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1448 ///
1449 /// # Example
1450 /// ```rust,ignore
1451 /// // Invalidate all user caches
1452 /// cache_manager.invalidate_pattern("user:*").await?;
1453 ///
1454 /// // Invalidate specific user's related caches
1455 /// cache_manager.invalidate_pattern("user:123:*").await?;
1456 /// ```
1457 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1458 // Scan L2 for matching keys
1459 // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1460 let keys = if let Some(l2) = &self.l2_cache_concrete {
1461 l2.scan_keys(pattern).await?
1462 } else {
1463 return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1464 };
1465
1466 if keys.is_empty() {
1467 println!("🔍 No keys found matching pattern '{}'", pattern);
1468 return Ok(());
1469 }
1470
1471 // NEW: Multi-tier mode (v0.5.0+)
1472 if let Some(tiers) = &self.tiers {
1473 // Remove from ALL tiers
1474 for key in &keys {
1475 for tier in tiers {
1476 if let Err(e) = tier.remove(key).await {
1477 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1478 }
1479 }
1480 }
1481 } else {
1482 // LEGACY: 2-tier mode - Remove from L2 in bulk
1483 if let Some(l2) = &self.l2_cache_concrete {
1484 l2.remove_bulk(&keys).await?;
1485 }
1486 }
1487
1488 // Broadcast pattern invalidation
1489 if let Some(publisher) = &self.invalidation_publisher {
1490 let mut pub_lock = publisher.lock().await;
1491 let msg = InvalidationMessage::remove_bulk(keys.clone());
1492 pub_lock.publish(&msg).await?;
1493 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1494 }
1495
1496 println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1497 Ok(())
1498 }
1499
1500 /// Set value with automatic broadcast to all instances
1501 ///
1502 /// This is a write-through operation that updates the cache and
1503 /// broadcasts the update to all other instances automatically.
1504 ///
1505 /// # Arguments
1506 /// * `key` - Cache key
1507 /// * `value` - Value to cache
1508 /// * `strategy` - Cache strategy (determines TTL)
1509 ///
1510 /// # Example
1511 /// ```rust,ignore
1512 /// // Update and broadcast in one call
1513 /// let data = serde_json::json!({"status": "active"});
1514 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1515 /// ```
1516 pub async fn set_with_broadcast(
1517 &self,
1518 key: &str,
1519 value: serde_json::Value,
1520 strategy: CacheStrategy,
1521 ) -> Result<()> {
1522 let ttl = strategy.to_duration();
1523
1524 // Set in local caches
1525 self.set_with_strategy(key, value.clone(), strategy).await?;
1526
1527 // Broadcast update if invalidation is enabled
1528 if let Some(publisher) = &self.invalidation_publisher {
1529 let mut pub_lock = publisher.lock().await;
1530 let msg = InvalidationMessage::update(key, value, Some(ttl));
1531 pub_lock.publish(&msg).await?;
1532 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1533 }
1534
1535 Ok(())
1536 }
1537
1538 /// Get invalidation statistics
1539 ///
1540 /// Returns statistics about invalidation operations if invalidation is enabled.
1541 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1542 if self.invalidation_subscriber.is_some() {
1543 Some(self.invalidation_stats.snapshot())
1544 } else {
1545 None
1546 }
1547 }
1548}
1549
/// Point-in-time snapshot of the cache manager's counters.
///
/// Plain data: every field is public and the struct is cheap to clone.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Number of requests counted by the manager.
    pub total_requests: u64,
    /// Requests answered from L1.
    pub l1_hits: u64,
    /// Requests answered from L2.
    pub l2_hits: u64,
    /// Combined hit count across the tracked tiers.
    pub total_hits: u64,
    /// Requests counted as misses.
    pub misses: u64,
    /// Overall hit rate, as a percentage.
    pub hit_rate: f64,
    /// L1-only hit rate, as a percentage.
    pub l1_hit_rate: f64,
    /// Number of promotions into L1.
    pub promotions: usize,
    /// Number of keys with an in-flight request when the snapshot was taken.
    pub in_flight_requests: usize,
}