multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use super::l1_cache::L1Cache;
15use super::l2_cache::L2Cache;
16use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
17use super::invalidation::{
18 InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
19 InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
20};
21
22/// RAII cleanup guard for in-flight request tracking
23/// Ensures that entries are removed from DashMap even on early return or panic
24struct CleanupGuard<'a> {
25 map: &'a DashMap<String, Arc<Mutex<()>>>,
26 key: String,
27}
28
29impl<'a> Drop for CleanupGuard<'a> {
30 fn drop(&mut self) {
31 self.map.remove(&self.key);
32 }
33}
34
35/// Cache strategies for different data types
36#[derive(Debug, Clone)]
37#[allow(dead_code)]
38pub enum CacheStrategy {
39 /// Real-time data - 10 seconds TTL
40 RealTime,
41 /// Short-term data - 5 minutes TTL
42 ShortTerm,
43 /// Medium-term data - 1 hour TTL
44 MediumTerm,
45 /// Long-term data - 3 hours TTL
46 LongTerm,
47 /// Custom TTL
48 Custom(Duration),
49 /// Default strategy (5 minutes)
50 Default,
51}
52
53impl CacheStrategy {
54 /// Convert strategy to duration
55 pub fn to_duration(&self) -> Duration {
56 match self {
57 Self::RealTime => Duration::from_secs(10),
58 Self::ShortTerm => Duration::from_secs(300), // 5 minutes
59 Self::MediumTerm => Duration::from_secs(3600), // 1 hour
60 Self::LongTerm => Duration::from_secs(10800), // 3 hours
61 Self::Custom(duration) => *duration,
62 Self::Default => Duration::from_secs(300),
63 }
64 }
65}
66
67/// Statistics for a single cache tier
68#[derive(Debug, Clone)]
69pub struct TierStats {
70 /// Tier level (1 = L1, 2 = L2, 3 = L3, etc.)
71 pub tier_level: usize,
72 /// Number of cache hits at this tier
73 pub hits: Arc<AtomicU64>,
74 /// Backend name for identification
75 pub backend_name: String,
76}
77
78impl TierStats {
79 fn new(tier_level: usize, backend_name: String) -> Self {
80 Self {
81 tier_level,
82 hits: Arc::new(AtomicU64::new(0)),
83 backend_name,
84 }
85 }
86
87 /// Get current hit count
88 pub fn hit_count(&self) -> u64 {
89 self.hits.load(Ordering::Relaxed)
90 }
91}
92
93/// A single cache tier in the multi-tier architecture
94pub struct CacheTier {
95 /// Cache backend for this tier
96 backend: Arc<dyn L2CacheBackend>,
97 /// Tier level (1 = hottest/fastest, higher = colder/slower)
98 tier_level: usize,
99 /// Enable automatic promotion to upper tiers on cache hit
100 promotion_enabled: bool,
101 /// TTL scale factor (multiplier for TTL when storing/promoting)
102 ttl_scale: f64,
103 /// Statistics for this tier
104 stats: TierStats,
105}
106
107impl CacheTier {
108 /// Create a new cache tier
109 pub fn new(
110 backend: Arc<dyn L2CacheBackend>,
111 tier_level: usize,
112 promotion_enabled: bool,
113 ttl_scale: f64,
114 ) -> Self {
115 let backend_name = backend.name().to_string();
116 Self {
117 backend,
118 tier_level,
119 promotion_enabled,
120 ttl_scale,
121 stats: TierStats::new(tier_level, backend_name),
122 }
123 }
124
125 /// Get value with TTL from this tier
126 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
127 self.backend.get_with_ttl(key).await
128 }
129
130 /// Set value with TTL in this tier
131 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
132 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
133 self.backend.set_with_ttl(key, value, scaled_ttl).await
134 }
135
136 /// Remove value from this tier
137 async fn remove(&self, key: &str) -> Result<()> {
138 self.backend.remove(key).await
139 }
140
141 /// Record a cache hit for this tier
142 fn record_hit(&self) {
143 self.stats.hits.fetch_add(1, Ordering::Relaxed);
144 }
145}
146
147/// Configuration for a cache tier (used in builder pattern)
148#[derive(Debug, Clone)]
149pub struct TierConfig {
150 /// Tier level (1, 2, 3, 4...)
151 pub tier_level: usize,
152 /// Enable promotion to upper tiers on hit
153 pub promotion_enabled: bool,
154 /// TTL scale factor (1.0 = same as base TTL)
155 pub ttl_scale: f64,
156}
157
158impl TierConfig {
159 /// Create new tier configuration
160 pub fn new(tier_level: usize) -> Self {
161 Self {
162 tier_level,
163 promotion_enabled: true,
164 ttl_scale: 1.0,
165 }
166 }
167
168 /// Configure as L1 (hot tier)
169 pub fn as_l1() -> Self {
170 Self {
171 tier_level: 1,
172 promotion_enabled: false, // L1 is already top tier
173 ttl_scale: 1.0,
174 }
175 }
176
177 /// Configure as L2 (warm tier)
178 pub fn as_l2() -> Self {
179 Self {
180 tier_level: 2,
181 promotion_enabled: true,
182 ttl_scale: 1.0,
183 }
184 }
185
186 /// Configure as L3 (cold tier) with longer TTL
187 pub fn as_l3() -> Self {
188 Self {
189 tier_level: 3,
190 promotion_enabled: true,
191 ttl_scale: 2.0, // Keep data 2x longer
192 }
193 }
194
195 /// Configure as L4 (archive tier) with much longer TTL
196 pub fn as_l4() -> Self {
197 Self {
198 tier_level: 4,
199 promotion_enabled: true,
200 ttl_scale: 8.0, // Keep data 8x longer
201 }
202 }
203
204 /// Set promotion enabled
205 pub fn with_promotion(mut self, enabled: bool) -> Self {
206 self.promotion_enabled = enabled;
207 self
208 }
209
210 /// Set TTL scale factor
211 pub fn with_ttl_scale(mut self, scale: f64) -> Self {
212 self.ttl_scale = scale;
213 self
214 }
215
216 /// Set tier level
217 pub fn with_level(mut self, level: usize) -> Self {
218 self.tier_level = level;
219 self
220 }
221}
222
223/// Proxy wrapper to convert L2CacheBackend to CacheBackend
224/// (Rust doesn't support automatic trait upcasting for trait objects)
225struct ProxyCacheBackend {
226 backend: Arc<dyn L2CacheBackend>,
227}
228
229#[async_trait::async_trait]
230impl CacheBackend for ProxyCacheBackend {
231 async fn get(&self, key: &str) -> Option<serde_json::Value> {
232 self.backend.get(key).await
233 }
234
235 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
236 self.backend.set_with_ttl(key, value, ttl).await
237 }
238
239 async fn remove(&self, key: &str) -> Result<()> {
240 self.backend.remove(key).await
241 }
242
243 async fn health_check(&self) -> bool {
244 self.backend.health_check().await
245 }
246
247 fn name(&self) -> &str {
248 self.backend.name()
249 }
250}
251
252/// Cache Manager - Unified operations across multiple cache tiers
253///
254/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
255/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
256/// legacy L1+L2 behavior for backward compatibility.
257pub struct CacheManager {
258 /// Dynamic multi-tier cache architecture (v0.5.0+)
259 /// If Some, this takes precedence over l1_cache/l2_cache fields
260 tiers: Option<Vec<CacheTier>>,
261
262 // ===== Legacy fields (v0.1.0 - v0.4.x) =====
263 // Maintained for backward compatibility
264 /// L1 Cache (trait object for pluggable backends)
265 l1_cache: Arc<dyn CacheBackend>,
266 /// L2 Cache (trait object for pluggable backends)
267 l2_cache: Arc<dyn L2CacheBackend>,
268 /// L2 Cache concrete instance (for invalidation scan_keys)
269 l2_cache_concrete: Option<Arc<L2Cache>>,
270
271 /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
272 streaming_backend: Option<Arc<dyn StreamingBackend>>,
273 /// Statistics
274 total_requests: Arc<AtomicU64>,
275 l1_hits: Arc<AtomicU64>,
276 l2_hits: Arc<AtomicU64>,
277 misses: Arc<AtomicU64>,
278 promotions: Arc<AtomicUsize>,
279 /// In-flight requests to prevent Cache Stampede on L2/compute operations
280 in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
281 /// Invalidation publisher (for broadcasting invalidation messages)
282 invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
283 /// Invalidation subscriber (for receiving invalidation messages)
284 invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
285 /// Invalidation statistics
286 invalidation_stats: Arc<AtomicInvalidationStats>,
287}
288
289impl CacheManager {
290 /// Create new cache manager with trait objects (pluggable backends)
291 ///
292 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
293 ///
294 /// # Arguments
295 ///
296 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
297 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
298 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
299 ///
300 /// # Example
301 ///
302 /// ```rust,ignore
303 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
304 /// use std::sync::Arc;
305 ///
306 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
307 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
308 ///
309 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
310 /// ```
311 pub async fn new_with_backends(
312 l1_cache: Arc<dyn CacheBackend>,
313 l2_cache: Arc<dyn L2CacheBackend>,
314 streaming_backend: Option<Arc<dyn StreamingBackend>>,
315 ) -> Result<Self> {
316 println!(" 🎯 Initializing Cache Manager with custom backends...");
317 println!(" L1: {}", l1_cache.name());
318 println!(" L2: {}", l2_cache.name());
319 if streaming_backend.is_some() {
320 println!(" Streaming: enabled");
321 } else {
322 println!(" Streaming: disabled");
323 }
324
325 Ok(Self {
326 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
327 l1_cache,
328 l2_cache,
329 l2_cache_concrete: None,
330 streaming_backend,
331 total_requests: Arc::new(AtomicU64::new(0)),
332 l1_hits: Arc::new(AtomicU64::new(0)),
333 l2_hits: Arc::new(AtomicU64::new(0)),
334 misses: Arc::new(AtomicU64::new(0)),
335 promotions: Arc::new(AtomicUsize::new(0)),
336 in_flight_requests: Arc::new(DashMap::new()),
337 invalidation_publisher: None,
338 invalidation_subscriber: None,
339 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
340 })
341 }
342
343 /// Create new cache manager with default backends (backward compatible)
344 ///
345 /// This is the legacy constructor maintained for backward compatibility.
346 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
347 ///
348 /// # Arguments
349 ///
350 /// * `l1_cache` - Moka L1 cache instance
351 /// * `l2_cache` - Redis L2 cache instance
352 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
353 println!(" 🎯 Initializing Cache Manager...");
354
355 // Convert concrete types to trait objects
356 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
357 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
358
359 // Create RedisStreams backend for streaming functionality
360 let redis_url = std::env::var("REDIS_URL")
361 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
362 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
363 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
364
365 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
366 }
367
368 /// Create new cache manager with invalidation support
369 ///
370 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
371 ///
372 /// # Arguments
373 ///
374 /// * `l1_cache` - Moka L1 cache instance
375 /// * `l2_cache` - Redis L2 cache instance
376 /// * `redis_url` - Redis connection URL for Pub/Sub
377 /// * `config` - Invalidation configuration
378 ///
379 /// # Example
380 ///
381 /// ```rust,ignore
382 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
383 ///
384 /// let config = InvalidationConfig {
385 /// channel: "my_app:cache:invalidate".to_string(),
386 /// ..Default::default()
387 /// };
388 ///
389 /// let manager = CacheManager::new_with_invalidation(
390 /// l1, l2, "redis://localhost", config
391 /// ).await?;
392 /// ```
393 pub async fn new_with_invalidation(
394 l1_cache: Arc<L1Cache>,
395 l2_cache: Arc<L2Cache>,
396 redis_url: &str,
397 config: InvalidationConfig,
398 ) -> Result<Self> {
399 println!(" 🎯 Initializing Cache Manager with Invalidation...");
400 println!(" Pub/Sub channel: {}", config.channel);
401
402 // Convert concrete types to trait objects
403 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
404 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
405
406 // Create RedisStreams backend for streaming functionality
407 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
408 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
409
410 // Create publisher
411 let client = redis::Client::open(redis_url)?;
412 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
413 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
414
415 // Create subscriber
416 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
417 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
418
419 let manager = Self {
420 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
421 l1_cache: l1_backend,
422 l2_cache: l2_backend,
423 l2_cache_concrete: Some(l2_cache),
424 streaming_backend: Some(streaming_backend),
425 total_requests: Arc::new(AtomicU64::new(0)),
426 l1_hits: Arc::new(AtomicU64::new(0)),
427 l2_hits: Arc::new(AtomicU64::new(0)),
428 misses: Arc::new(AtomicU64::new(0)),
429 promotions: Arc::new(AtomicUsize::new(0)),
430 in_flight_requests: Arc::new(DashMap::new()),
431 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
432 invalidation_subscriber: Some(Arc::new(subscriber)),
433 invalidation_stats,
434 };
435
436 // Start subscriber with handler
437 manager.start_invalidation_subscriber();
438
439 println!(" ✅ Cache Manager initialized with invalidation support");
440
441 Ok(manager)
442 }
443
444 /// Create new cache manager with multi-tier architecture (v0.5.0+)
445 ///
446 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
447 /// Tiers are checked in order (lower tier_level = faster/hotter).
448 ///
449 /// # Arguments
450 ///
451 /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
452 /// * `streaming_backend` - Optional streaming backend
453 ///
454 /// # Example
455 ///
456 /// ```rust,ignore
457 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
458 /// use std::sync::Arc;
459 ///
460 /// // L1 + L2 + L3 setup
461 /// let l1 = Arc::new(L1Cache::new().await?);
462 /// let l2 = Arc::new(L2Cache::new().await?);
463 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
464 ///
465 /// let tiers = vec![
466 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
467 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
468 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
469 /// ];
470 ///
471 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
472 /// ```
473 pub async fn new_with_tiers(
474 tiers: Vec<CacheTier>,
475 streaming_backend: Option<Arc<dyn StreamingBackend>>,
476 ) -> Result<Self> {
477 println!(" 🎯 Initializing Multi-Tier Cache Manager...");
478 println!(" Tier count: {}", tiers.len());
479 for tier in &tiers {
480 println!(
481 " L{}: {} (promotion={}, ttl_scale={})",
482 tier.tier_level,
483 tier.stats.backend_name,
484 tier.promotion_enabled,
485 tier.ttl_scale
486 );
487 }
488
489 // Validate tiers are sorted by level
490 for i in 1..tiers.len() {
491 if tiers[i].tier_level <= tiers[i - 1].tier_level {
492 anyhow::bail!(
493 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
494 tiers[i].tier_level,
495 tiers[i - 1].tier_level
496 );
497 }
498 }
499
500 // For backward compatibility with legacy code, we need dummy l1/l2 caches
501 // Use first tier as l1, second tier as l2 if available
502 let (l1_cache, l2_cache) = if tiers.len() >= 2 {
503 (tiers[0].backend.clone(), tiers[1].backend.clone())
504 } else if tiers.len() == 1 {
505 // Only one tier - use it for both
506 let tier = tiers[0].backend.clone();
507 (tier.clone(), tier)
508 } else {
509 anyhow::bail!("At least one cache tier is required");
510 };
511
512 // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
513 let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
514 backend: l1_cache.clone(),
515 });
516
517 Ok(Self {
518 tiers: Some(tiers),
519 l1_cache: l1_backend,
520 l2_cache,
521 l2_cache_concrete: None,
522 streaming_backend,
523 total_requests: Arc::new(AtomicU64::new(0)),
524 l1_hits: Arc::new(AtomicU64::new(0)),
525 l2_hits: Arc::new(AtomicU64::new(0)),
526 misses: Arc::new(AtomicU64::new(0)),
527 promotions: Arc::new(AtomicUsize::new(0)),
528 in_flight_requests: Arc::new(DashMap::new()),
529 invalidation_publisher: None,
530 invalidation_subscriber: None,
531 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
532 })
533 }
534
535 /// Start the invalidation subscriber background task
536 fn start_invalidation_subscriber(&self) {
537 if let Some(subscriber) = &self.invalidation_subscriber {
538 let l1_cache = Arc::clone(&self.l1_cache);
539 let l2_cache_concrete = self.l2_cache_concrete.clone();
540
541 subscriber.start(move |msg| {
542 let l1 = Arc::clone(&l1_cache);
543 let _l2 = l2_cache_concrete.clone();
544
545 async move {
546 match msg {
547 InvalidationMessage::Remove { key } => {
548 // Remove from L1
549 l1.remove(&key).await?;
550 println!("🗑️ [Invalidation] Removed '{}' from L1", key);
551 }
552 InvalidationMessage::Update { key, value, ttl_secs } => {
553 // Update L1 with new value
554 let ttl = ttl_secs
555 .map(Duration::from_secs)
556 .unwrap_or_else(|| Duration::from_secs(300));
557 l1.set_with_ttl(&key, value, ttl).await?;
558 println!("🔄 [Invalidation] Updated '{}' in L1", key);
559 }
560 InvalidationMessage::RemovePattern { pattern } => {
561 // For pattern-based invalidation, we can't easily iterate L1 cache
562 // So we just log it. The pattern invalidation is mainly for L2.
563 // L1 entries will naturally expire via TTL.
564 println!("🔍 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
565 }
566 InvalidationMessage::RemoveBulk { keys } => {
567 // Remove multiple keys from L1
568 for key in keys {
569 if let Err(e) = l1.remove(&key).await {
570 eprintln!("⚠️ Failed to remove '{}' from L1: {}", key, e);
571 }
572 }
573 println!("🗑️ [Invalidation] Bulk removed keys from L1");
574 }
575 }
576 Ok(())
577 }
578 });
579
580 println!("📡 Invalidation subscriber started");
581 }
582 }
583
584 /// Get value from cache using multi-tier architecture (v0.5.0+)
585 ///
586 /// This method iterates through all configured tiers and automatically promotes
587 /// to upper tiers on cache hit.
588 async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
589 let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some
590
591 // Try each tier sequentially (sorted by tier_level)
592 for (tier_index, tier) in tiers.iter().enumerate() {
593 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
594 // Cache hit!
595 tier.record_hit();
596
597 // Promote to all upper tiers (if promotion enabled)
598 if tier.promotion_enabled && tier_index > 0 {
599 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
600
601 // Promote to all tiers above this one
602 for upper_tier in tiers[..tier_index].iter().rev() {
603 if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
604 eprintln!(
605 "⚠️ Failed to promote '{}' from L{} to L{}: {}",
606 key, tier.tier_level, upper_tier.tier_level, e
607 );
608 } else {
609 self.promotions.fetch_add(1, Ordering::Relaxed);
610 println!(
611 "⬆️ Promoted '{}' from L{} to L{} (TTL: {:?})",
612 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
613 );
614 }
615 }
616 }
617
618 return Ok(Some(value));
619 }
620 }
621
622 // Cache miss across all tiers
623 Ok(None)
624 }
625
626 /// Get value from cache (L1 first, then L2 fallback with promotion)
627 ///
628 /// This method now includes built-in Cache Stampede protection when cache misses occur.
629 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
630 /// unnecessary duplicate work on external data sources.
631 ///
632 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
633 ///
634 /// # Arguments
635 /// * `key` - Cache key to retrieve
636 ///
637 /// # Returns
638 /// * `Ok(Some(value))` - Cache hit, value found in any tier
639 /// * `Ok(None)` - Cache miss, value not found in any cache
640 /// * `Err(error)` - Cache operation failed
641 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
642 self.total_requests.fetch_add(1, Ordering::Relaxed);
643
644 // NEW: Multi-tier mode (v0.5.0+)
645 if self.tiers.is_some() {
646 // Fast path for L1 (first tier) - no locking needed
647 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
648 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
649 tier1.record_hit();
650 // Update legacy stats for backward compatibility
651 self.l1_hits.fetch_add(1, Ordering::Relaxed);
652 return Ok(Some(value));
653 }
654 }
655
656 // L1 miss - use stampede protection for lower tiers
657 let key_owned = key.to_string();
658 let lock_guard = self.in_flight_requests
659 .entry(key_owned.clone())
660 .or_insert_with(|| Arc::new(Mutex::new(())))
661 .clone();
662
663 let _guard = lock_guard.lock().await;
664 let cleanup_guard = CleanupGuard {
665 map: &self.in_flight_requests,
666 key: key_owned.clone(),
667 };
668
669 // Double-check L1 after acquiring lock
670 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
671 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
672 tier1.record_hit();
673 self.l1_hits.fetch_add(1, Ordering::Relaxed);
674 return Ok(Some(value));
675 }
676 }
677
678 // Check remaining tiers with promotion
679 let result = self.get_multi_tier(key).await?;
680
681 if result.is_some() {
682 // Hit in L2+ tier - update legacy stats
683 if self.tiers.as_ref().unwrap().len() >= 2 {
684 self.l2_hits.fetch_add(1, Ordering::Relaxed);
685 }
686 } else {
687 self.misses.fetch_add(1, Ordering::Relaxed);
688 }
689
690 drop(cleanup_guard);
691 return Ok(result);
692 }
693
694 // LEGACY: 2-tier mode (L1 + L2)
695 // Fast path: Try L1 first (no locking needed)
696 if let Some(value) = self.l1_cache.get(key).await {
697 self.l1_hits.fetch_add(1, Ordering::Relaxed);
698 return Ok(Some(value));
699 }
700
701 // L1 miss - implement Cache Stampede protection for L2 lookup
702 let key_owned = key.to_string();
703 let lock_guard = self.in_flight_requests
704 .entry(key_owned.clone())
705 .or_insert_with(|| Arc::new(Mutex::new(())))
706 .clone();
707
708 let _guard = lock_guard.lock().await;
709
710 // RAII cleanup guard - ensures entry is removed even on early return or panic
711 let cleanup_guard = CleanupGuard {
712 map: &self.in_flight_requests,
713 key: key_owned.clone(),
714 };
715
716 // Double-check L1 cache after acquiring lock
717 // (Another concurrent request might have populated it while we were waiting)
718 if let Some(value) = self.l1_cache.get(key).await {
719 self.l1_hits.fetch_add(1, Ordering::Relaxed);
720 // cleanup_guard will auto-remove entry on drop
721 return Ok(Some(value));
722 }
723
724 // Check L2 cache with TTL information
725 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
726 self.l2_hits.fetch_add(1, Ordering::Relaxed);
727
728 // Promote to L1 with same TTL as Redis (or default if no TTL)
729 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
730
731 if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
732 // L1 promotion failed, but we still have the data
733 eprintln!("⚠️ Failed to promote key '{}' to L1 cache", key);
734 } else {
735 self.promotions.fetch_add(1, Ordering::Relaxed);
736 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
737 }
738
739 // cleanup_guard will auto-remove entry on drop
740 return Ok(Some(value));
741 }
742
743 // Both L1 and L2 miss
744 self.misses.fetch_add(1, Ordering::Relaxed);
745
746 // cleanup_guard will auto-remove entry on drop
747 drop(cleanup_guard);
748
749 Ok(None)
750 }
751
752 /// Get value from cache with fallback computation (enhanced backward compatibility)
753 ///
754 /// This is a convenience method that combines `get()` with optional computation.
755 /// If the value is not found in cache, it will execute the compute function
756 /// and cache the result automatically.
757 ///
758 /// # Arguments
759 /// * `key` - Cache key
760 /// * `compute_fn` - Optional function to compute value if not in cache
761 /// * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
762 ///
763 /// # Returns
764 /// * `Ok(Some(value))` - Value found in cache or computed successfully
765 /// * `Ok(None)` - Value not in cache and no compute function provided
766 /// * `Err(error)` - Cache operation or computation failed
767 ///
768 /// # Example
769 /// ```ignore
770 /// // Simple cache get (existing behavior)
771 /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
772 ///
773 /// // Get with computation fallback (new enhanced behavior)
774 /// let api_data = cache_manager.get_with_fallback(
775 /// "api_response",
776 /// Some(|| async { fetch_data_from_api().await }),
777 /// Some(CacheStrategy::RealTime)
778 /// ).await?;
779 /// ```
780
781 /// Set value with specific cache strategy (all tiers)
782 ///
783 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
784 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
785 pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
786 let ttl = strategy.to_duration();
787
788 // NEW: Multi-tier mode (v0.5.0+)
789 if let Some(tiers) = &self.tiers {
790 // Store in ALL tiers with their respective TTL scaling
791 let mut success_count = 0;
792 let mut last_error = None;
793
794 for tier in tiers {
795 match tier.set_with_ttl(key, value.clone(), ttl).await {
796 Ok(_) => {
797 success_count += 1;
798 }
799 Err(e) => {
800 eprintln!("⚠️ L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
801 last_error = Some(e);
802 }
803 }
804 }
805
806 if success_count > 0 {
807 println!("💾 [Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
808 key, success_count, tiers.len(), ttl);
809 return Ok(());
810 } else {
811 return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
812 }
813 }
814
815 // LEGACY: 2-tier mode (L1 + L2)
816 // Store in both L1 and L2
817 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
818 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
819
820 // Return success if at least one cache succeeded
821 match (l1_result, l2_result) {
822 (Ok(_), Ok(_)) => {
823 // Both succeeded
824 println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
825 Ok(())
826 }
827 (Ok(_), Err(_)) => {
828 // L1 succeeded, L2 failed
829 eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
830 println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
831 Ok(())
832 }
833 (Err(_), Ok(_)) => {
834 // L1 failed, L2 succeeded
835 eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
836 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
837 Ok(())
838 }
839 (Err(e1), Err(_e2)) => {
840 // Both failed
841 Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
842 }
843 }
844 }
845
846 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
847 ///
848 /// This method provides comprehensive Cache Stampede protection:
849 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
850 /// 2. Check L2 cache with mutex-based coalescing
851 /// 3. Compute fresh data with protection against concurrent computations
852 ///
853 /// # Arguments
854 /// * `key` - Cache key
855 /// * `strategy` - Cache strategy for TTL and storage behavior
856 /// * `compute_fn` - Async function to compute the value if not in any cache
857 ///
858 /// # Example
859 /// ```ignore
860 /// let api_data = cache_manager.get_or_compute_with(
861 /// "api_response",
862 /// CacheStrategy::RealTime,
863 /// || async {
864 /// fetch_data_from_api().await
865 /// }
866 /// ).await?;
867 /// ```
868 #[allow(dead_code)]
869 pub async fn get_or_compute_with<F, Fut>(
870 &self,
871 key: &str,
872 strategy: CacheStrategy,
873 compute_fn: F,
874 ) -> Result<serde_json::Value>
875 where
876 F: FnOnce() -> Fut + Send,
877 Fut: Future<Output = Result<serde_json::Value>> + Send,
878 {
879 self.total_requests.fetch_add(1, Ordering::Relaxed);
880
881 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
882 if let Some(value) = self.l1_cache.get(key).await {
883 self.l1_hits.fetch_add(1, Ordering::Relaxed);
884 return Ok(value);
885 }
886
887 // 2. L1 miss - try L2 with Cache Stampede protection
888 let key_owned = key.to_string();
889 let lock_guard = self.in_flight_requests
890 .entry(key_owned.clone())
891 .or_insert_with(|| Arc::new(Mutex::new(())))
892 .clone();
893
894 let _guard = lock_guard.lock().await;
895
896 // RAII cleanup guard - ensures entry is removed even on early return or panic
897 let _cleanup_guard = CleanupGuard {
898 map: &self.in_flight_requests,
899 key: key_owned,
900 };
901
902 // 3. Double-check L1 cache after acquiring lock
903 // (Another request might have populated it while we were waiting)
904 if let Some(value) = self.l1_cache.get(key).await {
905 self.l1_hits.fetch_add(1, Ordering::Relaxed);
906 // _cleanup_guard will auto-remove entry on drop
907 return Ok(value);
908 }
909
910 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
911 if let Some(tiers) = &self.tiers {
912 // Check tiers starting from index 1 (skip L1 since already checked)
913 for tier in tiers.iter().skip(1) {
914 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
915 tier.record_hit();
916
917 // Promote to L1 (first tier)
918 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
919 if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
920 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
921 } else {
922 self.promotions.fetch_add(1, Ordering::Relaxed);
923 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
924 key, tier.tier_level, promotion_ttl);
925 }
926
927 // _cleanup_guard will auto-remove entry on drop
928 return Ok(value);
929 }
930 }
931 } else {
932 // LEGACY: Check L2 cache with TTL
933 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
934 self.l2_hits.fetch_add(1, Ordering::Relaxed);
935
936 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
937 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
938
939 if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
940 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
941 } else {
942 self.promotions.fetch_add(1, Ordering::Relaxed);
943 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
944 }
945
946 // _cleanup_guard will auto-remove entry on drop
947 return Ok(value);
948 }
949 }
950
951 // 5. Cache miss across all tiers - compute fresh data
952 println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
953 let fresh_data = compute_fn().await?;
954
955 // 6. Store in both caches
956 if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
957 eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
958 }
959
960 // 7. _cleanup_guard will auto-remove entry on drop
961
962 Ok(fresh_data)
963 }
964
965 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
966 ///
967 /// This method provides the same functionality as `get_or_compute_with()` but with
968 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
969 /// API calls, or any computation that returns structured data.
970 ///
971 /// # Type Safety
972 ///
973 /// - Returns your actual type `T` instead of `serde_json::Value`
974 /// - Compiler enforces Serialize + DeserializeOwned bounds
975 /// - No manual JSON conversion needed
976 ///
977 /// # Cache Flow
978 ///
979 /// 1. Check L1 cache → deserialize if found
980 /// 2. Check L2 cache → deserialize + promote to L1 if found
981 /// 3. Execute compute_fn → serialize → store in L1+L2
982 /// 4. Full stampede protection (only ONE request computes)
983 ///
984 /// # Arguments
985 ///
986 /// * `key` - Cache key
987 /// * `strategy` - Cache strategy for TTL
988 /// * `compute_fn` - Async function returning `Result<T>`
989 ///
990 /// # Example - Database Query
991 ///
992 /// ```no_run
993 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
994 /// # use std::sync::Arc;
995 /// # use serde::{Serialize, Deserialize};
996 /// # async fn example() -> anyhow::Result<()> {
997 /// # let l1 = Arc::new(L1Cache::new().await?);
998 /// # let l2 = Arc::new(L2Cache::new().await?);
999 /// # let cache_manager = CacheManager::new(l1, l2);
1000 ///
1001 /// #[derive(Serialize, Deserialize)]
1002 /// struct User {
1003 /// id: i64,
1004 /// name: String,
1005 /// }
1006 ///
1007 /// // Type-safe database caching (example - requires sqlx)
1008 /// // let user: User = cache_manager.get_or_compute_typed(
1009 /// // "user:123",
1010 /// // CacheStrategy::MediumTerm,
1011 /// // || async {
1012 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
1013 /// // .bind(123)
1014 /// // .fetch_one(&pool)
1015 /// // .await
1016 /// // }
1017 /// // ).await?;
1018 /// # Ok(())
1019 /// # }
1020 /// ```
1021 ///
1022 /// # Example - API Call
1023 ///
1024 /// ```no_run
1025 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1026 /// # use std::sync::Arc;
1027 /// # use serde::{Serialize, Deserialize};
1028 /// # async fn example() -> anyhow::Result<()> {
1029 /// # let l1 = Arc::new(L1Cache::new().await?);
1030 /// # let l2 = Arc::new(L2Cache::new().await?);
1031 /// # let cache_manager = CacheManager::new(l1, l2);
1032 /// #[derive(Serialize, Deserialize)]
1033 /// struct ApiResponse {
1034 /// data: String,
1035 /// timestamp: i64,
1036 /// }
1037 ///
1038 /// // API call caching (example - requires reqwest)
1039 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1040 /// // "api:endpoint",
1041 /// // CacheStrategy::RealTime,
1042 /// // || async {
1043 /// // reqwest::get("https://api.example.com/data")
1044 /// // .await?
1045 /// // .json::<ApiResponse>()
1046 /// // .await
1047 /// // }
1048 /// // ).await?;
1049 /// # Ok(())
1050 /// # }
1051 /// ```
1052 ///
1053 /// # Performance
1054 ///
1055 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1056 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1057 /// - Compute: Your function time + serialization + L1+L2 storage
1058 /// - Stampede protection: 99.6% latency reduction under high concurrency
1059 ///
1060 /// # Errors
1061 ///
1062 /// Returns error if:
1063 /// - Compute function fails
1064 /// - Serialization fails (invalid type for JSON)
1065 /// - Deserialization fails (cache data doesn't match type T)
1066 /// - Cache operations fail (Redis connection issues)
1067 pub async fn get_or_compute_typed<T, F, Fut>(
1068 &self,
1069 key: &str,
1070 strategy: CacheStrategy,
1071 compute_fn: F,
1072 ) -> Result<T>
1073 where
1074 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1075 F: FnOnce() -> Fut + Send,
1076 Fut: Future<Output = Result<T>> + Send,
1077 {
1078 self.total_requests.fetch_add(1, Ordering::Relaxed);
1079
1080 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1081 if let Some(cached_json) = self.l1_cache.get(key).await {
1082 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1083
1084 // Attempt to deserialize from JSON to type T
1085 match serde_json::from_value::<T>(cached_json) {
1086 Ok(typed_value) => {
1087 println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
1088 return Ok(typed_value);
1089 }
1090 Err(e) => {
1091 // Deserialization failed - cache data may be stale or corrupt
1092 eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1093 // Fall through to recompute
1094 }
1095 }
1096 }
1097
1098 // 2. L1 miss - try L2 with Cache Stampede protection
1099 let key_owned = key.to_string();
1100 let lock_guard = self.in_flight_requests
1101 .entry(key_owned.clone())
1102 .or_insert_with(|| Arc::new(Mutex::new(())))
1103 .clone();
1104
1105 let _guard = lock_guard.lock().await;
1106
1107 // RAII cleanup guard - ensures entry is removed even on early return or panic
1108 let _cleanup_guard = CleanupGuard {
1109 map: &self.in_flight_requests,
1110 key: key_owned,
1111 };
1112
1113 // 3. Double-check L1 cache after acquiring lock
1114 // (Another request might have populated it while we were waiting)
1115 if let Some(cached_json) = self.l1_cache.get(key).await {
1116 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1117 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1118 println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
1119 return Ok(typed_value);
1120 }
1121 }
1122
1123 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1124 if let Some(tiers) = &self.tiers {
1125 // Check tiers starting from index 1 (skip L1 since already checked)
1126 for tier in tiers.iter().skip(1) {
1127 if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1128 tier.record_hit();
1129
1130 // Attempt to deserialize
1131 match serde_json::from_value::<T>(cached_json.clone()) {
1132 Ok(typed_value) => {
1133 println!("✅ [L{} HIT] Deserialized '{}' to type {}",
1134 tier.tier_level, key, std::any::type_name::<T>());
1135
1136 // Promote to L1 (first tier)
1137 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1138 if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
1139 eprintln!("⚠️ Failed to promote '{}' from L{} to L1: {}",
1140 key, tier.tier_level, e);
1141 } else {
1142 self.promotions.fetch_add(1, Ordering::Relaxed);
1143 println!("⬆️ Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1144 key, tier.tier_level, promotion_ttl);
1145 }
1146
1147 return Ok(typed_value);
1148 }
1149 Err(e) => {
1150 eprintln!("⚠️ L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1151 tier.tier_level, key, e);
1152 // Continue to next tier
1153 }
1154 }
1155 }
1156 }
1157 } else {
1158 // LEGACY: Check L2 cache with TTL
1159 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1160 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1161
1162 // Attempt to deserialize
1163 match serde_json::from_value::<T>(cached_json.clone()) {
1164 Ok(typed_value) => {
1165 println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
1166
1167 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1168 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1169
1170 if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
1171 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
1172 } else {
1173 self.promotions.fetch_add(1, Ordering::Relaxed);
1174 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
1175 }
1176
1177 return Ok(typed_value);
1178 }
1179 Err(e) => {
1180 eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1181 // Fall through to recompute
1182 }
1183 }
1184 }
1185 }
1186
1187 // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1188 println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
1189 let typed_value = compute_fn().await?;
1190
1191 // 6. Serialize to JSON for storage
1192 let json_value = serde_json::to_value(&typed_value)
1193 .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
1194
1195 // 7. Store in both L1 and L2 caches
1196 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1197 eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
1198 } else {
1199 println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
1200 }
1201
1202 // 8. _cleanup_guard will auto-remove entry on drop
1203
1204 Ok(typed_value)
1205 }
1206
1207 /// Get comprehensive cache statistics
1208 ///
1209 /// In multi-tier mode, aggregates statistics from all tiers.
1210 /// In legacy mode, returns L1 and L2 stats.
1211 #[allow(dead_code)]
1212 pub fn get_stats(&self) -> CacheManagerStats {
1213 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1214 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1215 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1216 let misses = self.misses.load(Ordering::Relaxed);
1217
1218 CacheManagerStats {
1219 total_requests: total_reqs,
1220 l1_hits,
1221 l2_hits,
1222 total_hits: l1_hits + l2_hits,
1223 misses,
1224 hit_rate: if total_reqs > 0 {
1225 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1226 } else { 0.0 },
1227 l1_hit_rate: if total_reqs > 0 {
1228 (l1_hits as f64 / total_reqs as f64) * 100.0
1229 } else { 0.0 },
1230 promotions: self.promotions.load(Ordering::Relaxed),
1231 in_flight_requests: self.in_flight_requests.len(),
1232 }
1233 }
1234
1235 /// Get per-tier statistics (v0.5.0+)
1236 ///
1237 /// Returns statistics for each tier if multi-tier mode is enabled.
1238 /// Returns None if using legacy 2-tier mode.
1239 ///
1240 /// # Example
1241 /// ```rust,ignore
1242 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1243 /// for stats in tier_stats {
1244 /// println!("L{}: {} hits ({})",
1245 /// stats.tier_level,
1246 /// stats.hit_count(),
1247 /// stats.backend_name);
1248 /// }
1249 /// }
1250 /// ```
1251 pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1252 self.tiers.as_ref().map(|tiers| {
1253 tiers.iter().map(|tier| tier.stats.clone()).collect()
1254 })
1255 }
1256
1257 // ===== Redis Streams Methods =====
1258
1259 /// Publish data to Redis Stream
1260 ///
1261 /// # Arguments
1262 /// * `stream_key` - Name of the stream (e.g., "events_stream")
1263 /// * `fields` - Field-value pairs to publish
1264 /// * `maxlen` - Optional max length for stream trimming
1265 ///
1266 /// # Returns
1267 /// The entry ID generated by Redis
1268 ///
1269 /// # Errors
1270 /// Returns error if streaming backend is not configured
1271 pub async fn publish_to_stream(
1272 &self,
1273 stream_key: &str,
1274 fields: Vec<(String, String)>,
1275 maxlen: Option<usize>
1276 ) -> Result<String> {
1277 match &self.streaming_backend {
1278 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1279 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1280 }
1281 }
1282
1283 /// Read latest entries from Redis Stream
1284 ///
1285 /// # Arguments
1286 /// * `stream_key` - Name of the stream
1287 /// * `count` - Number of latest entries to retrieve
1288 ///
1289 /// # Returns
1290 /// Vector of (entry_id, fields) tuples (newest first)
1291 ///
1292 /// # Errors
1293 /// Returns error if streaming backend is not configured
1294 pub async fn read_stream_latest(
1295 &self,
1296 stream_key: &str,
1297 count: usize
1298 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1299 match &self.streaming_backend {
1300 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1301 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1302 }
1303 }
1304
1305 /// Read from Redis Stream with optional blocking
1306 ///
1307 /// # Arguments
1308 /// * `stream_key` - Name of the stream
1309 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1310 /// * `count` - Max entries to retrieve
1311 /// * `block_ms` - Optional blocking timeout in ms
1312 ///
1313 /// # Returns
1314 /// Vector of (entry_id, fields) tuples
1315 ///
1316 /// # Errors
1317 /// Returns error if streaming backend is not configured
1318 pub async fn read_stream(
1319 &self,
1320 stream_key: &str,
1321 last_id: &str,
1322 count: usize,
1323 block_ms: Option<usize>
1324 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1325 match &self.streaming_backend {
1326 Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1327 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1328 }
1329 }
1330
1331 // ===== Cache Invalidation Methods =====
1332
1333 /// Invalidate a cache key across all instances
1334 ///
1335 /// This removes the key from all cache tiers and broadcasts
1336 /// the invalidation to all other cache instances via Redis Pub/Sub.
1337 ///
1338 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1339 ///
1340 /// # Arguments
1341 /// * `key` - Cache key to invalidate
1342 ///
1343 /// # Example
1344 /// ```rust,ignore
1345 /// // Invalidate user cache after profile update
1346 /// cache_manager.invalidate("user:123").await?;
1347 /// ```
1348 pub async fn invalidate(&self, key: &str) -> Result<()> {
1349 // NEW: Multi-tier mode (v0.5.0+)
1350 if let Some(tiers) = &self.tiers {
1351 // Remove from ALL tiers
1352 for tier in tiers {
1353 if let Err(e) = tier.remove(key).await {
1354 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1355 }
1356 }
1357 } else {
1358 // LEGACY: 2-tier mode
1359 self.l1_cache.remove(key).await?;
1360 self.l2_cache.remove(key).await?;
1361 }
1362
1363 // Broadcast to other instances
1364 if let Some(publisher) = &self.invalidation_publisher {
1365 let mut pub_lock = publisher.lock().await;
1366 let msg = InvalidationMessage::remove(key);
1367 pub_lock.publish(&msg).await?;
1368 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1369 }
1370
1371 println!("🗑️ Invalidated '{}' across all instances", key);
1372 Ok(())
1373 }
1374
1375 /// Update cache value across all instances
1376 ///
1377 /// This updates the key in all cache tiers and broadcasts
1378 /// the update to all other cache instances, avoiding cache misses.
1379 ///
1380 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1381 ///
1382 /// # Arguments
1383 /// * `key` - Cache key to update
1384 /// * `value` - New value
1385 /// * `ttl` - Optional TTL (uses default if None)
1386 ///
1387 /// # Example
1388 /// ```rust,ignore
1389 /// // Update user cache with new data
1390 /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1391 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1392 /// ```
1393 pub async fn update_cache(
1394 &self,
1395 key: &str,
1396 value: serde_json::Value,
1397 ttl: Option<Duration>,
1398 ) -> Result<()> {
1399 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1400
1401 // NEW: Multi-tier mode (v0.5.0+)
1402 if let Some(tiers) = &self.tiers {
1403 // Update ALL tiers with their respective TTL scaling
1404 for tier in tiers {
1405 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1406 eprintln!("⚠️ Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1407 }
1408 }
1409 } else {
1410 // LEGACY: 2-tier mode
1411 self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1412 self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1413 }
1414
1415 // Broadcast update to other instances
1416 if let Some(publisher) = &self.invalidation_publisher {
1417 let mut pub_lock = publisher.lock().await;
1418 let msg = InvalidationMessage::update(key, value, Some(ttl));
1419 pub_lock.publish(&msg).await?;
1420 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1421 }
1422
1423 println!("🔄 Updated '{}' across all instances", key);
1424 Ok(())
1425 }
1426
1427 /// Invalidate all keys matching a pattern
1428 ///
1429 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1430 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1431 ///
1432 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1433 ///
1434 /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1435 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1436 ///
1437 /// # Arguments
1438 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1439 ///
1440 /// # Example
1441 /// ```rust,ignore
1442 /// // Invalidate all user caches
1443 /// cache_manager.invalidate_pattern("user:*").await?;
1444 ///
1445 /// // Invalidate specific user's related caches
1446 /// cache_manager.invalidate_pattern("user:123:*").await?;
1447 /// ```
1448 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1449 // Scan L2 for matching keys
1450 // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1451 let keys = if let Some(l2) = &self.l2_cache_concrete {
1452 l2.scan_keys(pattern).await?
1453 } else {
1454 return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1455 };
1456
1457 if keys.is_empty() {
1458 println!("🔍 No keys found matching pattern '{}'", pattern);
1459 return Ok(());
1460 }
1461
1462 // NEW: Multi-tier mode (v0.5.0+)
1463 if let Some(tiers) = &self.tiers {
1464 // Remove from ALL tiers
1465 for key in &keys {
1466 for tier in tiers {
1467 if let Err(e) = tier.remove(key).await {
1468 eprintln!("⚠️ Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1469 }
1470 }
1471 }
1472 } else {
1473 // LEGACY: 2-tier mode - Remove from L2 in bulk
1474 if let Some(l2) = &self.l2_cache_concrete {
1475 l2.remove_bulk(&keys).await?;
1476 }
1477 }
1478
1479 // Broadcast pattern invalidation
1480 if let Some(publisher) = &self.invalidation_publisher {
1481 let mut pub_lock = publisher.lock().await;
1482 let msg = InvalidationMessage::remove_bulk(keys.clone());
1483 pub_lock.publish(&msg).await?;
1484 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1485 }
1486
1487 println!("🔍 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1488 Ok(())
1489 }
1490
1491 /// Set value with automatic broadcast to all instances
1492 ///
1493 /// This is a write-through operation that updates the cache and
1494 /// broadcasts the update to all other instances automatically.
1495 ///
1496 /// # Arguments
1497 /// * `key` - Cache key
1498 /// * `value` - Value to cache
1499 /// * `strategy` - Cache strategy (determines TTL)
1500 ///
1501 /// # Example
1502 /// ```rust,ignore
1503 /// // Update and broadcast in one call
1504 /// let data = serde_json::json!({"status": "active"});
1505 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1506 /// ```
1507 pub async fn set_with_broadcast(
1508 &self,
1509 key: &str,
1510 value: serde_json::Value,
1511 strategy: CacheStrategy,
1512 ) -> Result<()> {
1513 let ttl = strategy.to_duration();
1514
1515 // Set in local caches
1516 self.set_with_strategy(key, value.clone(), strategy).await?;
1517
1518 // Broadcast update if invalidation is enabled
1519 if let Some(publisher) = &self.invalidation_publisher {
1520 let mut pub_lock = publisher.lock().await;
1521 let msg = InvalidationMessage::update(key, value, Some(ttl));
1522 pub_lock.publish(&msg).await?;
1523 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1524 }
1525
1526 Ok(())
1527 }
1528
1529 /// Get invalidation statistics
1530 ///
1531 /// Returns statistics about invalidation operations if invalidation is enabled.
1532 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1533 if self.invalidation_subscriber.is_some() {
1534 Some(self.invalidation_stats.snapshot())
1535 } else {
1536 None
1537 }
1538 }
1539}
1540
1541/// Cache Manager statistics
1542#[allow(dead_code)]
1543#[derive(Debug, Clone)]
1544pub struct CacheManagerStats {
1545 pub total_requests: u64,
1546 pub l1_hits: u64,
1547 pub l2_hits: u64,
1548 pub total_hits: u64,
1549 pub misses: u64,
1550 pub hit_rate: f64,
1551 pub l1_hit_rate: f64,
1552 pub promotions: usize,
1553 pub in_flight_requests: usize,
1554}