multi_tier_cache/cache_manager.rs
//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use super::l1_cache::L1Cache;
use super::l2_cache::L2Cache;
use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
use super::invalidation::{
    InvalidationConfig, InvalidationPublisher, InvalidationSubscriber,
    InvalidationMessage, AtomicInvalidationStats, InvalidationStats,
};

/// RAII cleanup guard for in-flight request tracking.
/// Ensures that entries are removed from the DashMap even on early return or panic.
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
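    ///
    /// # Example
    ///
    /// A minimal sketch of the strategy-to-TTL mapping:
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
    /// assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
    /// ```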
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300),   // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800),  // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Cache Manager - Unified operations across L1 and L2
pub struct CacheManager {
    /// L1 cache (trait object for pluggable backends)
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 cache (trait object for pluggable backends)
    l2_cache: Arc<dyn L2CacheBackend>,
    /// Concrete L2 cache instance (needed for invalidation's `scan_keys`)
    l2_cache_concrete: Option<Arc<L2Cache>>,
    /// Optional streaming backend (defaults to L2 if it implements `StreamingBackend`)
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics
    total_requests: Arc<AtomicU64>,
    l1_hits: Arc<AtomicU64>,
    l2_hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    promotions: Arc<AtomicUsize>,
    /// In-flight requests, used to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
    /// Invalidation publisher (for broadcasting invalidation messages)
    invalidation_publisher: Option<Arc<Mutex<InvalidationPublisher>>>,
    /// Invalidation subscriber (for receiving invalidation messages)
    invalidation_subscriber: Option<Arc<InvalidationSubscriber>>,
    /// Invalidation statistics
    invalidation_stats: Arc<AtomicInvalidationStats>,
}

impl CacheManager {
    /// Create new cache manager with trait objects (pluggable backends)
    ///
    /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Any L1 cache backend implementing the `CacheBackend` trait
    /// * `l2_cache` - Any L2 cache backend implementing the `L2CacheBackend` trait
    /// * `streaming_backend` - Optional streaming backend (`None` to disable streaming)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, CacheBackend, L2CacheBackend};
    /// use std::sync::Arc;
    ///
    /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
    /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
    ///
    /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
    /// ```
    pub async fn new_with_backends(
        l1_cache: Arc<dyn CacheBackend>,
        l2_cache: Arc<dyn L2CacheBackend>,
        streaming_backend: Option<Arc<dyn StreamingBackend>>,
    ) -> Result<Self> {
        println!(" 🎯 Initializing Cache Manager with custom backends...");
        println!("    L1: {}", l1_cache.name());
        println!("    L2: {}", l2_cache.name());
        if streaming_backend.is_some() {
            println!("    Streaming: enabled");
        } else {
            println!("    Streaming: disabled");
        }

        Ok(Self {
            l1_cache,
            l2_cache,
            l2_cache_concrete: None,
            streaming_backend,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: None,
            invalidation_subscriber: None,
            invalidation_stats: Arc::new(AtomicInvalidationStats::default()),
        })
    }

    /// Create new cache manager with default backends (backward compatible)
    ///
    /// This is the legacy constructor, maintained for backward compatibility.
    /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
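    ///
    /// # Example
    ///
    /// A minimal sketch using the default Moka + Redis backends:
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
    /// use std::sync::Arc;
    ///
    /// let l1 = Arc::new(L1Cache::new().await?);
    /// let l2 = Arc::new(L2Cache::new().await?);
    /// let manager = CacheManager::new(l1, l2).await?;
    /// ```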
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!(" 🎯 Initializing Cache Manager...");

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
        // L2Cache also implements StreamingBackend, so use it for streaming
        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache;

        Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
    }

    /// Create new cache manager with invalidation support
    ///
    /// This constructor enables cross-instance cache invalidation via Redis Pub/Sub.
    ///
    /// # Arguments
    ///
    /// * `l1_cache` - Moka L1 cache instance
    /// * `l2_cache` - Redis L2 cache instance
    /// * `redis_url` - Redis connection URL for Pub/Sub
    /// * `config` - Invalidation configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
    ///
    /// let config = InvalidationConfig {
    ///     channel: "my_app:cache:invalidate".to_string(),
    ///     ..Default::default()
    /// };
    ///
    /// let manager = CacheManager::new_with_invalidation(
    ///     l1, l2, "redis://localhost", config
    /// ).await?;
    /// ```
    pub async fn new_with_invalidation(
        l1_cache: Arc<L1Cache>,
        l2_cache: Arc<L2Cache>,
        redis_url: &str,
        config: InvalidationConfig,
    ) -> Result<Self> {
        println!(" 🎯 Initializing Cache Manager with Invalidation...");
        println!("    Pub/Sub channel: {}", config.channel);

        // Convert concrete types to trait objects
        let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
        let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
        let streaming_backend: Arc<dyn StreamingBackend> = l2_cache.clone();

        // Create publisher
        let client = redis::Client::open(redis_url)?;
        let conn_manager = redis::aio::ConnectionManager::new(client).await?;
        let publisher = InvalidationPublisher::new(conn_manager, config.clone());

        // Create subscriber
        let subscriber = InvalidationSubscriber::new(redis_url, config.clone())?;
        let invalidation_stats = Arc::new(AtomicInvalidationStats::default());

        let manager = Self {
            l1_cache: l1_backend,
            l2_cache: l2_backend,
            l2_cache_concrete: Some(l2_cache),
            streaming_backend: Some(streaming_backend),
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
            invalidation_publisher: Some(Arc::new(Mutex::new(publisher))),
            invalidation_subscriber: Some(Arc::new(subscriber)),
            invalidation_stats,
        };

        // Start subscriber with handler
        manager.start_invalidation_subscriber();

        println!(" ✅ Cache Manager initialized with invalidation support");

        Ok(manager)
    }

    /// Start the invalidation subscriber background task
    fn start_invalidation_subscriber(&self) {
        if let Some(subscriber) = &self.invalidation_subscriber {
            let l1_cache = Arc::clone(&self.l1_cache);
            let l2_cache_concrete = self.l2_cache_concrete.clone();

            subscriber.start(move |msg| {
                let l1 = Arc::clone(&l1_cache);
                let _l2 = l2_cache_concrete.clone();

                async move {
                    match msg {
                        InvalidationMessage::Remove { key } => {
                            // Remove from L1
                            l1.remove(&key).await?;
                            println!("🗑️ [Invalidation] Removed '{}' from L1", key);
                        }
                        InvalidationMessage::Update { key, value, ttl_secs } => {
                            // Update L1 with the new value
                            let ttl = ttl_secs
                                .map(Duration::from_secs)
                                .unwrap_or_else(|| Duration::from_secs(300));
                            l1.set_with_ttl(&key, value, ttl).await?;
                            println!("🔄 [Invalidation] Updated '{}' in L1", key);
                        }
                        InvalidationMessage::RemovePattern { pattern } => {
                            // We can't easily iterate the L1 cache for pattern-based
                            // invalidation, so we just log it. Pattern invalidation is
                            // mainly for L2; L1 entries will naturally expire via TTL.
                            println!("🔄 [Invalidation] Pattern '{}' invalidated (L1 will expire naturally)", pattern);
                        }
                        InvalidationMessage::RemoveBulk { keys } => {
                            // Remove multiple keys from L1
                            for key in keys {
                                if let Err(e) = l1.remove(&key).await {
                                    eprintln!("⚠️ Failed to remove '{}' from L1: {}", key, e);
                                }
                            }
                            println!("🗑️ [Invalidation] Bulk removed keys from L1");
                        }
                    }
                    Ok(())
                }
            });

            println!("📡 Invalidation subscriber started");
        }
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method includes built-in Cache Stampede protection on cache misses:
    /// multiple concurrent requests for the same missing key are coalesced to
    /// prevent unnecessary duplicate work on external data sources.
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
    /// * `Ok(None)` - Cache miss, value not found in either cache
    /// * `Err(error)` - Cache operation failed
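    ///
    /// # Example
    ///
    /// A minimal sketch (assumes an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// match cache_manager.get("user:123").await? {
    ///     Some(value) => println!("cache hit: {}", value),
    ///     None => println!("cache miss"),
    /// }
    /// ```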
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // Fast path: try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - apply Cache Stampede protection for the L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned.clone(),
        };

        // Double-check L1 after acquiring the lock
        // (another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // cleanup_guard auto-removes the entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache with TTL information
        if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 with the same TTL as Redis (or the default if no TTL)
            let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
            }

            // cleanup_guard auto-removes the entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 missed
        self.misses.fetch_add(1, Ordering::Relaxed);

        // cleanup_guard auto-removes the entry on drop
        drop(cleanup_guard);

        Ok(None)
    }

    /// Get value from cache with fallback computation (enhanced backward compatibility)
    ///
    /// This is a convenience method that combines `get()` with optional computation.
    /// If the value is not found in cache, it executes the compute function and
    /// caches the result automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `compute_fn` - Optional function to compute the value if not in cache
    /// * `strategy` - Cache strategy for storing the computed value (default: ShortTerm)
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Value found in cache or computed successfully
    /// * `Ok(None)` - Value not in cache and no compute function provided
    /// * `Err(error)` - Cache operation or computation failed
    ///
    /// # Example
    /// ```ignore
    /// // Simple cache get (existing behavior)
    /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    ///
    /// // Get with computation fallback (new enhanced behavior)
    /// let api_data = cache_manager.get_with_fallback(
    ///     "api_response",
    ///     Some(|| async { fetch_data_from_api().await }),
    ///     Some(CacheStrategy::RealTime)
    /// ).await?;
    /// ```
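    // Sketch implementation matching the doc comment above; delegates to
    // `get()` and `get_or_compute_with()`, so Cache Stampede protection is
    // inherited from those paths.
    #[allow(dead_code)]
    pub async fn get_with_fallback<F, Fut>(
        &self,
        key: &str,
        compute_fn: Option<F>,
        strategy: Option<CacheStrategy>,
    ) -> Result<Option<serde_json::Value>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        match compute_fn {
            // No compute function provided: behave exactly like `get()`
            None => self.get(key).await,
            // Compute fallback: coalesced compute-and-store via `get_or_compute_with()`
            Some(f) => {
                let strategy = strategy.unwrap_or(CacheStrategy::ShortTerm);
                let value = self.get_or_compute_with(key, strategy, f).await?;
                Ok(Some(value))
            }
        }
    }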

    /// Set value with specific cache strategy (both L1 and L2)
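    ///
    /// Returns `Ok(())` if at least one tier succeeds; an error is returned only
    /// when both the L1 and L2 writes fail.
    ///
    /// # Example
    ///
    /// A minimal sketch (assumes an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// let payload = serde_json::json!({"status": "ok"});
    /// cache_manager.set_with_strategy("health", payload, CacheStrategy::RealTime).await?;
    /// ```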
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1 + L2 + compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```ignore
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 after acquiring the lock
        // (another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard auto-removes the entry on drop
            return Ok(value);
        }

        // 4. Check L2 cache with TTL
        if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 using the Redis TTL (or the strategy TTL as fallback)
            let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
                eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
            }

            // _cleanup_guard auto-removes the entry on drop
            return Ok(value);
        }

        // 5. Both L1 and L2 missed - record the miss and compute fresh data
        self.misses.fetch_add(1, Ordering::Relaxed);
        println!("🔨 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard auto-removes the entry on drop
        Ok(fresh_data)
    }

    /// Get or compute typed value with Cache Stampede protection (type-safe version)
    ///
    /// This method provides the same functionality as `get_or_compute_with()` but with
    /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
    /// API calls, or any computation that returns structured data.
    ///
    /// # Type Safety
    ///
    /// - Returns your actual type `T` instead of `serde_json::Value`
    /// - Compiler enforces `Serialize + DeserializeOwned` bounds
    /// - No manual JSON conversion needed
    ///
    /// # Cache Flow
    ///
    /// 1. Check L1 cache → deserialize if found
    /// 2. Check L2 cache → deserialize + promote to L1 if found
    /// 3. Execute `compute_fn` → serialize → store in L1+L2
    /// 4. Full stampede protection (only ONE request computes)
    ///
    /// # Arguments
    ///
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL
    /// * `compute_fn` - Async function returning `Result<T>`
    ///
    /// # Example - Database Query
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct User {
    ///     id: i64,
    ///     name: String,
    /// }
    ///
    /// // Type-safe database caching (example - requires sqlx)
    /// // let user: User = cache_manager.get_or_compute_typed(
    /// //     "user:123",
    /// //     CacheStrategy::MediumTerm,
    /// //     || async {
    /// //         sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
    /// //             .bind(123)
    /// //             .fetch_one(&pool)
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Example - API Call
    ///
    /// ```no_run
    /// # use multi_tier_cache::{CacheManager, CacheStrategy, L1Cache, L2Cache};
    /// # use std::sync::Arc;
    /// # use serde::{Serialize, Deserialize};
    /// # async fn example() -> anyhow::Result<()> {
    /// # let l1 = Arc::new(L1Cache::new().await?);
    /// # let l2 = Arc::new(L2Cache::new().await?);
    /// # let cache_manager = CacheManager::new(l1, l2).await?;
    /// #[derive(Serialize, Deserialize)]
    /// struct ApiResponse {
    ///     data: String,
    ///     timestamp: i64,
    /// }
    ///
    /// // API call caching (example - requires reqwest)
    /// // let response: ApiResponse = cache_manager.get_or_compute_typed(
    /// //     "api:endpoint",
    /// //     CacheStrategy::RealTime,
    /// //     || async {
    /// //         reqwest::get("https://api.example.com/data")
    /// //             .await?
    /// //             .json::<ApiResponse>()
    /// //             .await
    /// //     }
    /// // ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// - L1 hit: <1ms + deserialization (~10-50µs for small structs)
    /// - L2 hit: 2-5ms + deserialization + L1 promotion
    /// - Compute: your function's time + serialization + L1+L2 storage
    /// - Stampede protection: 99.6% latency reduction under high concurrency
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The compute function fails
    /// - Serialization fails (type not representable as JSON)
    /// - Cache operations fail (Redis connection issues)
    ///
    /// Note: deserialization failures on cached data do not error; the value is
    /// recomputed instead (see the fall-through handling below).
    pub async fn get_or_compute_typed<T, F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<T>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize from JSON to type T
            match serde_json::from_value::<T>(cached_json) {
                Ok(typed_value) => {
                    println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
                    return Ok(typed_value);
                }
                Err(e) => {
                    // Deserialization failed - cached data may be stale or corrupt
                    eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures the entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 after acquiring the lock
        // (another request might have populated it while we were waiting)
        if let Some(cached_json) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
                println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
                return Ok(typed_value);
            }
        }

        // 4. Check L2 cache with TTL
        if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Attempt to deserialize
            match serde_json::from_value::<T>(cached_json.clone()) {
                Ok(typed_value) => {
                    println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);

                    // Promote to L1 using the Redis TTL (or the strategy TTL as fallback)
                    let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());

                    if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
                        eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
                    } else {
                        self.promotions.fetch_add(1, Ordering::Relaxed);
                        println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
                    }

                    return Ok(typed_value);
                }
                Err(e) => {
                    eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
                    // Fall through to recompute
                }
            }
        }

        // 5. Both L1 and L2 missed (or deserialization failed) - compute fresh data
        self.misses.fetch_add(1, Ordering::Relaxed);
        println!("🔨 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
        let typed_value = compute_fn().await?;

        // 6. Serialize to JSON for storage
        let json_value = serde_json::to_value(&typed_value)
            .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;

        // 7. Store in both L1 and L2 caches
        if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
            eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
        } else {
            println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
        }

        // 8. _cleanup_guard auto-removes the entry on drop
        Ok(typed_value)
    }

    /// Get comprehensive cache statistics
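    ///
    /// # Example
    ///
    /// A minimal sketch (assumes an initialized `cache_manager`):
    ///
    /// ```rust,ignore
    /// let stats = cache_manager.get_stats();
    /// println!("hit rate: {:.1}% ({} of {} requests)",
    ///          stats.hit_rate, stats.total_hits, stats.total_requests);
    /// ```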
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    // ===== Redis Streams Methods =====

    /// Publish data to a Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    ///
    /// # Errors
    /// Returns an error if the streaming backend is not configured
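    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a manager built with a streaming backend,
    /// e.g. via `CacheManager::new`):
    ///
    /// ```rust,ignore
    /// let entry_id = cache_manager.publish_to_stream(
    ///     "events_stream",
    ///     vec![("event".to_string(), "user_signup".to_string())],
    ///     Some(10_000), // trim the stream to roughly 10k entries
    /// ).await?;
    /// println!("published entry {}", entry_id);
    /// ```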
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>
    ) -> Result<String> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read the latest entries from a Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    ///
    /// # Errors
    /// Returns an error if the streaming backend is not configured
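    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a streaming-enabled manager):
    ///
    /// ```rust,ignore
    /// let latest = cache_manager.read_stream_latest("events_stream", 10).await?;
    /// for (id, fields) in latest {
    ///     println!("{}: {:?}", id, fields);
    /// }
    /// ```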
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read_latest(stream_key, count).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    /// Read from a Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new entries only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in milliseconds
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    ///
    /// # Errors
    /// Returns an error if the streaming backend is not configured
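    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a streaming-enabled manager): block up to 5s
    /// waiting for entries newer than the last ID we saw:
    ///
    /// ```rust,ignore
    /// let entries = cache_manager.read_stream("events_stream", "$", 100, Some(5000)).await?;
    /// for (id, fields) in entries {
    ///     println!("{}: {:?}", id, fields);
    /// }
    /// ```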
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        match &self.streaming_backend {
            Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
            None => Err(anyhow::anyhow!("Streaming backend not configured"))
        }
    }

    // ===== Cache Invalidation Methods =====

    /// Invalidate a cache key across all instances
    ///
    /// This removes the key from both L1 and L2 caches and broadcasts
    /// the invalidation to all other cache instances via Redis Pub/Sub.
    ///
    /// # Arguments
    /// * `key` - Cache key to invalidate
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate user cache after a profile update
    /// cache_manager.invalidate("user:123").await?;
    /// ```
    pub async fn invalidate(&self, key: &str) -> Result<()> {
        // Remove from the local L1 cache
        self.l1_cache.remove(key).await?;

        // Remove from the L2 cache
        self.l2_cache.remove(key).await?;

        // Broadcast to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove(key);
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🗑️ Invalidated '{}' across all instances", key);
        Ok(())
    }

    /// Update cache value across all instances
    ///
    /// This updates the key in both L1 and L2 caches and broadcasts
    /// the update to all other cache instances, avoiding cache misses.
    ///
    /// # Arguments
    /// * `key` - Cache key to update
    /// * `value` - New value
    /// * `ttl` - Optional TTL (uses the default if None)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update user cache with new data
    /// let user_data = serde_json::json!({"id": 123, "name": "Alice"});
    /// cache_manager.update_cache("user:123", user_data, Some(Duration::from_secs(3600))).await?;
    /// ```
    pub async fn update_cache(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());

        // Update the local L1 cache
        self.l1_cache.set_with_ttl(key, value.clone(), ttl).await?;

        // Update the L2 cache
        self.l2_cache.set_with_ttl(key, value.clone(), ttl).await?;

        // Broadcast the update to other instances
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔄 Updated '{}' across all instances", key);
        Ok(())
    }

    /// Invalidate all keys matching a pattern
    ///
    /// This scans the L2 cache for keys matching the pattern, removes them,
    /// and broadcasts the invalidation. L1 caches are cleared via the broadcast.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Example
    /// ```rust,ignore
    /// // Invalidate all user caches
    /// cache_manager.invalidate_pattern("user:*").await?;
    ///
    /// // Invalidate a specific user's related caches
    /// cache_manager.invalidate_pattern("user:123:*").await?;
    /// ```
    pub async fn invalidate_pattern(&self, pattern: &str) -> Result<()> {
        // Scan L2 for matching keys
        let keys = if let Some(l2) = &self.l2_cache_concrete {
            l2.scan_keys(pattern).await?
        } else {
            return Err(anyhow::anyhow!("Pattern invalidation requires a concrete L2Cache instance"));
        };

        if keys.is_empty() {
            println!("🔍 No keys found matching pattern '{}'", pattern);
            return Ok(());
        }

        // Remove from L2 in bulk
        if let Some(l2) = &self.l2_cache_concrete {
            l2.remove_bulk(&keys).await?;
        }

        // Broadcast bulk removal of the matched keys (so other instances drop them from L1)
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::remove_bulk(keys.clone());
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        println!("🔄 Invalidated {} keys matching pattern '{}'", keys.len(), pattern);
        Ok(())
    }

    /// Set value with automatic broadcast to all instances
    ///
    /// This is a write-through operation that updates the cache and
    /// broadcasts the update to all other instances automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `value` - Value to cache
    /// * `strategy` - Cache strategy (determines TTL)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update and broadcast in one call
    /// let data = serde_json::json!({"status": "active"});
    /// cache_manager.set_with_broadcast("user:123", data, CacheStrategy::MediumTerm).await?;
    /// ```
    pub async fn set_with_broadcast(
        &self,
        key: &str,
        value: serde_json::Value,
        strategy: CacheStrategy,
    ) -> Result<()> {
        let ttl = strategy.to_duration();

        // Set in the local caches
        self.set_with_strategy(key, value.clone(), strategy).await?;

        // Broadcast the update if invalidation is enabled
        if let Some(publisher) = &self.invalidation_publisher {
            let mut pub_lock = publisher.lock().await;
            let msg = InvalidationMessage::update(key, value, Some(ttl));
            pub_lock.publish(&msg).await?;
            self.invalidation_stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Get invalidation statistics
    ///
    /// Returns statistics about invalidation operations if invalidation is enabled.
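    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a manager built via `new_with_invalidation`;
    /// the printed field is assumed to mirror `AtomicInvalidationStats::messages_sent`):
    ///
    /// ```rust,ignore
    /// if let Some(stats) = cache_manager.get_invalidation_stats() {
    ///     println!("invalidation messages sent: {}", stats.messages_sent);
    /// }
    /// ```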
    pub fn get_invalidation_stats(&self) -> Option<InvalidationStats> {
        if self.invalidation_subscriber.is_some() {
            Some(self.invalidation_stats.snapshot())
        } else {
            None
        }
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Total number of get/compute requests served
    pub total_requests: u64,
    /// Requests answered from L1
    pub l1_hits: u64,
    /// Requests answered from L2
    pub l2_hits: u64,
    /// L1 + L2 hits combined
    pub total_hits: u64,
    /// Requests that missed both tiers
    pub misses: u64,
    /// Overall hit rate as a percentage (0.0-100.0)
    pub hit_rate: f64,
    /// L1-only hit rate as a percentage (0.0-100.0)
    pub l1_hit_rate: f64,
    /// Number of L2 → L1 promotions
    pub promotions: usize,
    /// Number of keys currently coalesced in flight
    pub in_flight_requests: usize,
}