multi_tier_cache/cache_manager.rs
//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
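//!
//! # Quick Start
//!
//! A minimal usage sketch (assumes the default Moka/Redis backends and a
//! reachable Redis instance):
//!
//! ```rust,ignore
//! let l1 = Arc::new(L1Cache::new().await?);
//! let l2 = Arc::new(L2Cache::new().await?);
//! let manager = CacheManager::new(l1, l2).await?;
//!
//! // Write through both tiers, then read back (L1 first, L2 on miss).
//! manager.set_with_strategy("greeting", serde_json::json!("hello"), CacheStrategy::ShortTerm).await?;
//! let value = manager.get("greeting").await?;
//! ```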

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use super::l1_cache::L1Cache;
use super::l2_cache::L2Cache;
use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use super::invalidation::{
    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
};

/// RAII cleanup guard for in-flight request tracking.
/// Ensures that entries are removed from the `DashMap` even on early return or panic.
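///
/// A minimal sketch of the intended usage (illustrative only; the real call
/// sites live in `get()` and the `get_or_compute_*` methods):
///
/// ```rust,ignore
/// let _cleanup_guard = CleanupGuard {
///     map: &self.in_flight_requests,
///     key: key_owned.clone(),
/// };
/// // ... fallible work; the map entry is removed when the guard drops,
/// // including on `?` early returns and panics.
/// ```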
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
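    ///
    /// # Example
    ///
    /// A quick sanity check of the strategy-to-TTL mapping:
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```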
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300),   // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800),  // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Cache Manager - Unified operations across L1 and L2
pub struct CacheManager {
    /// L1 Cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 Cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// L2 Cache concrete instance (for invalidation scan_keys)
    l2_cache_concrete: Option<Arc<L2Cache>>,
    /// Optional streaming backend (defaults to L2 if it implements StreamingBackend)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics
    total_requests: Arc<AtomicU64>,
    l1_hits: Arc<AtomicU64>,
    l2_hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    pub async fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with custom backends...");
        println!("     L1: {}", l1_cache.name());
        println!("     L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            println!("     Streaming: enabled");
        } else {
            println!("     Streaming: disabled");
        }

        Ok(Self {
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
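    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let manager = CacheManager::new(
    ///     Arc::new(L1Cache::new().await?),
    ///     Arc::new(L2Cache::new().await?),
    /// ).await?;
    /// ```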
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
        // L2Cache also implements StreamingBackend, so use it for streaming
        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache;

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        println!("  🎯 Initializing Cache Manager with Invalidation...");
        println!("     Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache.clone();

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        println!("  ✅ Cache Manager initialized with invalidation support");

        Ok(manager)
    }
238
239 /// Start the invalidation subscriber background task
240 fn start_invalidation_subscriber(&self) {
241 if let Some(subscriber) = &self.invalidation_subscriber {
242 let l1_cache = Arc::clone(&self.l1_cache);
243 let l2_cache_concrete = self.l2_cache_concrete.clone();
244
245 subscriber.start(move |msg| {
246 let l1 = Arc::clone(&l1_cache);
247 let _l2 = l2_cache_concrete.clone();
248
249 async move {
250 match msg {
251 InvalidationMessage::Remove { key } => {
252 // Remove from L1
253 l1.remove(&key).await?;
254 println!("ðïļ [Invalidation] Removed '{}' from L1", key);
255 }
256 InvalidationMessage::Update { key, value, ttl_secs } => {
257 // Update L1 with new value
258 let ttl = ttl_secs
259 .map(Duration::from_secs)
260 .unwrap_or_else(|| Duration::from_secs(300));
261 l1.set_with_ttl(&key, value, ttl).await?;
262 println!("ð [Invalidation] Updated '{}' in L1", key);
263 }
264 InvalidationMessage::RemovePattern { pattern } => {
265 // For pattern-based invalidation, we can't easily iterate L1 cache
266 // So we just log it. The pattern invalidation is mainly for L2.
267 // L1 entries will naturally expire via TTL.
268 println!("ð [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
269 }
270 InvalidationMessage::RemoveBulk { keys } => {
271 // Remove multiple keys from L1
272 for key in keys {
273 if let Err(e) = l1.remove(&key).await {
274 eprintln!("â ïļ Failed to remove '{}' from L1: {}", key, e);
275 }
276 }
277 println!("ðïļ [Invalidation] Bulk removed keys from L1");
278 }
279 }
280 Ok(())
281 }
282 });
283
284 println!("ðĄ Invalidation subscriber started");
285 }
286 }
287
288 /// Get value from cache (L1 first, then L2 fallback with promotion)
289 ///
290 /// This method now includes built-in Cache Stampede protection when cache misses occur.
291 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
292 /// unnecessary duplicate work on external data sources.
293 ///
294 /// # Arguments
295 /// * `key` - Cache key to retrieve
296 ///
297 /// # Returns
298 /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
299 /// * `Ok(None)` - Cache miss, value not found in either cache
300 /// * `Err(error)` - Cache operation failed
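    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// if let Some(value) = cache_manager.get("user:123").await? {
    ///     println!("cache hit: {}", value);
    /// } else {
    ///     println!("cache miss");
    /// }
    /// ```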
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // Fast path: try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for the L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // Double-check L1 cache after acquiring the lock
        // (another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove the entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with the same TTL as Redis (or the default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
            }

            // _cleanup_guard will auto-remove the entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // _cleanup_guard will auto-remove the entry on drop
        Ok(None)
    }

    /// Get value from cache with fallback computation (enhanced backward compatibility)
    ///
    /// This is a convenience method that combines `get()` with optional computation.
    /// If the value is not found in cache, it will execute the compute function
    /// and cache the result automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `compute_fn` - Optional function to compute value if not in cache
    /// * `strategy` - Cache strategy for storing computed value (default: `ShortTerm`)
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Value found in cache or computed successfully
    /// * `Ok(None)` - Value not in cache and no compute function provided
    /// * `Err(error)` - Cache operation or computation failed
    ///
    /// # Example
    /// ```rust,ignore
    /// // Simple cache get (existing behavior)
    /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    ///
    /// // Get with computation fallback (new enhanced behavior)
    /// let api_data = cache_manager.get_with_fallback(
    ///     "api_response",
    ///     Some(|| async { fetch_data_from_api().await }),
    ///     Some(CacheStrategy::RealTime)
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_with_fallback<F, Fut>(
        &self,
        key: &str,
        compute_fn: Option<F>,
        strategy: Option<CacheStrategy>,
    ) -> Result<Option<serde_json::Value>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        match compute_fn {
            // Fallback provided: delegate to the stampede-protected compute path
            Some(f) => {
                let strategy = strategy.unwrap_or(CacheStrategy::ShortTerm);
                self.get_or_compute_with(key, strategy, f).await.map(Some)
            }
            // No fallback: plain two-tier lookup
            None => self.get(key).await,
        }
    }

    /// Set value with specific cache strategy (both L1 and L2)
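    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Illustrative payload; any `serde_json::Value` works.
    /// let payload = serde_json::json!({ "status": "active" });
    /// cache_manager.set_with_strategy("user:123:status", payload, CacheStrategy::MediumTerm).await?;
    /// ```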
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1+L2+Compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```rust,ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring the lock
        // (another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove the entry on drop
            return Ok(value);
        }

        // 4. Check L2 cache with TTL
        if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 using the Redis TTL (or the strategy TTL as fallback)
            let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
            }

            // _cleanup_guard will auto-remove the entry on drop
            return Ok(value);
        }

        // 5. Both L1 and L2 miss - compute fresh data
        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove the entry on drop
        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces `Serialize + DeserializeOwned` bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute `compute_fn` → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```rust,ignore
    /// use serde::{Serialize, Deserialize};
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching
    /// let user: User = cache_manager.get_or_compute_typed(
    ///     "user:123",
    ///     CacheStrategy::MediumTerm,
    ///     || async {
    ///         // Your database query here
    ///         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    ///             .bind(123)
    ///             .fetch_one(&pool)
    ///             .await
    ///     }
    /// ).await?;
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```rust,ignore
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// let response: ApiResponse = cache_manager.get_or_compute_typed(
    ///     "api:endpoint",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         reqwest::get("https://api.example.com/data")
    ///             .await?
    ///             .json::<ApiResponse>()
    ///             .await
    ///     }
    /// ).await?;
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50µs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: your function time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Compute function fails
    /// - Serialization fails (invalid type for JSON)
    /// - Deserialization fails (cache data doesn't match type `T`)
    /// - Cache operations fail (Redis connection issues)
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cache data may be stale or corrupt
                    eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring the lock
        // (another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check L2 cache with TTL
        if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize
            match serde_json::from_value::<T>(cached_json.clone()) {
                Ok(typed_value) => {
                    println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);

                    // Promote to L1 using the Redis TTL (or the strategy TTL as fallback)
                    let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                    if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
                        eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                    } else {
                        self.promotions.fetch_add(1, Ordering::Relaxed);
                        println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                    }

                    return Ok(typed_value);
                }
                Err(e) => {
                    eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 5. Both L1 and L2 miss (or deserialization failed) - compute fresh data
        println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value)
            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;

        // 7. Store in both L1 and L2 caches
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
        } else {
            println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
        }

        // 8. _cleanup_guard will auto-remove the entry on drop
        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
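    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!(
    ///     "hit rate: {:.1}% ({} L1 + {} L2 hits of {} requests)",
    ///     stats.hit_rate, stats.l1_hits, stats.l2_hits, stats.total_requests
    /// );
    /// ```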
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
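    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "user_signup".to_string())],
    ///     Some(10_000), // keep the stream trimmed to ~10k entries
    /// ).await?;
    /// ```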
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
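    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// for (entry_id, fields) in cache_manager.read_stream_latest("events_stream", 10).await? {
    ///     println!("{}: {:?}", entry_id, fields);
    /// }
    /// ```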
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    ///
    /// # Errors
    /// Returns error if streaming backend is not configured
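    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Block for up to 5 seconds waiting for entries newer than "$" (new entries only).
    /// let entries = cache_manager.read_stream("events_stream", "$", 100, Some(5000)).await?;
    /// ```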
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured")),
        }
    }

    // ===== Cache Invalidation Methods =====

    /// Invalidate a cache key across all instances
    ///
    /// This removes the key from both L1 and L2 caches and broadcasts
    /// the invalidation to all other cache instances via Redis Pub/Sub.
    ///
    /// # Arguments
    /// * `key` - Cache key to invalidate
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate user cache after profile update
    /// cache_manager.invalidate("user:123").await?;
    /// ```
    pub async fn invalidate(&self, key: &str) -> Result<()> {
        // Remove from local L1 cache
        self.l1_cache.remove(key).await?;

        // Remove from L2 cache
        self.l2_cache.remove(key).await?;

        // Broadcast to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove(key);
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🗑️ Invalidated '{}' across all instances", key);
        Ok(())
    }

    /// Update cache value across all instances
    ///
    /// This updates the key in both L1 and L2 caches and broadcasts
    /// the update to all other cache instances, avoiding cache misses.
    ///
    /// # Arguments
    /// * `key` - Cache key to update
    /// * `value` - New value
    /// * `ttl` - Optional TTL (uses default if None)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update user cache with new data
    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
    /// ```
    pub async fn update_cache(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

        // Update local L1 cache
        self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;

        // Update L2 cache
        self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;

        // Broadcast update to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔄 Updated '{}' across all instances", key);
        Ok(())
    }

    /// Invalidate all keys matching a pattern
    ///
    /// This scans L2 cache for keys matching the pattern, removes them,
    /// and broadcasts the invalidation. L1 caches will be cleared via broadcast.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate all user caches
    /// cache_manager.invalidate_pattern("user:*").await?;
    ///
    /// // Invalidate specific user's related caches
    /// cache_manager.invalidate_pattern("user:123:*").await?;
    /// ```
    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
        // Scan L2 for matching keys
        let keys = if let Some(l2) = &self.l2_cache_concrete {
            l2.scan_keys(pattern).await?
        } else {
            return Err(anyhow::anyhow!("Pattern invalidation requires concrete L2Cache instance"));
        };

        if keys.is_empty() {
            println!("🔍 No keys found matching pattern '{}'", pattern);
            return Ok(());
        }

        // Remove from L2 in bulk
        if let Some(l2) = &self.l2_cache_concrete {
            l2.remove_bulk(&keys).await?;
        }

        // Broadcast bulk removal of the matched keys so other instances clear their L1 entries
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove_bulk(keys.clone());
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔄 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
        Ok(())
    }

    /// Set value with automatic broadcast to all instances
    ///
    /// This is a write-through operation that updates the cache and
    /// broadcasts the update to all other instances automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `value` - Value to cache
    /// * `strategy` - Cache strategy (determines TTL)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update and broadcast in one call
    /// let data = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
    /// ```
    pub async fn set_with_broadcast(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // Set in local caches
        self.set_with_strategy(key, value.clone(), strategy).await?;

        // Broadcast update if invalidation is enabled
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Get invalidation statistics
    ///
    /// Returns statistics about invalidation operations if invalidation is enabled.
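    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     // Assumes `InvalidationStats` implements Debug; inspect fields directly otherwise.
    ///     println!("invalidation stats: {:?}", stats);
    /// }
    /// ```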
    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
        if self.invalidation_subscriber.is_some() {
            Some(self.invalidation_stats.snapshot())
        } else {
            None
        }
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub total_hits: u64,
    pub misses: u64,
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub promotions: usize,
    pub in_flight_requests: usize,
}