multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use tracing::{info, warn, error, debug};
15
16use crate::backends::{L1Cache, L2Cache};
17use crate::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
18use super::invalidation::{
19 InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
20 InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
21};
22
23/// Type alias for the in-flight requests map
24type InFlightMap = DashMap<String, Arc<Mutex<()>>>;
25
26/// RAII cleanup guard for in-flight request tracking
27/// Ensures that entries are removed from DashMap even on early return or panic
28struct CleanupGuard<'a> {
29 map: &'a InFlightMap,
30 key: String,
31}
32
33impl<'a> Drop for CleanupGuard<'a> {
34 fn drop(&mut self) {
35 self.map.remove(&self.key);
36 }
37}
38
/// Caching policies, each mapping to a fixed time-to-live.
///
/// | Variant      | TTL        |
/// |--------------|------------|
/// | `RealTime`   | 10 seconds |
/// | `ShortTerm`  | 5 minutes  |
/// | `MediumTerm` | 1 hour     |
/// | `LongTerm`   | 3 hours    |
/// | `Custom(d)`  | `d`        |
/// | `Default`    | 5 minutes  |
#[derive(Debug, Clone)]
// Some variants may never be constructed inside this crate itself.
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Rapidly changing data; expires after 10 seconds.
    RealTime,
    /// Expires after 5 minutes.
    ShortTerm,
    /// Expires after 1 hour.
    MediumTerm,
    /// Expires after 3 hours.
    LongTerm,
    /// Caller-supplied TTL.
    Custom(Duration),
    /// Fallback policy; same 5-minute TTL as `ShortTerm`.
    Default,
}

impl CacheStrategy {
    /// Returns the time-to-live this strategy stands for.
    pub fn to_duration(&self) -> Duration {
        const MINUTE: u64 = 60;
        const HOUR: u64 = 60 * MINUTE;

        let secs = match self {
            Self::Custom(ttl) => return *ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * MINUTE,
            Self::MediumTerm => HOUR,
            Self::LongTerm => 3 * HOUR,
        };
        Duration::from_secs(secs)
    }
}
70
/// Hit counter and identity for one tier of the cache hierarchy.
#[derive(Debug)]
pub struct TierStats {
    /// Position of the tier (1 = L1, 2 = L2, 3 = L3, ...).
    pub tier_level: usize,
    /// Number of lookups answered by this tier.
    pub hits: AtomicU64,
    /// Name reported by the tier's backend, used to identify it.
    pub backend_name: String,
}

impl Clone for TierStats {
    /// Produces an independent copy whose counter starts at the current hit count.
    fn clone(&self) -> Self {
        let TierStats {
            tier_level,
            backend_name,
            ..
        } = self;
        Self {
            tier_level: *tier_level,
            hits: AtomicU64::new(self.hit_count()),
            backend_name: backend_name.clone(),
        }
    }
}

impl TierStats {
    /// Builds a stats record with a zeroed hit counter.
    fn new(tier_level: usize, backend_name: String) -> Self {
        Self {
            hits: AtomicU64::new(0),
            tier_level,
            backend_name,
        }
    }

    /// Reads the current hit count (a relaxed snapshot).
    pub fn hit_count(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }
}
106
107/// A single cache tier in the multi-tier architecture
108pub struct CacheTier {
109 /// Cache backend for this tier
110 backend: Arc<dyn L2CacheBackend>,
111 /// Tier level (1 = hottest/fastest, higher = colder/slower)
112 tier_level: usize,
113 /// Enable automatic promotion to upper tiers on cache hit
114 promotion_enabled: bool,
115 /// TTL scale factor (multiplier for TTL when storing/promoting)
116 ttl_scale: f64,
117 /// Statistics for this tier
118 stats: TierStats,
119}
120
121impl CacheTier {
122 /// Create a new cache tier
123 pub fn new(
124 backend: Arc<dyn L2CacheBackend>,
125 tier_level: usize,
126 promotion_enabled: bool,
127 ttl_scale: f64,
128 ) -> Self {
129 let backend_name = backend.name().to_string();
130 Self {
131 backend,
132 tier_level,
133 promotion_enabled,
134 ttl_scale,
135 stats: TierStats::new(tier_level, backend_name),
136 }
137 }
138
139 /// Get value with TTL from this tier
140 async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
141 self.backend.get_with_ttl(key).await
142 }
143
144 /// Set value with TTL in this tier
145 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
146 let scaled_ttl = Duration::from_secs_f64(ttl.as_secs_f64() * self.ttl_scale);
147 self.backend.set_with_ttl(key, value, scaled_ttl).await
148 }
149
150 /// Remove value from this tier
151 async fn remove(&self, key: &str) -> Result<()> {
152 self.backend.remove(key).await
153 }
154
155 /// Record a cache hit for this tier
156 fn record_hit(&self) {
157 self.stats.hits.fetch_add(1, Ordering::Relaxed);
158 }
159}
160
/// Builder-style description of a cache tier: its level, promotion policy and TTL multiplier.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Tier level (1, 2, 3, 4, ...).
    pub tier_level: usize,
    /// Whether hits in this tier are promoted to the tiers above it.
    pub promotion_enabled: bool,
    /// Multiplier applied to the base TTL (1.0 = unchanged).
    pub ttl_scale: f64,
}

impl TierConfig {
    /// Configuration for `tier_level` with promotion on and an unscaled TTL.
    pub fn new(tier_level: usize) -> Self {
        Self {
            tier_level,
            promotion_enabled: true,
            ttl_scale: 1.0,
        }
    }

    /// Hot tier (level 1). Promotion is off because nothing sits above it.
    pub fn as_l1() -> Self {
        Self::new(1).with_promotion(false)
    }

    /// Warm tier (level 2) with default settings.
    pub fn as_l2() -> Self {
        Self::new(2)
    }

    /// Cold tier (level 3); entries are kept twice as long as the base TTL.
    pub fn as_l3() -> Self {
        Self::new(3).with_ttl_scale(2.0)
    }

    /// Archive tier (level 4); entries are kept eight times as long as the base TTL.
    pub fn as_l4() -> Self {
        Self::new(4).with_ttl_scale(8.0)
    }

    /// Returns this configuration with promotion switched on or off.
    pub fn with_promotion(self, enabled: bool) -> Self {
        Self {
            promotion_enabled: enabled,
            ..self
        }
    }

    /// Returns this configuration with a different TTL multiplier.
    pub fn with_ttl_scale(self, scale: f64) -> Self {
        Self {
            ttl_scale: scale,
            ..self
        }
    }

    /// Returns this configuration moved to a different tier level.
    pub fn with_level(self, level: usize) -> Self {
        Self {
            tier_level: level,
            ..self
        }
    }
}
236
237/// Proxy wrapper to convert L2CacheBackend to CacheBackend
238/// (Rust doesn't support automatic trait upcasting for trait objects)
239struct ProxyCacheBackend {
240 backend: Arc<dyn L2CacheBackend>,
241}
242
243#[async_trait::async_trait]
244impl CacheBackend for ProxyCacheBackend {
245 async fn get(&self, key: &str) -> Option<serde_json::Value> {
246 self.backend.get(key).await
247 }
248
249 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
250 self.backend.set_with_ttl(key, value, ttl).await
251 }
252
253 async fn remove(&self, key: &str) -> Result<()> {
254 self.backend.remove(key).await
255 }
256
257 async fn health_check(&self) -> bool {
258 self.backend.health_check().await
259 }
260
261 fn name(&self) -> &str {
262 self.backend.name()
263 }
264}
265
266/// Cache Manager - Unified operations across multiple cache tiers
267///
268/// Supports both legacy 2-tier (L1+L2) and new multi-tier (L1+L2+L3+L4+...) architectures.
269/// When `tiers` is Some, it uses the dynamic multi-tier system. Otherwise, falls back to
270/// legacy L1+L2 behavior for backward compatibility.
271pub struct CacheManager {
272 /// Dynamic multi-tier cache architecture (v0.5.0+)
273 /// If Some, this takes precedence over l1_cache/l2_cache fields
274 tiers: Option<Vec<CacheTier>>,
275
276 // ===== Legacy fields (v0.1.0 - v0.4.x) =====
277 // Maintained for backward compatibility
278 /// L1 Cache (trait object for pluggable backends)
279 l1_cache: Arc<dyn CacheBackend>,
280 /// L2 Cache (trait object for pluggable backends)
281 l2_cache: Arc<dyn L2CacheBackend>,
282 /// L2 Cache concrete instance (for invalidation scan_keys)
283 l2_cache_concrete: Option<Arc<L2Cache>>,
284
285 /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
286 streaming_backend: Option<Arc<dyn StreamingBackend>>,
287 /// Statistics (AtomicU64 is already thread-safe, no Arc needed)
288 total_requests: AtomicU64,
289 l1_hits: AtomicU64,
290 l2_hits: AtomicU64,
291 misses: AtomicU64,
292 promotions: AtomicUsize,
293 /// In-flight requests to prevent Cache Stampede on L2/compute operations
294 in_flight_requests: Arc<InFlightMap>,
295 /// Invalidation publisher (for broadcasting invalidation messages)
296 invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
297 /// Invalidation subscriber (for receiving invalidation messages)
298 invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
299 /// Invalidation statistics
300 invalidation_stats: Arc<AtomicInvalidationStats>,
301}
302
303impl CacheManager {
304 /// Create new cache manager with trait objects (pluggable backends)
305 ///
306 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
307 ///
308 /// # Arguments
309 ///
310 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
311 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
312 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
313 ///
314 /// # Example
315 ///
316 /// ```rust,ignore
317 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
318 /// use std::sync::Arc;
319 ///
320 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
321 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
322 ///
323 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
324 /// ```
325 pub async fn new_with_backends(
326 l1_cache: Arc<dyn CacheBackend>,
327 l2_cache: Arc<dyn L2CacheBackend>,
328 streaming_backend: Option<Arc<dyn StreamingBackend>>,
329 ) -> Result<Self> {
330 debug!("Initializing Cache Manager with custom backends...");
331 debug!(" L1: {}", l1_cache.name());
332 debug!(" L2: {}", l2_cache.name());
333 if streaming_backend.is_some() {
334 debug!(" Streaming: enabled");
335 } else {
336 debug!(" Streaming: disabled");
337 }
338
339 Ok(Self {
340 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
341 l1_cache,
342 l2_cache,
343 l2_cache_concrete: None,
344 streaming_backend,
345 total_requests: AtomicU64::new(0),
346 l1_hits: AtomicU64::new(0),
347 l2_hits: AtomicU64::new(0),
348 misses: AtomicU64::new(0),
349 promotions: AtomicUsize::new(0),
350 in_flight_requests: Arc::new(DashMap::new()),
351 invalidation_publisher: None,
352 invalidation_subscriber: None,
353 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
354 })
355 }
356
357 /// Create new cache manager with default backends (backward compatible)
358 ///
359 /// This is the legacy constructor maintained for backward compatibility.
360 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
361 ///
362 /// # Arguments
363 ///
364 /// * `l1_cache` - Moka L1 cache instance
365 /// * `l2_cache` - Redis L2 cache instance
366 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
367 debug!("Initializing Cache Manager...");
368
369 // Convert concrete types to trait objects
370 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
371 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
372
373 // Create RedisStreams backend for streaming functionality
374 let redis_url = std::env::var("REDIS_URL")
375 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
376 let redis_streams = crate::redis_streams::RedisStreams::new(&redis_url).await?;
377 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
378
379 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
380 }
381
382 /// Create new cache manager with invalidation support
383 ///
384 /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
385 ///
386 /// # Arguments
387 ///
388 /// * `l1_cache` - Moka L1 cache instance
389 /// * `l2_cache` - Redis L2 cache instance
390 /// * `redis_url` - Redis connection URL for Pub/Sub
391 /// * `config` - Invalidation configuration
392 ///
393 /// # Example
394 ///
395 /// ```rust,ignore
396 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
397 ///
398 /// let config = InvalidationConfig {
399 /// channel: "my_app:cache:invalidate".to_string(),
400 /// ..Default::default()
401 /// };
402 ///
403 /// let manager = CacheManager::new_with_invalidation(
404 /// l1, l2, "redis://localhost", config
405 /// ).await?;
406 /// ```
407 pub async fn new_with_invalidation(
408 l1_cache: Arc<L1Cache>,
409 l2_cache: Arc<L2Cache>,
410 redis_url: &str,
411 config: InvalidationConfig,
412 ) -> Result<Self> {
413 debug!("Initializing Cache Manager with Invalidation...");
414 debug!(" Pub/Sub channel: {}", config.channel);
415
416 // Convert concrete types to trait objects
417 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
418 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
419
420 // Create RedisStreams backend for streaming functionality
421 let redis_streams = crate::redis_streams::RedisStreams::new(redis_url).await?;
422 let streaming_backend: Arc<dyn StreamingBackend> = Arc::new(redis_streams);
423
424 // Create publisher
425 let client = redis::Client::open(redis_url)?;
426 let conn_manager = redis::aio::ConnectionManager::new(client).await?;
427 let publisher = InvalidationPublisher::new(conn_manager, config.clone());
428
429 // Create subscriber
430 let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
431 let invalidation_stats = Arc::new(AtomicInvalidationStats::default());
432
433 let manager = Self {
434 tiers: None, // Legacy mode: use l1_cache/l2_cache fields
435 l1_cache: l1_backend,
436 l2_cache: l2_backend,
437 l2_cache_concrete: Some(l2_cache),
438 streaming_backend: Some(streaming_backend),
439 total_requests: AtomicU64::new(0),
440 l1_hits: AtomicU64::new(0),
441 l2_hits: AtomicU64::new(0),
442 misses: AtomicU64::new(0),
443 promotions: AtomicUsize::new(0),
444 in_flight_requests: Arc::new(DashMap::new()),
445 invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
446 invalidation_subscriber: Some(Arc::new(subscriber)),
447 invalidation_stats,
448 };
449
450 // Start subscriber with handler
451 manager.start_invalidation_subscriber();
452
453 info!("Cache Manager initialized with invalidation support");
454
455 Ok(manager)
456 }
457
458 /// Create new cache manager with multi-tier architecture (v0.5.0+)
459 ///
460 /// This constructor enables dynamic multi-tier caching with 3, 4, or more tiers.
461 /// Tiers are checked in order (lower tier_level = faster/hotter).
462 ///
463 /// # Arguments
464 ///
465 /// * `tiers` - Vector of configured cache tiers (must be sorted by tier_level ascending)
466 /// * `streaming_backend` - Optional streaming backend
467 ///
468 /// # Example
469 ///
470 /// ```rust,ignore
471 /// use multi_tier_cache::{CacheManager, CacheTier, TierConfig, L1Cache, L2Cache};
472 /// use std::sync::Arc;
473 ///
474 /// // L1 + L2 + L3 setup
475 /// let l1 = Arc::new(L1Cache::new().await?);
476 /// let l2 = Arc::new(L2Cache::new().await?);
477 /// let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?);
478 ///
479 /// let tiers = vec![
480 /// CacheTier::new(l1, 1, false, 1.0), // L1 - no promotion
481 /// CacheTier::new(l2, 2, true, 1.0), // L2 - promote to L1
482 /// CacheTier::new(l3, 3, true, 2.0), // L3 - promote to L2&L1, 2x TTL
483 /// ];
484 ///
485 /// let manager = CacheManager::new_with_tiers(tiers, None).await?;
486 /// ```
487 pub async fn new_with_tiers(
488 tiers: Vec<CacheTier>,
489 streaming_backend: Option<Arc<dyn StreamingBackend>>,
490 ) -> Result<Self> {
491 debug!("Initializing Multi-Tier Cache Manager...");
492 debug!(" Tier count: {}", tiers.len());
493 for tier in &tiers {
494 debug!(
495 " L{}: {} (promotion={}, ttl_scale={})",
496 tier.tier_level,
497 tier.stats.backend_name,
498 tier.promotion_enabled,
499 tier.ttl_scale
500 );
501 }
502
503 // Validate tiers are sorted by level
504 for i in 1..tiers.len() {
505 if tiers[i].tier_level <= tiers[i - 1].tier_level {
506 anyhow::bail!(
507 "Tiers must be sorted by tier_level ascending (found L{} after L{})",
508 tiers[i].tier_level,
509 tiers[i - 1].tier_level
510 );
511 }
512 }
513
514 // For backward compatibility with legacy code, we need dummy l1/l2 caches
515 // Use first tier as l1, second tier as l2 if available
516 let (l1_cache, l2_cache) = if tiers.len() >= 2 {
517 (tiers[0].backend.clone(), tiers[1].backend.clone())
518 } else if tiers.len() == 1 {
519 // Only one tier - use it for both
520 let tier = tiers[0].backend.clone();
521 (tier.clone(), tier)
522 } else {
523 anyhow::bail!("At least one cache tier is required");
524 };
525
526 // Convert to CacheBackend trait for l1 (L2CacheBackend extends CacheBackend)
527 let l1_backend: Arc<dyn CacheBackend> = Arc::new(ProxyCacheBackend {
528 backend: l1_cache.clone(),
529 });
530
531 Ok(Self {
532 tiers: Some(tiers),
533 l1_cache: l1_backend,
534 l2_cache,
535 l2_cache_concrete: None,
536 streaming_backend,
537 total_requests: AtomicU64::new(0),
538 l1_hits: AtomicU64::new(0),
539 l2_hits: AtomicU64::new(0),
540 misses: AtomicU64::new(0),
541 promotions: AtomicUsize::new(0),
542 in_flight_requests: Arc::new(DashMap::new()),
543 invalidation_publisher: None,
544 invalidation_subscriber: None,
545 invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
546 })
547 }
548
549 /// Start the invalidation subscriber background task
550 fn start_invalidation_subscriber(&self) {
551 if let Some(subscriber) = &self.invalidation_subscriber {
552 let l1_cache = Arc::clone(&self.l1_cache);
553 let l2_cache_concrete = self.l2_cache_concrete.clone();
554
555 subscriber.start(move |msg| {
556 let l1 = Arc::clone(&l1_cache);
557 let _l2 = l2_cache_concrete.clone();
558
559 async move {
560 match msg {
561 InvalidationMessage::Remove { key } => {
562 // Remove from L1
563 l1.remove(&key).await?;
564 debug!("Invalidation: Removed '{}' from L1", key);
565 }
566 InvalidationMessage::Update { key, value, ttl_secs } => {
567 // Update L1 with new value
568 let ttl = ttl_secs
569 .map(Duration::from_secs)
570 .unwrap_or_else(|| Duration::from_secs(300));
571 l1.set_with_ttl(&key, value, ttl).await?;
572 debug!("Invalidation: Updated '{}' in L1", key);
573 }
574 InvalidationMessage::RemovePattern { pattern } => {
575 // For pattern-based invalidation, we can't easily iterate L1 cache
576 // So we just log it. The pattern invalidation is mainly for L2.
577 // L1 entries will naturally expire via TTL.
578 debug!("Invalidation: Pattern '{}' invalidated (L1 will expire naturally)", pattern);
579 }
580 InvalidationMessage::RemoveBulk { keys } => {
581 // Remove multiple keys from L1
582 for key in keys {
583 if let Err(e) = l1.remove(&key).await {
584 warn!("Failed to remove '{}' from L1: {}", key, e);
585 }
586 }
587 debug!("Invalidation: Bulk removed keys from L1");
588 }
589 }
590 Ok(())
591 }
592 });
593
594 info!("Invalidation subscriber started");
595 }
596 }
597
598 /// Get value from cache using multi-tier architecture (v0.5.0+)
599 ///
600 /// This method iterates through all configured tiers and automatically promotes
601 /// to upper tiers on cache hit.
602 async fn get_multi_tier(&self, key: &str) -> Result<Option<serde_json::Value>> {
603 let tiers = self.tiers.as_ref().unwrap(); // Safe: only called when tiers is Some
604
605 // Try each tier sequentially (sorted by tier_level)
606 for (tier_index, tier) in tiers.iter().enumerate() {
607 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
608 // Cache hit!
609 tier.record_hit();
610
611 // Promote to all upper tiers (if promotion enabled)
612 if tier.promotion_enabled && tier_index > 0 {
613 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
614
615 // Promote to all tiers above this one
616 for upper_tier in tiers[..tier_index].iter().rev() {
617 if let Err(e) = upper_tier.set_with_ttl(key, value.clone(), promotion_ttl).await {
618 warn!(
619 "Failed to promote '{}' from L{} to L{}: {}",
620 key, tier.tier_level, upper_tier.tier_level, e
621 );
622 } else {
623 self.promotions.fetch_add(1, Ordering::Relaxed);
624 debug!(
625 "Promoted '{}' from L{} to L{} (TTL: {:?})",
626 key, tier.tier_level, upper_tier.tier_level, promotion_ttl
627 );
628 }
629 }
630 }
631
632 return Ok(Some(value));
633 }
634 }
635
636 // Cache miss across all tiers
637 Ok(None)
638 }
639
640 /// Get value from cache (L1 first, then L2 fallback with promotion)
641 ///
642 /// This method now includes built-in Cache Stampede protection when cache misses occur.
643 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
644 /// unnecessary duplicate work on external data sources.
645 ///
646 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
647 ///
648 /// # Arguments
649 /// * `key` - Cache key to retrieve
650 ///
651 /// # Returns
652 /// * `Ok(Some(value))` - Cache hit, value found in any tier
653 /// * `Ok(None)` - Cache miss, value not found in any cache
654 /// * `Err(error)` - Cache operation failed
655 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
656 self.total_requests.fetch_add(1, Ordering::Relaxed);
657
658 // NEW: Multi-tier mode (v0.5.0+)
659 if self.tiers.is_some() {
660 // Fast path for L1 (first tier) - no locking needed
661 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
662 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
663 tier1.record_hit();
664 // Update legacy stats for backward compatibility
665 self.l1_hits.fetch_add(1, Ordering::Relaxed);
666 return Ok(Some(value));
667 }
668 }
669
670 // L1 miss - use stampede protection for lower tiers
671 let key_owned = key.to_string();
672 let lock_guard = self.in_flight_requests
673 .entry(key_owned.clone())
674 .or_insert_with(|| Arc::new(Mutex::new(())))
675 .clone();
676
677 let _guard = lock_guard.lock().await;
678 let cleanup_guard = CleanupGuard {
679 map: &self.in_flight_requests,
680 key: key_owned.clone(),
681 };
682
683 // Double-check L1 after acquiring lock
684 if let Some(tier1) = self.tiers.as_ref().unwrap().first() {
685 if let Some((value, _ttl)) = tier1.get_with_ttl(key).await {
686 tier1.record_hit();
687 self.l1_hits.fetch_add(1, Ordering::Relaxed);
688 return Ok(Some(value));
689 }
690 }
691
692 // Check remaining tiers with promotion
693 let result = self.get_multi_tier(key).await?;
694
695 if result.is_some() {
696 // Hit in L2+ tier - update legacy stats
697 if self.tiers.as_ref().unwrap().len() >= 2 {
698 self.l2_hits.fetch_add(1, Ordering::Relaxed);
699 }
700 } else {
701 self.misses.fetch_add(1, Ordering::Relaxed);
702 }
703
704 drop(cleanup_guard);
705 return Ok(result);
706 }
707
708 // LEGACY: 2-tier mode (L1 + L2)
709 // Fast path: Try L1 first (no locking needed)
710 if let Some(value) = self.l1_cache.get(key).await {
711 self.l1_hits.fetch_add(1, Ordering::Relaxed);
712 return Ok(Some(value));
713 }
714
715 // L1 miss - implement Cache Stampede protection for L2 lookup
716 let key_owned = key.to_string();
717 let lock_guard = self.in_flight_requests
718 .entry(key_owned.clone())
719 .or_insert_with(|| Arc::new(Mutex::new(())))
720 .clone();
721
722 let _guard = lock_guard.lock().await;
723
724 // RAII cleanup guard - ensures entry is removed even on early return or panic
725 let cleanup_guard = CleanupGuard {
726 map: &self.in_flight_requests,
727 key: key_owned.clone(),
728 };
729
730 // Double-check L1 cache after acquiring lock
731 // (Another concurrent request might have populated it while we were waiting)
732 if let Some(value) = self.l1_cache.get(key).await {
733 self.l1_hits.fetch_add(1, Ordering::Relaxed);
734 // cleanup_guard will auto-remove entry on drop
735 return Ok(Some(value));
736 }
737
738 // Check L2 cache with TTL information
739 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
740 self.l2_hits.fetch_add(1, Ordering::Relaxed);
741
742 // Promote to L1 with same TTL as Redis (or default if no TTL)
743 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
744
745 if self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await.is_err() {
746 // L1 promotion failed, but we still have the data
747 warn!("Failed to promote key '{}' to L1 cache", key);
748 } else {
749 self.promotions.fetch_add(1, Ordering::Relaxed);
750 debug!("Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
751 }
752
753 // cleanup_guard will auto-remove entry on drop
754 return Ok(Some(value));
755 }
756
757 // Both L1 and L2 miss
758 self.misses.fetch_add(1, Ordering::Relaxed);
759
760 // cleanup_guard will auto-remove entry on drop
761 drop(cleanup_guard);
762
763 Ok(None)
764 }
765
766
767 /// Set value with specific cache strategy (all tiers)
768 ///
769 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
770 /// In multi-tier mode, stores to ALL tiers with their respective TTL scaling.
771 pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
772 let ttl = strategy.to_duration();
773
774 // NEW: Multi-tier mode (v0.5.0+)
775 if let Some(tiers) = &self.tiers {
776 // Store in ALL tiers with their respective TTL scaling
777 let mut success_count = 0;
778 let mut last_error = None;
779
780 for tier in tiers {
781 match tier.set_with_ttl(key, value.clone(), ttl).await {
782 Ok(_) => {
783 success_count += 1;
784 }
785 Err(e) => {
786 error!("L{} cache set failed for key '{}': {}", tier.tier_level, key, e);
787 last_error = Some(e);
788 }
789 }
790 }
791
792 if success_count > 0 {
793 debug!("[Multi-Tier] Cached '{}' in {}/{} tiers (base TTL: {:?})",
794 key, success_count, tiers.len(), ttl);
795 return Ok(());
796 } else {
797 return Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All tiers failed for key '{}'", key)));
798 }
799 }
800
801 // LEGACY: 2-tier mode (L1 + L2)
802 // Store in both L1 and L2
803 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
804 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
805
806 // Return success if at least one cache succeeded
807 match (l1_result, l2_result) {
808 (Ok(_), Ok(_)) => {
809 // Both succeeded
810 debug!("[L1+L2] Cached '{}' with TTL {:?}", key, ttl);
811 Ok(())
812 }
813 (Ok(_), Err(_)) => {
814 // L1 succeeded, L2 failed
815 warn!("L2 cache set failed for key '{}', continuing with L1", key);
816 debug!("[L1] Cached '{}' with TTL {:?}", key, ttl);
817 Ok(())
818 }
819 (Err(_), Ok(_)) => {
820 // L1 failed, L2 succeeded
821 warn!("L1 cache set failed for key '{}', continuing with L2", key);
822 debug!("[L2] Cached '{}' with TTL {:?}", key, ttl);
823 Ok(())
824 }
825 (Err(e1), Err(_e2)) => {
826 // Both failed
827 Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
828 }
829 }
830 }
831
832 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
833 ///
834 /// This method provides comprehensive Cache Stampede protection:
835 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
836 /// 2. Check L2 cache with mutex-based coalescing
837 /// 3. Compute fresh data with protection against concurrent computations
838 ///
839 /// # Arguments
840 /// * `key` - Cache key
841 /// * `strategy` - Cache strategy for TTL and storage behavior
842 /// * `compute_fn` - Async function to compute the value if not in any cache
843 ///
844 /// # Example
845 /// ```ignore
846 /// let api_data = cache_manager.get_or_compute_with(
847 /// "api_response",
848 /// CacheStrategy::RealTime,
849 /// || async {
850 /// fetch_data_from_api().await
851 /// }
852 /// ).await?;
853 /// ```
854 #[allow(dead_code)]
855 pub async fn get_or_compute_with<F, Fut>(
856 &self,
857 key: &str,
858 strategy: CacheStrategy,
859 compute_fn: F,
860 ) -> Result<serde_json::Value>
861 where
862 F: FnOnce() -> Fut + Send,
863 Fut: Future<Output = Result<serde_json::Value>> + Send,
864 {
865 self.total_requests.fetch_add(1, Ordering::Relaxed);
866
867 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
868 if let Some(value) = self.l1_cache.get(key).await {
869 self.l1_hits.fetch_add(1, Ordering::Relaxed);
870 return Ok(value);
871 }
872
873 // 2. L1 miss - try L2 with Cache Stampede protection
874 let key_owned = key.to_string();
875 let lock_guard = self.in_flight_requests
876 .entry(key_owned.clone())
877 .or_insert_with(|| Arc::new(Mutex::new(())))
878 .clone();
879
880 let _guard = lock_guard.lock().await;
881
882 // RAII cleanup guard - ensures entry is removed even on early return or panic
883 let _cleanup_guard = CleanupGuard {
884 map: &self.in_flight_requests,
885 key: key_owned,
886 };
887
888 // 3. Double-check L1 cache after acquiring lock
889 // (Another request might have populated it while we were waiting)
890 if let Some(value) = self.l1_cache.get(key).await {
891 self.l1_hits.fetch_add(1, Ordering::Relaxed);
892 // _cleanup_guard will auto-remove entry on drop
893 return Ok(value);
894 }
895
896 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
897 if let Some(tiers) = &self.tiers {
898 // Check tiers starting from index 1 (skip L1 since already checked)
899 for tier in tiers.iter().skip(1) {
900 if let Some((value, ttl)) = tier.get_with_ttl(key).await {
901 tier.record_hit();
902
903
904 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
905 if let Err(e) = tiers[0].set_with_ttl(key, value.clone(), promotion_ttl).await {
906 warn!("Failed to promote '{}' from L{} to L1: {}", key, tier.tier_level, e);
907 } else {
908 self.promotions.fetch_add(1, Ordering::Relaxed);
909 debug!("Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
910 key, tier.tier_level, promotion_ttl);
911 }
912
913 // _cleanup_guard will auto-remove entry on drop
914 return Ok(value);
915 }
916 }
917 } else {
918 // LEGACY: Check L2 cache with TTL
919 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
920 self.l2_hits.fetch_add(1, Ordering::Relaxed);
921
922 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
923 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
924
925 if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
926 warn!("Failed to promote key '{}' to L1: {}", key, e);
927 } else {
928 self.promotions.fetch_add(1, Ordering::Relaxed);
929 debug!("Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
930 }
931
932 // _cleanup_guard will auto-remove entry on drop
933 return Ok(value);
934 }
935 }
936
937 // 5. Cache miss across all tiers - compute fresh data
938 debug!("Computing fresh data for key: '{}' (Cache Stampede protected)", key);
939 let fresh_data = compute_fn().await?;
940
941 // 6. Store in both caches
942 if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
943 warn!("Failed to cache computed data for key '{}': {}", key, e);
944 }
945
946 // 7. _cleanup_guard will auto-remove entry on drop
947
948 Ok(fresh_data)
949 }
950
951 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
952 ///
953 /// This method provides the same functionality as `get_or_compute_with()` but with
954 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
955 /// API calls, or any computation that returns structured data.
956 ///
957 /// # Type Safety
958 ///
959 /// - Returns your actual type `T` instead of `serde_json::Value`
960 /// - Compiler enforces Serialize + DeserializeOwned bounds
961 /// - No manual JSON conversion needed
962 ///
963 /// # Cache Flow
964 ///
965 /// 1. Check L1 cache → deserialize if found
966 /// 2. Check L2 cache → deserialize + promote to L1 if found
967 /// 3. Execute compute_fn → serialize → store in L1+L2
968 /// 4. Full stampede protection (only ONE request computes)
969 ///
970 /// # Arguments
971 ///
972 /// * `key` - Cache key
973 /// * `strategy` - Cache strategy for TTL
974 /// * `compute_fn` - Async function returning `Result<T>`
975 ///
976 /// # Example - Database Query
977 ///
978 /// ```no_run
979 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
980 /// # use std::sync::Arc;
981 /// # use serde::{Serialize, Deserialize};
982 /// # async fn example() -> anyhow::Result<()> {
983 /// # let l1 = Arc::new(L1Cache::new().await?);
984 /// # let l2 = Arc::new(L2Cache::new().await?);
985 /// # let cache_manager = CacheManager::new(l1, l2);
986 ///
987 /// #[derive(Serialize, Deserialize)]
988 /// struct User {
989 /// id: i64,
990 /// name: String,
991 /// }
992 ///
993 /// // Type-safe database caching (example - requires sqlx)
994 /// // let user: User = cache_manager.get_or_compute_typed(
995 /// // "user:123",
996 /// // CacheStrategy::MediumTerm,
997 /// // || async {
998 /// // sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
999 /// // .bind(123)
1000 /// // .fetch_one(&pool)
1001 /// // .await
1002 /// // }
1003 /// // ).await?;
1004 /// # Ok(())
1005 /// # }
1006 /// ```
1007 ///
1008 /// # Example - API Call
1009 ///
1010 /// ```no_run
1011 /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
1012 /// # use std::sync::Arc;
1013 /// # use serde::{Serialize, Deserialize};
1014 /// # async fn example() -> anyhow::Result<()> {
1015 /// # let l1 = Arc::new(L1Cache::new().await?);
1016 /// # let l2 = Arc::new(L2Cache::new().await?);
1017 /// # let cache_manager = CacheManager::new(l1, l2);
1018 /// #[derive(Serialize, Deserialize)]
1019 /// struct ApiResponse {
1020 /// data: String,
1021 /// timestamp: i64,
1022 /// }
1023 ///
1024 /// // API call caching (example - requires reqwest)
1025 /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
1026 /// // "api:endpoint",
1027 /// // CacheStrategy::RealTime,
1028 /// // || async {
1029 /// // reqwest::get("https://api.example.com/data")
1030 /// // .await?
1031 /// // .json::<ApiResponse>()
1032 /// // .await
1033 /// // }
1034 /// // ).await?;
1035 /// # Ok(())
1036 /// # }
1037 /// ```
1038 ///
1039 /// # Performance
1040 ///
1041 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
1042 /// - L2 hit: 2-5ms + deserialization + L1 promotion
1043 /// - Compute: Your function time + serialization + L1+L2 storage
1044 /// - Stampede protection: 99.6% latency reduction under high concurrency
1045 ///
1046 /// # Errors
1047 ///
1048 /// Returns error if:
1049 /// - Compute function fails
1050 /// - Serialization fails (invalid type for JSON)
1051 /// - Deserialization fails (cache data doesn't match type T)
1052 /// - Cache operations fail (Redis connection issues)
1053 pub async fn get_or_compute_typed<T, F, Fut>(
1054 &self,
1055 key: &str,
1056 strategy: CacheStrategy,
1057 compute_fn: F,
1058 ) -> Result<T>
1059 where
1060 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1061 F: FnOnce() -> Fut + Send,
1062 Fut: Future<Output = Result<T>> + Send,
1063 {
1064 self.total_requests.fetch_add(1, Ordering::Relaxed);
1065
1066 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
1067 if let Some(cached_json) = self.l1_cache.get(key).await {
1068 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1069
1070 // Attempt to deserialize from JSON to type T
1071 match serde_json::from_value::<T>(cached_json) {
1072 Ok(typed_value) => {
1073 debug!("[L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
1074 return Ok(typed_value);
1075 }
1076 Err(e) => {
1077 // Deserialization failed - cache data may be stale or corrupt
1078 warn!("L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1079 // Fall through to recompute
1080 }
1081 }
1082 }
1083
1084 // 2. L1 miss - try L2 with Cache Stampede protection
1085 let key_owned = key.to_string();
1086 let lock_guard = self.in_flight_requests
1087 .entry(key_owned.clone())
1088 .or_insert_with(|| Arc::new(Mutex::new(())))
1089 .clone();
1090
1091 let _guard = lock_guard.lock().await;
1092
1093 // RAII cleanup guard - ensures entry is removed even on early return or panic
1094 let _cleanup_guard = CleanupGuard {
1095 map: &self.in_flight_requests,
1096 key: key_owned,
1097 };
1098
1099 // 3. Double-check L1 cache after acquiring lock
1100 // (Another request might have populated it while we were waiting)
1101 if let Some(cached_json) = self.l1_cache.get(key).await {
1102 self.l1_hits.fetch_add(1, Ordering::Relaxed);
1103 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
1104 debug!("[L1 HIT] Deserialized '{}' after lock acquisition", key);
1105 return Ok(typed_value);
1106 }
1107 }
1108
1109 // 4. Check remaining tiers (L2, L3, L4...) with Stampede protection
1110 if let Some(tiers) = &self.tiers {
1111 // Check tiers starting from index 1 (skip L1 since already checked)
1112 for tier in tiers.iter().skip(1) {
1113 if let Some((cached_json, ttl)) = tier.get_with_ttl(key).await {
1114 tier.record_hit();
1115
1116 // Attempt to deserialize
1117 match serde_json::from_value::<T>(cached_json.clone()) {
1118 Ok(typed_value) => {
1119 debug!("[L{} HIT] Deserialized '{}' to type {}",
1120 tier.tier_level, key, std::any::type_name::<T>());
1121
1122 // Promote to L1 (first tier)
1123 let promotion_ttl = ttl.unwrap_or_else(|| strategy.to_duration());
1124 if let Err(e) = tiers[0].set_with_ttl(key, cached_json, promotion_ttl).await {
1125 warn!("Failed to promote '{}' from L{} to L1: {}",
1126 key, tier.tier_level, e);
1127 } else {
1128 self.promotions.fetch_add(1, Ordering::Relaxed);
1129 debug!("Promoted '{}' from L{} to L1 with TTL {:?} (Stampede protected)",
1130 key, tier.tier_level, promotion_ttl);
1131 }
1132
1133 return Ok(typed_value);
1134 }
1135 Err(e) => {
1136 warn!("L{} cache deserialization failed for key '{}': {}. Trying next tier.",
1137 tier.tier_level, key, e);
1138 // Continue to next tier
1139 }
1140 }
1141 }
1142 }
1143 } else {
1144 // LEGACY: Check L2 cache with TTL
1145 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
1146 self.l2_hits.fetch_add(1, Ordering::Relaxed);
1147
1148 // Attempt to deserialize
1149 match serde_json::from_value::<T>(cached_json.clone()) {
1150 Ok(typed_value) => {
1151 debug!("[L2 HIT] Deserialized '{}' from Redis", key);
1152
1153 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
1154 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
1155
1156 if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
1157 warn!("Failed to promote key '{}' to L1: {}", key, e);
1158 } else {
1159 self.promotions.fetch_add(1, Ordering::Relaxed);
1160 debug!("Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
1161 }
1162
1163 return Ok(typed_value);
1164 }
1165 Err(e) => {
1166 warn!("L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
1167 // Fall through to recompute
1168 }
1169 }
1170 }
1171 }
1172
1173 // 5. Cache miss across all tiers (or deserialization failed) - compute fresh data
1174 debug!("Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
1175 let typed_value = compute_fn().await?;
1176
1177 // 6. Serialize to JSON for storage
1178 let json_value = serde_json::to_value(&typed_value)
1179 .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
1180
1181 // 7. Store in both L1 and L2 caches
1182 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
1183 warn!("Failed to cache computed typed data for key '{}': {}", key, e);
1184 } else {
1185 debug!("Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
1186 }
1187
1188 // 8. _cleanup_guard will auto-remove entry on drop
1189
1190 Ok(typed_value)
1191 }
1192
1193 /// Get comprehensive cache statistics
1194 ///
1195 /// In multi-tier mode, aggregates statistics from all tiers.
1196 /// In legacy mode, returns L1 and L2 stats.
1197 #[allow(dead_code)]
1198 pub fn get_stats(&self) -> CacheManagerStats {
1199 let total_reqs = self.total_requests.load(Ordering::Relaxed);
1200 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
1201 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
1202 let misses = self.misses.load(Ordering::Relaxed);
1203
1204 CacheManagerStats {
1205 total_requests: total_reqs,
1206 l1_hits,
1207 l2_hits,
1208 total_hits: l1_hits + l2_hits,
1209 misses,
1210 hit_rate: if total_reqs > 0 {
1211 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
1212 } else { 0.0 },
1213 l1_hit_rate: if total_reqs > 0 {
1214 (l1_hits as f64 / total_reqs as f64) * 100.0
1215 } else { 0.0 },
1216 promotions: self.promotions.load(Ordering::Relaxed),
1217 in_flight_requests: self.in_flight_requests.len(),
1218 }
1219 }
1220
1221 /// Get per-tier statistics (v0.5.0+)
1222 ///
1223 /// Returns statistics for each tier if multi-tier mode is enabled.
1224 /// Returns None if using legacy 2-tier mode.
1225 ///
1226 /// # Example
1227 /// ```rust,ignore
1228 /// if let Some(tier_stats) = cache_manager.get_tier_stats() {
1229 /// for stats in tier_stats {
1230 /// println!("L{}: {} hits ({})",
1231 /// stats.tier_level,
1232 /// stats.hit_count(),
1233 /// stats.backend_name);
1234 /// }
1235 /// }
1236 /// ```
1237 pub fn get_tier_stats(&self) -> Option<Vec<TierStats>> {
1238 self.tiers.as_ref().map(|tiers| {
1239 tiers.iter().map(|tier| tier.stats.clone()).collect()
1240 })
1241 }
1242
1243 // ===== Redis Streams Methods =====
1244
1245 /// Publish data to Redis Stream
1246 ///
1247 /// # Arguments
1248 /// * `stream_key` - Name of the stream (e.g., "events_stream")
1249 /// * `fields` - Field-value pairs to publish
1250 /// * `maxlen` - Optional max length for stream trimming
1251 ///
1252 /// # Returns
1253 /// The entry ID generated by Redis
1254 ///
1255 /// # Errors
1256 /// Returns error if streaming backend is not configured
1257 pub async fn publish_to_stream(
1258 &self,
1259 stream_key: &str,
1260 fields: Vec<(String, String)>,
1261 maxlen: Option<usize>
1262 ) -> Result<String> {
1263 match &self.streaming_backend {
1264 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
1265 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1266 }
1267 }
1268
1269 /// Read latest entries from Redis Stream
1270 ///
1271 /// # Arguments
1272 /// * `stream_key` - Name of the stream
1273 /// * `count` - Number of latest entries to retrieve
1274 ///
1275 /// # Returns
1276 /// Vector of (entry_id, fields) tuples (newest first)
1277 ///
1278 /// # Errors
1279 /// Returns error if streaming backend is not configured
1280 pub async fn read_stream_latest(
1281 &self,
1282 stream_key: &str,
1283 count: usize
1284 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1285 match &self.streaming_backend {
1286 Some(backend) => backend.stream_read_latest(stream_key, count).await,
1287 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1288 }
1289 }
1290
1291 /// Read from Redis Stream with optional blocking
1292 ///
1293 /// # Arguments
1294 /// * `stream_key` - Name of the stream
1295 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
1296 /// * `count` - Max entries to retrieve
1297 /// * `block_ms` - Optional blocking timeout in ms
1298 ///
1299 /// # Returns
1300 /// Vector of (entry_id, fields) tuples
1301 ///
1302 /// # Errors
1303 /// Returns error if streaming backend is not configured
1304 pub async fn read_stream(
1305 &self,
1306 stream_key: &str,
1307 last_id: &str,
1308 count: usize,
1309 block_ms: Option<usize>
1310 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
1311 match &self.streaming_backend {
1312 Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
1313 None => Err(anyhow::anyhow!("Streaming backend not configured"))
1314 }
1315 }
1316
1317 // ===== Cache Invalidation Methods =====
1318
1319 /// Invalidate a cache key across all instances
1320 ///
1321 /// This removes the key from all cache tiers and broadcasts
1322 /// the invalidation to all other cache instances via Redis Pub/Sub.
1323 ///
1324 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1325 ///
1326 /// # Arguments
1327 /// * `key` - Cache key to invalidate
1328 ///
1329 /// # Example
1330 /// ```rust,ignore
1331 /// // Invalidate user cache after profile update
1332 /// cache_manager.invalidate("user:123").await?;
1333 /// ```
1334 pub async fn invalidate(&self, key: &str) -> Result<()> {
1335 // NEW: Multi-tier mode (v0.5.0+)
1336 if let Some(tiers) = &self.tiers {
1337 // Remove from ALL tiers
1338 for tier in tiers {
1339 if let Err(e) = tier.remove(key).await {
1340 warn!("Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1341 }
1342 }
1343 } else {
1344 // LEGACY: 2-tier mode
1345 self.l1_cache.remove(key).await?;
1346 self.l2_cache.remove(key).await?;
1347 }
1348
1349 // Broadcast to other instances
1350 if let Some(publisher) = &self.invalidation_publisher {
1351 let mut pub_lock = publisher.lock().await;
1352 let msg = InvalidationMessage::remove(key);
1353 pub_lock.publish(&msg).await?;
1354 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1355 }
1356
1357 debug!("Invalidated '{}' across all instances", key);
1358 Ok(())
1359 }
1360
1361 /// Update cache value across all instances
1362 ///
1363 /// This updates the key in all cache tiers and broadcasts
1364 /// the update to all other cache instances, avoiding cache misses.
1365 ///
1366 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1367 ///
1368 /// # Arguments
1369 /// * `key` - Cache key to update
1370 /// * `value` - New value
1371 /// * `ttl` - Optional TTL (uses default if None)
1372 ///
1373 /// # Example
1374 /// ```rust,ignore
1375 /// // Update user cache with new data
1376 /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
1377 /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
1378 /// ```
1379 pub async fn update_cache(
1380 &self,
1381 key: &str,
1382 value: serde_json::Value,
1383 ttl: Option<Duration>,
1384 ) -> Result<()> {
1385 let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
1386
1387 // NEW: Multi-tier mode (v0.5.0+)
1388 if let Some(tiers) = &self.tiers {
1389 // Update ALL tiers with their respective TTL scaling
1390 for tier in tiers {
1391 if let Err(e) = tier.set_with_ttl(key, value.clone(), ttl).await {
1392 warn!("Failed to update '{}' in L{}: {}", key, tier.tier_level, e);
1393 }
1394 }
1395 } else {
1396 // LEGACY: 2-tier mode
1397 self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;
1398 self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;
1399 }
1400
1401 // Broadcast update to other instances
1402 if let Some(publisher) = &self.invalidation_publisher {
1403 let mut pub_lock = publisher.lock().await;
1404 let msg = InvalidationMessage::update(key, value, Some(ttl));
1405 pub_lock.publish(&msg).await?;
1406 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1407 }
1408
1409 debug!("Updated '{}' across all instances", key);
1410 Ok(())
1411 }
1412
1413 /// Invalidate all keys matching a pattern
1414 ///
1415 /// This scans L2 cache for keys matching the pattern, removes them from all tiers,
1416 /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
1417 ///
1418 /// Supports both legacy 2-tier mode and new multi-tier mode (v0.5.0+).
1419 ///
1420 /// **Note**: Pattern scanning requires a concrete L2Cache instance with `scan_keys()`.
1421 /// In multi-tier mode, this scans from L2 but removes from all tiers.
1422 ///
1423 /// # Arguments
1424 /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
1425 ///
1426 /// # Example
1427 /// ```rust,ignore
1428 /// // Invalidate all user caches
1429 /// cache_manager.invalidate_pattern("user:*").await?;
1430 ///
1431 /// // Invalidate specific user's related caches
1432 /// cache_manager.invalidate_pattern("user:123:*").await?;
1433 /// ```
1434 pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
1435 // Scan L2 for matching keys
1436 // (Note: Pattern scanning requires concrete L2Cache with scan_keys support)
1437 let keys = if let Some(l2) = &self.l2_cache_concrete {
1438 l2.scan_keys(pattern).await?
1439 } else {
1440 return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
1441 };
1442
1443 if keys.is_empty() {
1444 debug!("No keys found matching pattern '{}'", pattern);
1445 return Ok(());
1446 }
1447
1448 // NEW: Multi-tier mode (v0.5.0+)
1449 if let Some(tiers) = &self.tiers {
1450 // Remove from ALL tiers
1451 for key in &keys {
1452 for tier in tiers {
1453 if let Err(e) = tier.remove(key).await {
1454 warn!("Failed to remove '{}' from L{}: {}", key, tier.tier_level, e);
1455 }
1456 }
1457 }
1458 } else {
1459 // LEGACY: 2-tier mode - Remove from L2 in bulk
1460 if let Some(l2) = &self.l2_cache_concrete {
1461 l2.remove_bulk(&keys).await?;
1462 }
1463 }
1464
1465 // Broadcast pattern invalidation
1466 if let Some(publisher) = &self.invalidation_publisher {
1467 let mut pub_lock = publisher.lock().await;
1468 let msg = InvalidationMessage::remove_bulk(keys.clone());
1469 pub_lock.publish(&msg).await?;
1470 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1471 }
1472
1473 debug!("Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
1474 Ok(())
1475 }
1476
1477 /// Set value with automatic broadcast to all instances
1478 ///
1479 /// This is a write-through operation that updates the cache and
1480 /// broadcasts the update to all other instances automatically.
1481 ///
1482 /// # Arguments
1483 /// * `key` - Cache key
1484 /// * `value` - Value to cache
1485 /// * `strategy` - Cache strategy (determines TTL)
1486 ///
1487 /// # Example
1488 /// ```rust,ignore
1489 /// // Update and broadcast in one call
1490 /// let data = serde_json::json!({"status": "active"});
1491 /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
1492 /// ```
1493 pub async fn set_with_broadcast(
1494 &self,
1495 key: &str,
1496 value: serde_json::Value,
1497 strategy: CacheStrategy,
1498 ) -> Result<()> {
1499 let ttl = strategy.to_duration();
1500
1501 // Set in local caches
1502 self.set_with_strategy(key, value.clone(), strategy).await?;
1503
1504 // Broadcast update if invalidation is enabled
1505 if let Some(publisher) = &self.invalidation_publisher {
1506 let mut pub_lock = publisher.lock().await;
1507 let msg = InvalidationMessage::update(key, value, Some(ttl));
1508 pub_lock.publish(&msg).await?;
1509 self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
1510 }
1511
1512 Ok(())
1513 }
1514
1515 /// Get invalidation statistics
1516 ///
1517 /// Returns statistics about invalidation operations if invalidation is enabled.
1518 pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
1519 if self.invalidation_subscriber.is_some() {
1520 Some(self.invalidation_stats.snapshot())
1521 } else {
1522 None
1523 }
1524 }
1525}
1526
/// Cache Manager statistics
///
/// A point-in-time snapshot of the manager-level counters.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct CacheManagerStats {
    /// Number of requests recorded by the manager.
    pub total_requests: u64,
    /// Requests served from L1.
    pub l1_hits: u64,
    /// Requests served from L2.
    pub l2_hits: u64,
    /// Sum of `l1_hits` and `l2_hits`.
    pub total_hits: u64,
    /// Value of the manager's miss counter.
    pub misses: u64,
    /// `total_hits` as a percentage of `total_requests` (0.0 when there were no requests).
    pub hit_rate: f64,
    /// `l1_hits` as a percentage of `total_requests` (0.0 when there were no requests).
    pub l1_hit_rate: f64,
    /// Number of successful promotions into L1.
    pub promotions: usize,
    /// Number of keys currently tracked for stampede protection.
    pub in_flight_requests: usize,
}