multi_tier_cache/cache_manager.rs
//! Cache Manager - Unified Cache Operations
//!
//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.

use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use anyhow::Result;
use serde_json;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::sync::Mutex;

use super::l1_cache::L1Cache;
use super::l2_cache::L2Cache;

/// RAII cleanup guard for in-flight request tracking
/// Ensures that entries are removed from the DashMap even on early return or panic
struct CleanupGuard<'a> {
    map: &'a DashMap<String, Arc<Mutex<()>>>,
    key: String,
}

impl<'a> Drop for CleanupGuard<'a> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}

/// Cache strategies for different data types
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data - 10 seconds TTL
    RealTime,
    /// Short-term data - 5 minutes TTL
    ShortTerm,
    /// Medium-term data - 1 hour TTL
    MediumTerm,
    /// Long-term data - 3 hours TTL
    LongTerm,
    /// Custom TTL
    Custom(Duration),
    /// Default strategy (5 minutes)
    Default,
}

impl CacheStrategy {
    /// Convert strategy to duration
    pub fn to_duration(&self) -> Duration {
        match self {
            Self::RealTime => Duration::from_secs(10),
            Self::ShortTerm => Duration::from_secs(300),   // 5 minutes
            Self::MediumTerm => Duration::from_secs(3600), // 1 hour
            Self::LongTerm => Duration::from_secs(10800),  // 3 hours
            Self::Custom(duration) => *duration,
            Self::Default => Duration::from_secs(300),
        }
    }
}

/// Cache Manager - Unified operations across L1 and L2
pub struct CacheManager {
    /// L1 Cache (Moka)
    l1_cache: Arc<L1Cache>,
    /// L2 Cache (Redis)
    l2_cache: Arc<L2Cache>,
    /// Statistics
    total_requests: Arc<AtomicU64>,
    l1_hits: Arc<AtomicU64>,
    l2_hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    promotions: Arc<AtomicUsize>,
    /// In-flight requests to prevent Cache Stampede on L2/compute operations
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
}

impl CacheManager {
    /// Create new cache manager
    pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
        println!(" 🎯 Initializing Cache Manager...");

        Ok(Self {
            l1_cache,
            l2_cache,
            total_requests: Arc::new(AtomicU64::new(0)),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            promotions: Arc::new(AtomicUsize::new(0)),
            in_flight_requests: Arc::new(DashMap::new()),
        })
    }

    /// Get value from cache (L1 first, then L2 fallback with promotion)
    ///
    /// This method includes built-in Cache Stampede protection when a cache miss occurs.
    /// Multiple concurrent requests for the same missing key are coalesced to prevent
    /// unnecessary duplicate work on external data sources.
    ///
    /// # Arguments
    /// * `key` - Cache key to retrieve
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
    /// * `Ok(None)` - Cache miss, value not found in either cache
    /// * `Err(error)` - Cache operation failed
    pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // Fast path: Try L1 first (no locking needed)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(value));
        }

        // L1 miss - implement Cache Stampede protection for L2 lookup
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // Double-check L1 cache after acquiring lock
        // (Another concurrent request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove the entry on drop
            return Ok(Some(value));
        }

        // Check L2 cache
        if let Some(value) = self.l2_cache.get(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 for faster access next time
            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), Duration::from_secs(300)).await {
                // L1 promotion failed, but we still have the data
                eprintln!("⚠️ Failed to promote key '{}' to L1 cache: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1 (via get)", key);
            }

            // _cleanup_guard will auto-remove the entry on drop
            return Ok(Some(value));
        }

        // Both L1 and L2 miss
        self.misses.fetch_add(1, Ordering::Relaxed);

        // _cleanup_guard will auto-remove the entry on drop
        Ok(None)
    }

    /// Get value from cache with an optional fallback computation
    ///
    /// This is a convenience method that combines `get()` with optional computation.
    /// If the value is not found in cache, it will execute the compute function
    /// and cache the result automatically.
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `compute_fn` - Optional function to compute value if not in cache
    /// * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
    ///
    /// # Returns
    /// * `Ok(Some(value))` - Value found in cache or computed successfully
    /// * `Ok(None)` - Value not in cache and no compute function provided
    /// * `Err(error)` - Cache operation or computation failed
    ///
    /// # Example
    /// ```rust
    /// // Simple cache get (no fallback computation)
    /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
    ///
    /// // Get with computation fallback
    /// let api_data = cache_manager.get_with_fallback(
    ///     "api_response",
    ///     Some(|| async { fetch_data_from_api().await }),
    ///     Some(CacheStrategy::RealTime)
    /// ).await?;
    /// ```
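    // NOTE: The body of `get_with_fallback` is not present in this file; the sketch
    // below is a hypothetical implementation of the behaviour described in the doc
    // comment above. The delegation to `get()` / `get_or_compute_with()` and the
    // `ShortTerm` default strategy are assumptions, not the author's confirmed code.
    // Callers passing `None` for `compute_fn` may need a type annotation to fix the
    // generic parameter.
    #[allow(dead_code)]
    pub async fn get_with_fallback<F, Fut>(
        &self,
        key: &str,
        compute_fn: Option<F>,
        strategy: Option<CacheStrategy>,
    ) -> Result<Option<serde_json::Value>>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        match compute_fn {
            // Fallback computation requested: run the full L1 -> L2 -> compute path
            // (with Cache Stampede protection) and wrap the result in Some.
            Some(f) => {
                let strategy = strategy.unwrap_or(CacheStrategy::ShortTerm);
                self.get_or_compute_with(key, strategy, f).await.map(Some)
            }
            // No compute function: behave exactly like `get()`.
            None => self.get(key).await,
        }
    }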

    /// Set value with specific cache strategy (both L1 and L2)
    pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
        let ttl = strategy.to_duration();

        // Store in both L1 and L2
        let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
        let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;

        // Return success if at least one cache succeeded
        match (l1_result, l2_result) {
            (Ok(_), Ok(_)) => {
                // Both succeeded
                println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Ok(_), Err(_)) => {
                // L1 succeeded, L2 failed
                eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
                println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(_), Ok(_)) => {
                // L1 failed, L2 succeeded
                eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
                println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
                Ok(())
            }
            (Err(e1), Err(_e2)) => {
                // Both failed
                Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
            }
        }
    }

    /// Get or compute value with Cache Stampede protection across L1 + L2 + compute
    ///
    /// This method provides comprehensive Cache Stampede protection:
    /// 1. Check L1 cache first (uses Moka's built-in coalescing)
    /// 2. Check L2 cache with mutex-based coalescing
    /// 3. Compute fresh data with protection against concurrent computations
    ///
    /// # Arguments
    /// * `key` - Cache key
    /// * `strategy` - Cache strategy for TTL and storage behavior
    /// * `compute_fn` - Async function to compute the value if not in any cache
    ///
    /// # Example
    /// ```rust
    /// let api_data = cache_manager.get_or_compute_with(
    ///     "api_response",
    ///     CacheStrategy::RealTime,
    ///     || async {
    ///         fetch_data_from_api().await
    ///     }
    /// ).await?;
    /// ```
    #[allow(dead_code)]
    pub async fn get_or_compute_with<F, Fut>(
        &self,
        key: &str,
        strategy: CacheStrategy,
        compute_fn: F,
    ) -> Result<serde_json::Value>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future<Output = Result<serde_json::Value>> + Send,
    {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(value);
        }

        // 2. L1 miss - try L2 with Cache Stampede protection
        let key_owned = key.to_string();
        let lock_guard = self.in_flight_requests
            .entry(key_owned.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let _guard = lock_guard.lock().await;

        // RAII cleanup guard - ensures entry is removed even on early return or panic
        let _cleanup_guard = CleanupGuard {
            map: &self.in_flight_requests,
            key: key_owned,
        };

        // 3. Double-check L1 cache after acquiring lock
        // (Another request might have populated it while we were waiting)
        if let Some(value) = self.l1_cache.get(key).await {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 4. Check L2 cache
        if let Some(value) = self.l2_cache.get(key).await {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);

            // Promote to L1 for future requests
            let ttl = strategy.to_duration();
            if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await {
                eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
            } else {
                self.promotions.fetch_add(1, Ordering::Relaxed);
                println!("⬆️ Promoted '{}' from L2 to L1", key);
            }

            // _cleanup_guard will auto-remove entry on drop
            return Ok(value);
        }

        // 5. Both L1 and L2 miss - record the miss and compute fresh data
        self.misses.fetch_add(1, Ordering::Relaxed);
        println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
        let fresh_data = compute_fn().await?;

        // 6. Store in both caches
        if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
            eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
        }

        // 7. _cleanup_guard will auto-remove entry on drop
        Ok(fresh_data)
    }

    /// Get comprehensive cache statistics
    #[allow(dead_code)]
    pub fn get_stats(&self) -> CacheManagerStats {
        let total_reqs = self.total_requests.load(Ordering::Relaxed);
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        CacheManagerStats {
            total_requests: total_reqs,
            l1_hits,
            l2_hits,
            total_hits: l1_hits + l2_hits,
            misses,
            hit_rate: if total_reqs > 0 {
                ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            l1_hit_rate: if total_reqs > 0 {
                (l1_hits as f64 / total_reqs as f64) * 100.0
            } else { 0.0 },
            promotions: self.promotions.load(Ordering::Relaxed),
            in_flight_requests: self.in_flight_requests.len(),
        }
    }

    // ===== Redis Streams Methods =====

    /// Publish data to Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream (e.g., "events_stream")
    /// * `fields` - Field-value pairs to publish
    /// * `maxlen` - Optional max length for stream trimming
    ///
    /// # Returns
    /// The entry ID generated by Redis
    pub async fn publish_to_stream(
        &self,
        stream_key: &str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> Result<String> {
        self.l2_cache.stream_add(stream_key, fields, maxlen).await
    }

    /// Read latest entries from Redis Stream
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `count` - Number of latest entries to retrieve
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples (newest first)
    pub async fn read_stream_latest(
        &self,
        stream_key: &str,
        count: usize,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        self.l2_cache.stream_read_latest(stream_key, count).await
    }

    /// Read from Redis Stream with optional blocking
    ///
    /// # Arguments
    /// * `stream_key` - Name of the stream
    /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
    /// * `count` - Max entries to retrieve
    /// * `block_ms` - Optional blocking timeout in ms
    ///
    /// # Returns
    /// Vector of (entry_id, fields) tuples
    pub async fn read_stream(
        &self,
        stream_key: &str,
        last_id: &str,
        count: usize,
        block_ms: Option<usize>,
    ) -> Result<Vec<(String, Vec<(String, String)>)>> {
        self.l2_cache.stream_read(stream_key, last_id, count, block_ms).await
    }
}

/// Cache Manager statistics
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    pub total_requests: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub total_hits: u64,
    pub misses: u64,
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub promotions: usize,
    pub in_flight_requests: usize,
}
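
#[cfg(test)]
mod tests {
    // NOTE: Illustrative tests sketching the pure, in-process pieces of this module:
    // the strategy-to-TTL mapping and the RAII cleanup guard. They are assumptions
    // about intended behaviour drawn from the doc comments above and need no running
    // Redis or Moka instance.
    use super::{CacheStrategy, CleanupGuard};
    use dashmap::DashMap;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::Mutex;

    #[test]
    fn cache_strategy_maps_to_documented_ttls() {
        // TTLs as documented on the enum variants: 10s, 5min, 1h, 3h, custom, default 5min
        assert_eq!(CacheStrategy::RealTime.to_duration(), Duration::from_secs(10));
        assert_eq!(CacheStrategy::ShortTerm.to_duration(), Duration::from_secs(300));
        assert_eq!(CacheStrategy::MediumTerm.to_duration(), Duration::from_secs(3600));
        assert_eq!(CacheStrategy::LongTerm.to_duration(), Duration::from_secs(10800));
        assert_eq!(CacheStrategy::Custom(Duration::from_secs(42)).to_duration(), Duration::from_secs(42));
        assert_eq!(CacheStrategy::Default.to_duration(), Duration::from_secs(300));
    }

    #[test]
    fn cleanup_guard_removes_in_flight_entry_on_drop() {
        let in_flight: DashMap<String, Arc<Mutex<()>>> = DashMap::new();
        in_flight.insert("key".to_string(), Arc::new(Mutex::new(())));

        {
            let _guard = CleanupGuard {
                map: &in_flight,
                key: "key".to_string(),
            };
            assert!(in_flight.contains_key("key"));
        } // guard dropped here, entry removed from the map

        assert!(!in_flight.contains_key("key"));
    }
}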