multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use crate::backends::{L1Cache, L2Cache};
15use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
16use super::invalidation::{
17 InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
18 InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
19};
20
21/// RAII cleanup guard for in-flight request tracking
22/// Ensures that entries are removed from DashMap even on early return or panic
23struct CleanupGuard<'a> {
24 map: &'a DashMap<String, Arc<Mutex<()>>>,
25 key: String,
26}
27
28impl<'a> Drop for CleanupGuard<'a> {
29 fn drop(&mut self) {
30 self.map.remove(&self.key);
31 }
32}
33
/// Named TTL presets used when storing values in the cache.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data: 10 second TTL.
    RealTime,
    /// Short-term data: 5 minute TTL.
    ShortTerm,
    /// Medium-term data: 1 hour TTL.
    MediumTerm,
    /// Long-term data: 3 hour TTL.
    LongTerm,
    /// Caller-supplied TTL.
    Custom(Duration),
    /// Default strategy: 5 minute TTL (same as `ShortTerm`).
    Default,
}

impl CacheStrategy {
    /// Returns the TTL associated with this strategy.
    pub fn to_duration(&self) -> Duration {
        const MINUTE_SECS: u64 = 60;
        const HOUR_SECS: u64 = 60 * MINUTE_SECS;

        let seconds = match self {
            // A custom strategy carries its own duration verbatim.
            Self::Custom(custom_ttl) => return *custom_ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * MINUTE_SECS,
            Self::MediumTerm => HOUR_SECS,
            Self::LongTerm => 3 * HOUR_SECS,
        };
        Duration::from_secs(seconds)
    }
}
65
/// Hit statistics for one cache tier.
///
/// Cloning shares the underlying counter, because `hits` is held in an `Arc`.
#[derive(Debug, Clone)]
pub struct TierStats {
    /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
    pub tier_level: usize,
    /// Shared counter of cache hits served by this tier
    pub hits: Arc<AtomicU64>,
    /// Name of the backend, used to identify the tier
    pub backend_name: String,
}

impl TierStats {
    /// Builds statistics for `tier_level` with the hit counter starting at zero.
    fn new(tier_level: usize, backend_name: String) -> Self {
        // `Arc<AtomicU64>::default()` yields a fresh counter initialised to 0.
        let hits = Arc::default();
        Self {
            tier_level,
            hits,
            backend_name,
        }
    }

    /// Returns the number of hits recorded so far (relaxed atomic load).
    pub fn hit_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.hits;
        counter.load(Ordering::Relaxed)
    }
}
91
92/// A single cache tier in the multi-tier architecture
93pub struct CacheTier {
94 /// Cache backend for this tier
95 backend: Arc<dyn L2CacheBackend>,
96 /// Tier level (1 = hottest/fastest, higher = colder/slower)
97 tier_level: usize,
98 /// Enable automatic promotion to upper tiers on cache hit
99 promotion_enabled: bool,
100 /// TTL scale factor (multiplier for TTL when storing/promoting)
101 ttl_scale: f64,
102 /// Statistics for this tier
103 stats: TierStats,
104}
105
106impl CacheTier {
107 /// Create a new cache tier
108 pub fn new(
109 backend: Arc<dyn L2CacheBackend>,
110 tier_level: usize,
111 promotion_enabled: bool,
112 ttl_scale: f64,
113 ) -> Self {
114 let backend_name = backend.name().to_string();
115 Self {
116 backend,
117 tier_level,
118 promotion_enabled,
119 ttl_scale,
120 stats: TierStats::new(tier_level, backend_name),
121 }
122 }
123
124 /// Get value with TTL from this tier
125 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
126 self.backend.get_with_ttl(key).await
127 }
128
129 /// Set value with TTL in this tier
130 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
131 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
132 self.backend.set_with_ttl(key, value, scaled_ttl).await
133 }
134
135 /// Remove value from this tier
136 async fn remove(&self, key: &str) -> Result<()> {
137 self.backend.remove(key).await
138 }
139
140 /// Record a cache hit for this tier
141 fn record_hit(&self) {
142 self.stats.hits.fetch_add(1, Ordering::Relaxed);
143 }
144}
145
/// Builder-style configuration describing one cache tier.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4...)
    pub tier_level: usize,
    /// Whether hits in this tier are promoted to upper tiers
    pub promotion_enabled: bool,
    /// TTL scale factor (1.0 = same as base TTL)
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Creates a configuration for `tier_level` with promotion enabled and
    /// an unscaled TTL (`ttl_scale = 1.0`).
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Preset for L1 (hot tier): level 1, promotion disabled because there
    /// is no tier above it.
    pub fn as_l1() -> Self {
        Self::new(1).with_promotion(false)
    }

    /// Preset for L2 (warm tier): level 2, promotion enabled, unscaled TTL.
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Preset for L3 (cold tier): level 3, entries kept 2x longer.
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Preset for L4 (archive tier): level 4, entries kept 8x longer.
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Returns the configuration with promotion switched on or off.
    pub fn with_promotion(self, enabled: bool) -> Self {
        Self {
            promotion_enabled: enabled,
            ..self
        }
    }

    /// Returns the configuration with a different TTL scale factor.
    pub fn with_ttl_scale(self, scale: f64) -> Self {
        Self {
            ttl_scale: scale,
            ..self
        }
    }

    /// Returns the configuration with a different tier level.
    pub fn with_level(self, level: usize) -> Self {
        Self {
            tier_level: level,
            ..self
        }
    }
}
221
222/// Proxy wrapper to convert L2CacheBackend to CacheBackend
223/// (Rust doesn't support automatic trait upcasting for trait objects)
224struct ProxyCacheBackend {
225 backend: Arc<dyn L2CacheBackend>,
226}
227
228#[async_trait::async_trait]
229impl CacheBackend for ProxyCacheBackend {
230 async fn get(&self, key: &str) -> Option<serde_json::Value> {
231 self.backend.get(key).await
232 }
233
234 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
235 self.backend.set_with_ttl(key, value, ttl).await
236 }
237
238 async fn remove(&self, key: &str) -> Result<()> {
239 self.backend.remove(key).await
240 }
241
242 async fn health_check(&self) -> bool {
243 self.backend.health_check().await
244 }
245
246 fn name(&self) -> &str {
247 self.backend.name()
248 }
249}
250
/// Cache Manager - Unified operations across multiple cache tiers
///
/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
/// When `tiers` is `Some`, operations use the dynamic multi-tier system. Otherwise they
/// fall back to the legacy `l1_cache`/`l2_cache` pair for backward compatibility.
pub struct CacheManager {
    /// Dynamic multi-tier cache architecture (v0.5.0+)
    /// If Some, this takes precedence over l1_cache/l2_cache fields
    tiers: Option<Vec<CacheTier>>,

    // ===== Legacy fields (v0.1.0 - v0.4.x) =====
    // Maintained for backward compatibility. In multi-tier mode they are
    // still populated (from the first one or two tiers) by the constructor.
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys);
    /// only set by the invalidation-enabled constructor
    l2_cache_concrete: Option<Arc<L2Cache>>,

    /// Optional streaming backend (`None` disables streaming)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics: total number of lookups issued through the manager
    total_requests: Arc<AtomicU64>,
    /// Statistics: lookups answered by L1 (first tier)
    l1_hits: Arc<AtomicU64>,
    /// Statistics: lookups answered by L2 (or a lower tier in multi-tier mode)
    l2_hits: Arc<AtomicU64>,
    /// Statistics: lookups that missed every tier
    misses: Arc<AtomicU64>,
    /// Statistics: successful promotions into an upper tier
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    /// (one mutex per key currently being resolved)
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}
287
288impl CacheManager {
289 /// Create new cache manager with trait objects (pluggable backends)
290 ///
291 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
292 ///
293 /// # Arguments
294 ///
295 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
296 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
297 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
298 ///
299 /// # Example
300 ///
301 /// ```rust,ignore
302 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
303 /// use std::sync::Arc;
304 ///
305 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
306 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
307 ///
308 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
309 /// ```
310 pub async fn new_with_backends(
311 l1_cache: Arc<dyn CacheBackend>,
312 l2_cache: Arc<dyn L2CacheBackend>,
313 streaming_backend: Option<Arc<dyn StreamingBackend>>,
314 ) -> Result<Self> {
315 println!(" 🎯 Initializing Cache Manager with custom backends...");
316 println!(" L1: {}", l1_cache.name());
317 println!(" L2: {}", l2_cache.name());
318 if streaming_backend.is_some() {
319 println!(" Streaming: enabled");
320 } else {
321 println!(" Streaming: disabled");
322 }
323
324 Ok(Self {
325 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
326 l1_cache,
327 l2_cache,
328 l2_cache_concrete: None,
329 streaming_backend,
330 total_requests: Arc::new(AtomicU64::new(0)),
331 l1_hits: Arc::new(AtomicU64::new(0)),
332 l2_hits: Arc::new(AtomicU64::new(0)),
333 misses: Arc::new(AtomicU64::new(0)),
334 promotions: Arc::new(AtomicUsize::new(0)),
335 in_flight_requests: Arc::new(DashMap::new()),
336 invalidation_publisher: None,
337 invalidation_subscriber: None,
338 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
339 })
340 }
341
342 /// Create new cache manager with default backends (backward compatible)
343 ///
344 /// This is the legacy constructor maintained for backward compatibility.
345 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
346 ///
347 /// # Arguments
348 ///
349 /// * `l1_cache` - Moka L1 cache instance
350 /// * `l2_cache` - Redis L2 cache instance
351 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
352 println!(" 🎯 Initializing Cache Manager...");
353
354 // Convert concrete types to trait objects
355 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
356 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
357
358 // Create RedisStreams backend for streaming functionality
359 let redis_url = std::env::var("REDIS_URL")
360 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
361 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
362 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
363
364 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
365 }
366
367 /// Create new cache manager with invalidation support
368 ///
369 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
370 ///
371 /// # Arguments
372 ///
373 /// * `l1_cache` - Moka L1 cache instance
374 /// * `l2_cache` - Redis L2 cache instance
375 /// * `redis_url` - Redis connection URL for Pub/Sub
376 /// * `config` - Invalidation configuration
377 ///
378 /// # Example
379 ///
380 /// ```rust,ignore
381 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
382 ///
383 /// let config = InvalidationConfig {
384 /// channel: "my_app:cache:invalidate".to_string(),
385 /// ..Default::default()
386 /// };
387 ///
388 /// let manager = CacheManager::new_with_invalidation(
389 /// l1, l2, "redis://localhost", config
390 /// ).await?;
391 /// ```
392 pub async fn new_with_invalidation(
393 l1_cache: Arc<L1Cache>,
394 l2_cache: Arc<L2Cache>,
395 redis_url: &str,
396 config: InvalidationConfig,
397 ) -> Result<Self> {
398 println!(" 🎯 Initializing Cache Manager with Invalidation...");
399 println!(" Pub/Sub channel: {}", config.channel);
400
401 // Convert concrete types to trait objects
402 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
403 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
404
405 // Create RedisStreams backend for streaming functionality
406 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
407 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
408
409 // Create publisher
410 let client = redis::Client::open(redis_url)?;
411 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
412 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
413
414 // Create subscriber
415 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
416 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
417
418 let manager = Self {
419 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
420 l1_cache: l1_backend,
421 l2_cache: l2_backend,
422 l2_cache_concrete: Some(l2_cache),
423 streaming_backend: Some(streaming_backend),
424 total_requests: Arc::new(AtomicU64::new(0)),
425 l1_hits: Arc::new(AtomicU64::new(0)),
426 l2_hits: Arc::new(AtomicU64::new(0)),
427 misses: Arc::new(AtomicU64::new(0)),
428 promotions: Arc::new(AtomicUsize::new(0)),
429 in_flight_requests: Arc::new(DashMap::new()),
430 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
431 invalidation_subscriber: Some(Arc::new(subscriber)),
432 invalidation_stats,
433 };
434
435 // Start subscriber with handler
436 manager.start_invalidation_subscriber();
437
438 println!(" ✅ Cache Manager initialized with invalidation support");
439
440 Ok(manager)
441 }
442
443 /// Create new cache manager with multi-tier architecture (v0.5.0+)
444 ///
445 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
446 /// Tiers are checked in order (lower tier_level = faster/hotter).
447 ///
448 /// # Arguments
449 ///
450 /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
451 /// * `streaming_backend` - Optional streaming backend
452 ///
453 /// # Example
454 ///
455 /// ```rust,ignore
456 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
457 /// use std::sync::Arc;
458 ///
459 /// // L1 + L2 + L3 setup
460 /// let l1 = Arc::new(L1Cache::new().await?);
461 /// let l2 = Arc::new(L2Cache::new().await?);
462 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
463 ///
464 /// let tiers = vec![
465 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
466 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
467 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
468 /// ];
469 ///
470 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
471 /// ```
472 pub async fn new_with_tiers(
473 tiers: Vec<CacheTier>,
474 streaming_backend: Option<Arc<dyn StreamingBackend>>,
475 ) -> Result<Self> {
476 println!(" 🎯 Initializing Multi-Tier Cache Manager...");
477 println!(" Tier count: {}", tiers.len());
478 for tier in &tiers {
479 println!(
480 " L{}: {} (promotion={}, ttl_scale={})",
481 tier.tier_level,
482 tier.stats.backend_name,
483 tier.promotion_enabled,
484 tier.ttl_scale
485 );
486 }
487
488 // Validate tiers are sorted by level
489 for i in 1..tiers.len() {
490 if tiers[i].tier_level <= tiers[i - 1].tier_level {
491 anyhow::bail!(
492 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
493 tiers[i].tier_level,
494 tiers[i - 1].tier_level
495 );
496 }
497 }
498
499 // For backward compatibility with legacy code, we need dummy l1/l2 caches
500 // Use first tier as l1, second tier as l2 if available
501 let (l1_cache, l2_cache) = if tiers.len() >= 2 {
502 (tiers[0].backend.clone(), tiers[1].backend.clone())
503 } else if tiers.len() == 1 {
504 // Only one tier - use it for both
505 let tier = tiers[0].backend.clone();
506 (tier.clone(), tier)
507 } else {
508 anyhow::bail!("At least one cache tier is required");
509 };
510
511 // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
512 let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
513 backend: l1_cache.clone(),
514 });
515
516 Ok(Self {
517 tiers: Some(tiers),
518 l1_cache: l1_backend,
519 l2_cache,
520 l2_cache_concrete: None,
521 streaming_backend,
522 total_requests: Arc::new(AtomicU64::new(0)),
523 l1_hits: Arc::new(AtomicU64::new(0)),
524 l2_hits: Arc::new(AtomicU64::new(0)),
525 misses: Arc::new(AtomicU64::new(0)),
526 promotions: Arc::new(AtomicUsize::new(0)),
527 in_flight_requests: Arc::new(DashMap::new()),
528 invalidation_publisher: None,
529 invalidation_subscriber: None,
530 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
531 })
532 }
533
534 /// Start the invalidation subscriber background task
535 fn start_invalidation_subscriber(&self) {
536 if let Some(subscriber) = &self.invalidation_subscriber {
537 let l1_cache = Arc::clone(&self.l1_cache);
538 let l2_cache_concrete = self.l2_cache_concrete.clone();
539
540 subscriber.start(move |msg| {
541 let l1 = Arc::clone(&l1_cache);
542 let _l2 = l2_cache_concrete.clone();
543
544 async move {
545 match msg {
546 InvalidationMessage::Remove { key } => {
547 // Remove from L1
548 l1.remove(&key).await?;
549 println!("🗑️ [Invalidation] Removed '{}' from L1", key);
550 }
551 InvalidationMessage::Update { key, value, ttl_secs } => {
552 // Update L1 with new value
553 let ttl = ttl_secs
554 .map(Duration::from_secs)
555 .unwrap_or_else(|| Duration::from_secs(300));
556 l1.set_with_ttl(&key, value, ttl).await?;
557 println!("🔄 [Invalidation] Updated '{}' in L1", key);
558 }
559 InvalidationMessage::RemovePattern { pattern } => {
560 // For pattern-based invalidation, we can't easily iterate L1 cache
561 // So we just log it. The pattern invalidation is mainly for L2.
562 // L1 entries will naturally expire via TTL.
563 println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
564 }
565 InvalidationMessage::RemoveBulk { keys } => {
566 // Remove multiple keys from L1
567 for key in keys {
568 if let Err(e) = l1.remove(&key).await {
569 eprintln!("⚠️ Failed to remove '{}' from L1: {}", key, e);
570 }
571 }
572 println!("🗑️ [Invalidation] Bulk removed keys from L1");
573 }
574 }
575 Ok(())
576 }
577 });
578
579 println!("📡 Invalidation subscriber started");
580 }
581 }
582
583 /// Get value from cache using multi-tier architecture (v0.5.0+)
584 ///
585 /// This method iterates through all configured tiers and automatically promotes
586 /// to upper tiers on cache hit.
587 async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
588 let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some
589
590 // Try each tier sequentially (sorted by tier_level)
591 for (tier_index, tier) in tiers.iter().enumerate() {
592 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
593 // Cache hit!
594 tier.record_hit();
595
596 // Promote to all upper tiers (if promotion enabled)
597 if tier.promotion_enabled && tier_index > 0 {
598 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
599
600 // Promote to all tiers above this one
601 for upper_tier in tiers[..tier_index].iter().rev() {
602 if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
603 eprintln!(
604 "⚠️ Failed to promote '{}' from L{} to L{}: {}",
605 key, tier.tier_level, upper_tier.tier_level, e
606 );
607 } else {
608 self.promotions.fetch_add(1, Ordering::Relaxed);
609 println!(
610 "⬆️ Promoted '{}' from L{} to L{} (TTL: {:?})",
611 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
612 );
613 }
614 }
615 }
616
617 return Ok(Some(value));
618 }
619 }
620
621 // Cache miss across all tiers
622 Ok(None)
623 }
624
625 /// Get value from cache (L1 first, then L2 fallback with promotion)
626 ///
627 /// This method now includes built-in Cache Stampede protection when cache misses occur.
628 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
629 /// unnecessary duplicate work on external data sources.
630 ///
631 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
632 ///
633 /// # Arguments
634 /// * `key` - Cache key to retrieve
635 ///
636 /// # Returns
637 /// * `Ok(Some(value))` - Cache hit, value found in any tier
638 /// * `Ok(None)` - Cache miss, value not found in any cache
639 /// * `Err(error)` - Cache operation failed
640 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
641 self.total_requests.fetch_add(1, Ordering::Relaxed);
642
643 // NEW: Multi-tier mode (v0.5.0+)
644 if self.tiers.is_some() {
645 // Fast path for L1 (first tier) - no locking needed
646 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
647 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
648 tier1.record_hit();
649 // Update legacy stats for backward compatibility
650 self.l1_hits.fetch_add(1, Ordering::Relaxed);
651 return Ok(Some(value));
652 }
653 }
654
655 // L1 miss - use stampede protection for lower tiers
656 let key_owned = key.to_string();
657 let lock_guard = self.in_flight_requests
658 .entry(key_owned.clone())
659 .or_insert_with(|| Arc::new(Mutex::new(())))
660 .clone();
661
662 let _guard = lock_guard.lock().await;
663 let cleanup_guard = CleanupGuard {
664 map: &self.in_flight_requests,
665 key: key_owned.clone(),
666 };
667
668 // Double-check L1 after acquiring lock
669 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
670 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
671 tier1.record_hit();
672 self.l1_hits.fetch_add(1, Ordering::Relaxed);
673 return Ok(Some(value));
674 }
675 }
676
677 // Check remaining tiers with promotion
678 let result = self.get_multi_tier(key).await?;
679
680 if result.is_some() {
681 // Hit in L2+ tier - update legacy stats
682 if self.tiers.as_ref().unwrap().len() >= 2 {
683 self.l2_hits.fetch_add(1, Ordering::Relaxed);
684 }
685 } else {
686 self.misses.fetch_add(1, Ordering::Relaxed);
687 }
688
689 drop(cleanup_guard);
690 return Ok(result);
691 }
692
693 // LEGACY: 2-tier mode (L1 + L2)
694 // Fast path: Try L1 first (no locking needed)
695 if let Some(value) = self.l1_cache.get(key).await {
696 self.l1_hits.fetch_add(1, Ordering::Relaxed);
697 return Ok(Some(value));
698 }
699
700 // L1 miss - implement Cache Stampede protection for L2 lookup
701 let key_owned = key.to_string();
702 let lock_guard = self.in_flight_requests
703 .entry(key_owned.clone())
704 .or_insert_with(|| Arc::new(Mutex::new(())))
705 .clone();
706
707 let _guard = lock_guard.lock().await;
708
709 // RAII cleanup guard - ensures entry is removed even on early return or panic
710 let cleanup_guard = CleanupGuard {
711 map: &self.in_flight_requests,
712 key: key_owned.clone(),
713 };
714
715 // Double-check L1 cache after acquiring lock
716 // (Another concurrent request might have populated it while we were waiting)
717 if let Some(value) = self.l1_cache.get(key).await {
718 self.l1_hits.fetch_add(1, Ordering::Relaxed);
719 // cleanup_guard will auto-remove entry on drop
720 return Ok(Some(value));
721 }
722
723 // Check L2 cache with TTL information
724 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
725 self.l2_hits.fetch_add(1, Ordering::Relaxed);
726
727 // Promote to L1 with same TTL as Redis (or default if no TTL)
728 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
729
730 if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
731 // L1 promotion failed, but we still have the data
732 eprintln!("⚠️ Failed to promote key '{}' to L1 cache", key);
733 } else {
734 self.promotions.fetch_add(1, Ordering::Relaxed);
735 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
736 }
737
738 // cleanup_guard will auto-remove entry on drop
739 return Ok(Some(value));
740 }
741
742 // Both L1 and L2 miss
743 self.misses.fetch_add(1, Ordering::Relaxed);
744
745 // cleanup_guard will auto-remove entry on drop
746 drop(cleanup_guard);
747
748 Ok(None)
749 }
750
    // NOTE: The documentation below describes a `get_with_fallback` helper
    // whose definition does not follow it in this file. It is kept as plain
    // comments (not `///`) so that rustdoc does not attach it to the next
    // item, `set_with_strategy`.
    //
    // Get value from cache with fallback computation (enhanced backward compatibility)
    //
    // This is a convenience method that combines `get()` with optional computation.
    // If the value is not found in the cache, it executes the compute function
    // and caches the result automatically.
    //
    // # Arguments
    // * `key` - Cache key
    // * `compute_fn` - Optional function to compute value if not in cache
    // * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
    //
    // # Returns
    // * `Ok(Some(value))` - Value found in cache or computed successfully
    // * `Ok(None)` - Value not in cache and no compute function provided
    // * `Err(error)` - Cache operation or computation failed
    //
    // # Example
    // ```ignore
    // // Simple cache get (existing behavior)
    // let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    //
    // // Get with computation fallback (new enhanced behavior)
    // let api_data = cache_manager.get_with_fallback(
    //     "api_response",
    //     Some(|| async { fetch_data_from_api().await }),
    //     Some(CacheStrategy::RealTime)
    // ).await?;
    // ```
779
780 /// Set value with specific cache strategy (all tiers)
781 ///
782 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
783 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
784 pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
785 let ttl = strategy.to_duration();
786
787 // NEW: Multi-tier mode (v0.5.0+)
788 if let Some(tiers) = &self.tiers {
789 // Store in ALL tiers with their respective TTL scaling
790 let mut success_count = 0;
791 let mut last_error = None;
792
793 for tier in tiers {
794 match tier.set_with_ttl(key, value.clone(), ttl).await {
795 Ok(_) => {
796 success_count += 1;
797 }
798 Err(e) => {
799 eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
800 last_error = Some(e);
801 }
802 }
803 }
804
805 if success_count > 0 {
806 println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
807 key, success_count, tiers.len(), ttl);
808 return Ok(());
809 } else {
810 return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
811 }
812 }
813
814 // LEGACY: 2-tier mode (L1 + L2)
815 // Store in both L1 and L2
816 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
817 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
818
819 // Return success if at least one cache succeeded
820 match (l1_result, l2_result) {
821 (Ok(_), Ok(_)) => {
822 // Both succeeded
823 println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
824 Ok(())
825 }
826 (Ok(_), Err(_)) => {
827 // L1 succeeded, L2 failed
828 eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
829 println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
830 Ok(())
831 }
832 (Err(_), Ok(_)) => {
833 // L1 failed, L2 succeeded
834 eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
835 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
836 Ok(())
837 }
838 (Err(e1), Err(_e2)) => {
839 // Both failed
840 Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
841 }
842 }
843 }
844
845 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
846 ///
847 /// This method provides comprehensive Cache Stampede protection:
848 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
849 /// 2. Check L2 cache with mutex-based coalescing
850 /// 3. Compute fresh data with protection against concurrent computations
851 ///
852 /// # Arguments
853 /// * `key` - Cache key
854 /// * `strategy` - Cache strategy for TTL and storage behavior
855 /// * `compute_fn` - Async function to compute the value if not in any cache
856 ///
857 /// # Example
858 /// ```ignore
859 /// let api_data = cache_manager.get_or_compute_with(
860 /// "api_response",
861 /// CacheStrategy::RealTime,
862 /// || async {
863 /// fetch_data_from_api().await
864 /// }
865 /// ).await?;
866 /// ```
867 #[allow(dead_code)]
868 pub async fn get_or_compute_with<F, Fut>(
869 &self,
870 key: &str,
871 strategy: CacheStrategy,
872 compute_fn: F,
873 ) -> Result<serde_json::Value>
874 where
875 F: FnOnce() -> Fut + Send,
876 Fut: Future<Output = Result<serde_json::Value>> + Send,
877 {
878 self.total_requests.fetch_add(1, Ordering::Relaxed);
879
880 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
881 if let Some(value) = self.l1_cache.get(key).await {
882 self.l1_hits.fetch_add(1, Ordering::Relaxed);
883 return Ok(value);
884 }
885
886 // 2. L1 miss - try L2 with Cache Stampede protection
887 let key_owned = key.to_string();
888 let lock_guard = self.in_flight_requests
889 .entry(key_owned.clone())
890 .or_insert_with(|| Arc::new(Mutex::new(())))
891 .clone();
892
893 let _guard = lock_guard.lock().await;
894
895 // RAII cleanup guard - ensures entry is removed even on early return or panic
896 let _cleanup_guard = CleanupGuard {
897 map: &self.in_flight_requests,
898 key: key_owned,
899 };
900
901 // 3. Double-check L1 cache after acquiring lock
902 // (Another request might have populated it while we were waiting)
903 if let Some(value) = self.l1_cache.get(key).await {
904 self.l1_hits.fetch_add(1, Ordering::Relaxed);
905 // _cleanup_guard will auto-remove entry on drop
906 return Ok(value);
907 }
908
909 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
910 if let Some(tiers) = &self.tiers {
911 // Check tiers starting from index 1 (skip L1 since already checked)
912 for tier in tiers.iter().skip(1) {
913 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
914 tier.record_hit();
915
916 // Promote to L1 (first tier)
917 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
918 if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
919 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
920 } else {
921 self.promotions.fetch_add(1, Ordering::Relaxed);
922 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
923 key, tier.tier_level, promotion_ttl);
924 }
925
926 // _cleanup_guard will auto-remove entry on drop
927 return Ok(value);
928 }
929 }
930 } else {
931 // LEGACY: Check L2 cache with TTL
932 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
933 self.l2_hits.fetch_add(1, Ordering::Relaxed);
934
935 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
936 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
937
938 if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
939 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
940 } else {
941 self.promotions.fetch_add(1, Ordering::Relaxed);
942 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
943 }
944
945 // _cleanup_guard will auto-remove entry on drop
946 return Ok(value);
947 }
948 }
949
950 // 5. Cache miss across all tiers - compute fresh data
951 println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
952 let fresh_data = compute_fn().await?;
953
954 // 6. Store in both caches
955 if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
956 eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
957 }
958
959 // 7. _cleanup_guard will auto-remove entry on drop
960
961 Ok(fresh_data)
962 }
963
964 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
965 ///
966 /// This method provides the same functionality as `get_or_compute_with()` but with
967 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
968 /// API calls, or any computation that returns structured data.
969 ///
970 /// # Type Safety
971 ///
972 /// - Returns your actual type `T` instead of `serde_json::Value`
973 /// - Compiler enforces Serialize + DeserializeOwned bounds
974 /// - No manual JSON conversion needed
975 ///
976 /// # Cache Flow
977 ///
978 /// 1. Check L1 cache → deserialize if found
979 /// 2. Check L2 cache → deserialize + promote to L1 if found
980 /// 3. Execute compute_fn → serialize → store in L1+L2
981 /// 4. Full stampede protection (only ONE request computes)
982 ///
983 /// # Arguments
984 ///
985 /// * `key` - Cache key
986 /// * `strategy` - Cache strategy for TTL
987 /// * `compute_fn` - Async function returning `Result<T>`
988 ///
989 /// # Example - Database Query
990 ///
991 /// ```no_run
992 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
993 /// # use std::sync::Arc;
994 /// # use serde::{Serialize, Deserialize};
995 /// # async fn example() -> anyhow::Result<()> {
996 /// # let l1 = Arc::new(L1Cache::new().await?);
997 /// # let l2 = Arc::new(L2Cache::new().await?);
998 /// # let cache_manager = CacheManager::new(l1, l2);
999 ///
1000 /// #[derive(Serialize, Deserialize)]
1001 /// struct User {
1002 /// id: i64,
1003 /// name: String,
1004 /// }
1005 ///
1006 /// // Type-safe database caching (example - requires sqlx)
1007 /// // let user: User = cache_manager.get_or_compute_typed(
1008 /// // "user:123",
1009 /// // CacheStrategy::MediumTerm,
1010 /// // || async {
1011 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1012 /// // .bind(123)
1013 /// // .fetch_one(&pool)
1014 /// // .await
1015 /// // }
1016 /// // ).await?;
1017 /// # Ok(())
1018 /// # }
1019 /// ```
1020 ///
1021 /// # Example - API Call
1022 ///
1023 /// ```no_run
1024 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1025 /// # use std::sync::Arc;
1026 /// # use serde::{Serialize, Deserialize};
1027 /// # async fn example() -> anyhow::Result<()> {
1028 /// # let l1 = Arc::new(L1Cache::new().await?);
1029 /// # let l2 = Arc::new(L2Cache::new().await?);
1030 /// # let cache_manager = CacheManager::new(l1, l2);
1031 /// #[derive(Serialize, Deserialize)]
1032 /// struct ApiResponse {
1033 /// data: String,
1034 /// timestamp: i64,
1035 /// }
1036 ///
1037 /// // API call caching (example - requires reqwest)
1038 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1039 /// // "api:endpoint",
1040 /// // CacheStrategy::RealTime,
1041 /// // || async {
1042 /// // reqwest::get("https://api.example.com/data")
1043 /// // .await?
1044 /// // .json::<ApiResponse>()
1045 /// // .await
1046 /// // }
1047 /// // ).await?;
1048 /// # Ok(())
1049 /// # }
1050 /// ```
1051 ///
1052 /// # Performance
1053 ///
1054 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1055 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1056 /// - Compute: Your function time + serialization + L1+L2 storage
1057 /// - Stampede protection: 99.6% latency reduction under high concurrency
1058 ///
1059 /// # Errors
1060 ///
1061 /// Returns an error only if:
1062 /// - Compute function fails
1063 /// - Serialization fails (invalid type for JSON)
1064 ///
1065 /// Cached data that cannot be deserialized into `T` is logged and recomputed, and failures while storing the computed value are logged; neither case returns an error.
1066 pub async fn get_or_compute_typed<T, F, Fut>(
1067 &self,
1068 key: &str,
1069 strategy: CacheStrategy,
1070 compute_fn: F,
1071 ) -> Result<T>
1072 where
1073 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1074 F: FnOnce() -> Fut + Send,
1075 Fut: Future<Output = Result<T>> + Send,
1076 {
1077 self.total_requests.fetch_add(1, Ordering::Relaxed);
1078
1079 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1080 if let Some(cached_json) = self.l1_cache.get(key).await {
1081 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1082
1083 // Attempt to deserialize from JSON to type T
1084 match serde_json::from_value::<T>(cached_json) {
1085 Ok(typed_value) => {
1086 println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
1087 return Ok(typed_value);
1088 }
1089 Err(e) => {
1090 // Deserialization failed - cache data may be stale or corrupt
1091 eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1092 // Fall through to recompute
1093 }
1094 }
1095 }
1096
1097 // 2. L1 miss - try L2 with Cache Stampede protection
1098 let key_owned = key.to_string();
1099 let lock_guard = self.in_flight_requests
1100 .entry(key_owned.clone())
1101 .or_insert_with(|| Arc::new(Mutex::new(())))
1102 .clone();
1103
1104 let _guard = lock_guard.lock().await;
1105
1106 // RAII cleanup guard - ensures entry is removed even on early return or panic
1107 let _cleanup_guard = CleanupGuard {
1108 map: &self.in_flight_requests,
1109 key: key_owned,
1110 };
1111
1112 // 3. Double-check L1 cache after acquiring lock
1113 // (Another request might have populated it while we were waiting)
1114 if let Some(cached_json) = self.l1_cache.get(key).await {
1115 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1116 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1117 println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
1118 return Ok(typed_value);
1119 }
1120 }
1121
1122 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1123 if let Some(tiers) = &self.tiers {
1124 // Check tiers starting from index 1 (skip L1 since already checked)
1125 for tier in tiers.iter().skip(1) {
1126 if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1127 tier.record_hit();
1128
1129 // Attempt to deserialize
1130 match serde_json::from_value::<T>(cached_json.clone()) {
1131 Ok(typed_value) => {
1132 println!("✅ [L{} HIT] Deserialized '{}' to type {}",
1133 tier.tier_level, key, std::any::type_name::<T>());
1134
1135 // Promote to L1 (first tier)
1136 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1137 if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
1138 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
1139 key, tier.tier_level, e);
1140 } else {
1141 self.promotions.fetch_add(1, Ordering::Relaxed);
1142 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1143 key, tier.tier_level, promotion_ttl);
1144 }
1145
1146 return Ok(typed_value);
1147 }
1148 Err(e) => {
1149 eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1150 tier.tier_level, key, e);
1151 // Continue to next tier
1152 }
1153 }
1154 }
1155 }
1156 } else {
1157 // LEGACY: Check L2 cache with TTL
1158 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1159 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1160
1161 // Attempt to deserialize
1162 match serde_json::from_value::<T>(cached_json.clone()) {
1163 Ok(typed_value) => {
1164 println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
1165
1166 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1167 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1168
1169 if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
1170 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
1171 } else {
1172 self.promotions.fetch_add(1, Ordering::Relaxed);
1173 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
1174 }
1175
1176 return Ok(typed_value);
1177 }
1178 Err(e) => {
1179 eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1180 // Fall through to recompute
1181 }
1182 }
1183 }
1184 }
1185
1186 // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1187 println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
1188 let typed_value = compute_fn().await?;
1189
1190 // 6. Serialize to JSON for storage
1191 let json_value = serde_json::to_value(&typed_value)
1192 .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
1193
1194 // 7. Store in both L1 and L2 caches
1195 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1196 eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
1197 } else {
1198 println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
1199 }
1200
1201 // 8. _cleanup_guard will auto-remove entry on drop
1202
1203 Ok(typed_value)
1204 }
1205
1206 /// Get comprehensive cache statistics
1207 ///
1208 /// In multi-tier mode, aggregates statistics from all tiers.
1209 /// In legacy mode, returns L1 and L2 stats.
1210 #[allow(dead_code)]
1211 pub fn get_stats(&self) -> CacheManagerStats {
1212 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1213 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1214 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1215 let misses = self.misses.load(Ordering::Relaxed);
1216
1217 CacheManagerStats {
1218 total_requests: total_reqs,
1219 l1_hits,
1220 l2_hits,
1221 total_hits: l1_hits + l2_hits,
1222 misses,
1223 hit_rate: if total_reqs > 0 {
1224 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1225 } else { 0.0 },
1226 l1_hit_rate: if total_reqs > 0 {
1227 (l1_hits as f64 / total_reqs as f64) * 100.0
1228 } else { 0.0 },
1229 promotions: self.promotions.load(Ordering::Relaxed),
1230 in_flight_requests: self.in_flight_requests.len(),
1231 }
1232 }
1233
1234 /// Get per-tier statistics (v0.5.0+)
1235 ///
1236 /// Returns statistics for each tier if multi-tier mode is enabled.
1237 /// Returns None if using legacy 2-tier mode.
1238 ///
1239 /// # Example
1240 /// ```rust,ignore
1241 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1242 /// for stats in tier_stats {
1243 /// println!("L{}: {} hits ({})",
1244 /// stats.tier_level,
1245 /// stats.hit_count(),
1246 /// stats.backend_name);
1247 /// }
1248 /// }
1249 /// ```
1250 pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1251 self.tiers.as_ref().map(|tiers| {
1252 tiers.iter().map(|tier| tier.stats.clone()).collect()
1253 })
1254 }
1255
1256 // ===== Redis Streams Methods =====
1257
1258 /// Publish data to Redis Stream
1259 ///
1260 /// # Arguments
1261 /// * `stream_key` - Name of the stream (e.g., "events_stream")
1262 /// * `fields` - Field-value pairs to publish
1263 /// * `maxlen` - Optional max length for stream trimming
1264 ///
1265 /// # Returns
1266 /// The entry ID generated by Redis
1267 ///
1268 /// # Errors
1269 /// Returns error if streaming backend is not configured
1270 pub async fn publish_to_stream(
1271 &self,
1272 stream_key: &str,
1273 fields: Vec<(String, String)>,
1274 maxlen: Option<usize>
1275 ) -> Result<String> {
1276 match &self.streaming_backend {
1277 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1278 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1279 }
1280 }
1281
1282 /// Read latest entries from Redis Stream
1283 ///
1284 /// # Arguments
1285 /// * `stream_key` - Name of the stream
1286 /// * `count` - Number of latest entries to retrieve
1287 ///
1288 /// # Returns
1289 /// Vector of (entry_id, fields) tuples (newest first)
1290 ///
1291 /// # Errors
1292 /// Returns error if streaming backend is not configured
1293 pub async fn read_stream_latest(
1294 &self,
1295 stream_key: &str,
1296 count: usize
1297 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1298 match &self.streaming_backend {
1299 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1300 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1301 }
1302 }
1303
1304 /// Read from Redis Stream with optional blocking
1305 ///
1306 /// # Arguments
1307 /// * `stream_key` - Name of the stream
1308 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1309 /// * `count` - Max entries to retrieve
1310 /// * `block_ms` - Optional blocking timeout in ms
1311 ///
1312 /// # Returns
1313 /// Vector of (entry_id, fields) tuples
1314 ///
1315 /// # Errors
1316 /// Returns error if streaming backend is not configured
1317 pub async fn read_stream(
1318 &self,
1319 stream_key: &str,
1320 last_id: &str,
1321 count: usize,
1322 block_ms: Option<usize>
1323 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1324 match &self.streaming_backend {
1325 Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1326 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1327 }
1328 }
1329
1330 // ===== Cache Invalidation Methods =====
1331
1332 /// Invalidate a cache key across all instances
1333 ///
1334 /// This removes the key from all cache tiers and broadcasts
1335 /// the invalidation to all other cache instances via Redis Pub/Sub.
1336 ///
1337 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1338 ///
1339 /// # Arguments
1340 /// * `key` - Cache key to invalidate
1341 ///
1342 /// # Example
1343 /// ```rust,ignore
1344 /// // Invalidate user cache after profile update
1345 /// cache_manager.invalidate("user:123").await?;
1346 /// ```
1347 pub async fn invalidate(&self, key: &str) -> Result<()> {
1348 // NEW: Multi-tier mode (v0.5.0+)
1349 if let Some(tiers) = &self.tiers {
1350 // Remove from ALL tiers
1351 for tier in tiers {
1352 if let Err(e) = tier.remove(key).await {
1353 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1354 }
1355 }
1356 } else {
1357 // LEGACY: 2-tier mode
1358 self.l1_cache.remove(key).await?;
1359 self.l2_cache.remove(key).await?;
1360 }
1361
1362 // Broadcast to other instances
1363 if let Some(publisher) = &self.invalidation_publisher {
1364 let mut pub_lock = publisher.lock().await;
1365 let msg = InvalidationMessage::remove(key);
1366 pub_lock.publish(&msg).await?;
1367 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1368 }
1369
1370 println!("🗑️ Invalidated '{}' across all instances", key);
1371 Ok(())
1372 }
1373
1374 /// Update cache value across all instances
1375 ///
1376 /// This updates the key in all cache tiers and broadcasts
1377 /// the update to all other cache instances, avoiding cache misses.
1378 ///
1379 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1380 ///
1381 /// # Arguments
1382 /// * `key` - Cache key to update
1383 /// * `value` - New value
1384 /// * `ttl` - Optional TTL (uses default if None)
1385 ///
1386 /// # Example
1387 /// ```rust,ignore
1388 /// // Update user cache with new data
1389 /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1390 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1391 /// ```
1392 pub async fn update_cache(
1393 &self,
1394 key: &str,
1395 value: serde_json::Value,
1396 ttl: Option<Duration>,
1397 ) -> Result<()> {
1398 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1399
1400 // NEW: Multi-tier mode (v0.5.0+)
1401 if let Some(tiers) = &self.tiers {
1402 // Update ALL tiers with their respective TTL scaling
1403 for tier in tiers {
1404 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1405 eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1406 }
1407 }
1408 } else {
1409 // LEGACY: 2-tier mode
1410 self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1411 self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1412 }
1413
1414 // Broadcast update to other instances
1415 if let Some(publisher) = &self.invalidation_publisher {
1416 let mut pub_lock = publisher.lock().await;
1417 let msg = InvalidationMessage::update(key, value, Some(ttl));
1418 pub_lock.publish(&msg).await?;
1419 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1420 }
1421
1422 println!("🔄 Updated '{}' across all instances", key);
1423 Ok(())
1424 }
1425
1426 /// Invalidate all keys matching a pattern
1427 ///
1428 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1429 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1430 ///
1431 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1432 ///
1433 /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1434 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1435 ///
1436 /// # Arguments
1437 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1438 ///
1439 /// # Example
1440 /// ```rust,ignore
1441 /// // Invalidate all user caches
1442 /// cache_manager.invalidate_pattern("user:*").await?;
1443 ///
1444 /// // Invalidate specific user's related caches
1445 /// cache_manager.invalidate_pattern("user:123:*").await?;
1446 /// ```
1447 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1448 // Scan L2 for matching keys
1449 // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1450 let keys = if let Some(l2) = &self.l2_cache_concrete {
1451 l2.scan_keys(pattern).await?
1452 } else {
1453 return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1454 };
1455
1456 if keys.is_empty() {
1457 println!("🔍 No keys found matching pattern '{}'", pattern);
1458 return Ok(());
1459 }
1460
1461 // NEW: Multi-tier mode (v0.5.0+)
1462 if let Some(tiers) = &self.tiers {
1463 // Remove from ALL tiers
1464 for key in &keys {
1465 for tier in tiers {
1466 if let Err(e) = tier.remove(key).await {
1467 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1468 }
1469 }
1470 }
1471 } else {
1472 // LEGACY: 2-tier mode - Remove from L2 in bulk
1473 if let Some(l2) = &self.l2_cache_concrete {
1474 l2.remove_bulk(&keys).await?;
1475 }
1476 }
1477
1478 // Broadcast pattern invalidation
1479 if let Some(publisher) = &self.invalidation_publisher {
1480 let mut pub_lock = publisher.lock().await;
1481 let msg = InvalidationMessage::remove_bulk(keys.clone());
1482 pub_lock.publish(&msg).await?;
1483 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1484 }
1485
1486 println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1487 Ok(())
1488 }
1489
1490 /// Set value with automatic broadcast to all instances
1491 ///
1492 /// This is a write-through operation that updates the cache and
1493 /// broadcasts the update to all other instances automatically.
1494 ///
1495 /// # Arguments
1496 /// * `key` - Cache key
1497 /// * `value` - Value to cache
1498 /// * `strategy` - Cache strategy (determines TTL)
1499 ///
1500 /// # Example
1501 /// ```rust,ignore
1502 /// // Update and broadcast in one call
1503 /// let data = serde_json::json!({"status": "active"});
1504 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1505 /// ```
1506 pub async fn set_with_broadcast(
1507 &self,
1508 key: &str,
1509 value: serde_json::Value,
1510 strategy: CacheStrategy,
1511 ) -> Result<()> {
1512 let ttl = strategy.to_duration();
1513
1514 // Set in local caches
1515 self.set_with_strategy(key, value.clone(), strategy).await?;
1516
1517 // Broadcast update if invalidation is enabled
1518 if let Some(publisher) = &self.invalidation_publisher {
1519 let mut pub_lock = publisher.lock().await;
1520 let msg = InvalidationMessage::update(key, value, Some(ttl));
1521 pub_lock.publish(&msg).await?;
1522 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1523 }
1524
1525 Ok(())
1526 }
1527
1528 /// Get invalidation statistics
1529 ///
1530 /// Returns statistics about invalidation operations if invalidation is enabled.
1531 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1532 if self.invalidation_subscriber.is_some() {
1533 Some(self.invalidation_stats.snapshot())
1534 } else {
1535 None
1536 }
1537 }
1538}
1539
/// Point-in-time snapshot of the cache manager's counters.
///
/// Field order is part of the `Debug` output and is kept stable.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Number of requests counted by the manager.
    pub total_requests: u64,
    /// Number of requests answered from L1.
    pub l1_hits: u64,
    /// Number of requests answered from L2.
    pub l2_hits: u64,
    /// Sum of `l1_hits` and `l2_hits`.
    pub total_hits: u64,
    /// Number of recorded cache misses.
    pub misses: u64,
    /// `total_hits` as a percentage of `total_requests` (0.0 when there are no requests).
    pub hit_rate: f64,
    /// `l1_hits` as a percentage of `total_requests` (0.0 when there are no requests).
    pub l1_hit_rate: f64,
    /// Number of successful promotions of an entry into L1.
    pub promotions: usize,
    /// Number of keys with an in-flight request entry when the snapshot was taken.
    pub in_flight_requests: usize,
}