multi_tier_cache/cache_manager.rs
1//! Cache Manager - Unified Cache Operations
2//!
3//! Manages operations across L1 (Moka) and L2 (Redis) caches with intelligent fallback.
4
5use std::sync::Arc;
6use std::time::Duration;
7use std::future::Future;
8use anyhow::Result;
9use serde_json;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use dashmap::DashMap;
12use tokio::sync::Mutex;
13
14use super::l1_cache::L1Cache;
15use super::l2_cache::L2Cache;
16use super::traits::{CacheBackend, L2CacheBackend, StreamingBackend};
17
18/// RAII cleanup guard for in-flight request tracking
19/// Ensures that entries are removed from DashMap even on early return or panic
20struct CleanupGuard<'a> {
21 map: &'a DashMap<String, Arc<Mutex<()>>>,
22 key: String,
23}
24
25impl<'a> Drop for CleanupGuard<'a> {
26 fn drop(&mut self) {
27 self.map.remove(&self.key);
28 }
29}
30
/// Named TTL presets used when writing to the cache.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum CacheStrategy {
    /// Real-time data: 10 seconds.
    RealTime,
    /// Short-term data: 5 minutes.
    ShortTerm,
    /// Medium-term data: 1 hour.
    MediumTerm,
    /// Long-term data: 3 hours.
    LongTerm,
    /// Caller-chosen TTL, used verbatim.
    Custom(Duration),
    /// Fallback preset: 5 minutes (same TTL as `ShortTerm`).
    Default,
}

impl CacheStrategy {
    /// Returns the time-to-live this strategy stands for.
    pub fn to_duration(&self) -> Duration {
        let seconds = match self {
            // A custom strategy already carries its exact duration.
            Self::Custom(ttl) => return *ttl,
            Self::RealTime => 10,
            Self::ShortTerm | Self::Default => 5 * 60,
            Self::MediumTerm => 60 * 60,
            Self::LongTerm => 3 * 60 * 60,
        };
        Duration::from_secs(seconds)
    }
}
62
/// Cache Manager - unified operations across an L1 and an L2 cache tier.
///
/// Holds both tiers as trait objects, an optional streaming backend, a set of
/// atomic counters that feed `get_stats()`, and the per-key lock table used to
/// serialise concurrent lookups of the same key after an L1 miss.
pub struct CacheManager {
    /// L1 cache (trait object for pluggable backends); always consulted first.
    l1_cache: Arc<dyn CacheBackend>,
    /// L2 cache (trait object for pluggable backends); consulted after an L1 miss.
    l2_cache: Arc<dyn L2CacheBackend>,
    /// Optional streaming backend. `new()` passes the L2 cache here;
    /// `new_with_backends()` stores whatever the caller supplies (`None` disables streaming).
    streaming_backend: Option<Arc<dyn StreamingBackend>>,
    /// Statistics: number of lookup calls made through the manager.
    total_requests: Arc<AtomicU64>,
    /// Statistics: lookups answered from L1.
    l1_hits: Arc<AtomicU64>,
    /// Statistics: lookups answered from L2.
    l2_hits: Arc<AtomicU64>,
    /// Statistics: lookups that found the key in neither tier.
    misses: Arc<AtomicU64>,
    /// Statistics: values copied from L2 into L1 after an L2 hit.
    promotions: Arc<AtomicUsize>,
    /// In-flight requests, keyed by cache key, to prevent Cache Stampede on
    /// L2/compute operations. Each value is the mutex callers for that key queue on.
    in_flight_requests: Arc<DashMap<String, Arc<Mutex<()>>>>,
}
80
81impl CacheManager {
82 /// Create new cache manager with trait objects (pluggable backends)
83 ///
84 /// This is the primary constructor for v0.3.0+, supporting custom cache backends.
85 ///
86 /// # Arguments
87 ///
88 /// * `l1_cache` - Any L1 cache backend implementing `CacheBackend` trait
89 /// * `l2_cache` - Any L2 cache backend implementing `L2CacheBackend` trait
90 /// * `streaming_backend` - Optional streaming backend (None to disable streaming)
91 ///
92 /// # Example
93 ///
94 /// ```rust,ignore
95 /// use multi_tier_cache::{CacheManager, L1Cache, L2Cache};
96 /// use std::sync::Arc;
97 ///
98 /// let l1: Arc<dyn CacheBackend> = Arc::new(L1Cache::new().await?);
99 /// let l2: Arc<dyn L2CacheBackend> = Arc::new(L2Cache::new().await?);
100 ///
101 /// let manager = CacheManager::new_with_backends(l1, l2, None).await?;
102 /// ```
103 pub async fn new_with_backends(
104 l1_cache: Arc<dyn CacheBackend>,
105 l2_cache: Arc<dyn L2CacheBackend>,
106 streaming_backend: Option<Arc<dyn StreamingBackend>>,
107 ) -> Result<Self> {
108 println!(" 🎯 Initializing Cache Manager with custom backends...");
109 println!(" L1: {}", l1_cache.name());
110 println!(" L2: {}", l2_cache.name());
111 if streaming_backend.is_some() {
112 println!(" Streaming: enabled");
113 } else {
114 println!(" Streaming: disabled");
115 }
116
117 Ok(Self {
118 l1_cache,
119 l2_cache,
120 streaming_backend,
121 total_requests: Arc::new(AtomicU64::new(0)),
122 l1_hits: Arc::new(AtomicU64::new(0)),
123 l2_hits: Arc::new(AtomicU64::new(0)),
124 misses: Arc::new(AtomicU64::new(0)),
125 promotions: Arc::new(AtomicUsize::new(0)),
126 in_flight_requests: Arc::new(DashMap::new()),
127 })
128 }
129
130 /// Create new cache manager with default backends (backward compatible)
131 ///
132 /// This is the legacy constructor maintained for backward compatibility.
133 /// New code should prefer `new_with_backends()` or `CacheSystemBuilder`.
134 ///
135 /// # Arguments
136 ///
137 /// * `l1_cache` - Moka L1 cache instance
138 /// * `l2_cache` - Redis L2 cache instance
139 pub async fn new(l1_cache: Arc<L1Cache>, l2_cache: Arc<L2Cache>) -> Result<Self> {
140 println!(" 🎯 Initializing Cache Manager...");
141
142 // Convert concrete types to trait objects
143 let l1_backend: Arc<dyn CacheBackend> = l1_cache.clone();
144 let l2_backend: Arc<dyn L2CacheBackend> = l2_cache.clone();
145 // L2Cache also implements StreamingBackend, so use it for streaming
146 let streaming_backend: Arc<dyn StreamingBackend> = l2_cache;
147
148 Self::new_with_backends(l1_backend, l2_backend, Some(streaming_backend)).await
149 }
150
151 /// Get value from cache (L1 first, then L2 fallback with promotion)
152 ///
153 /// This method now includes built-in Cache Stampede protection when cache misses occur.
154 /// Multiple concurrent requests for the same missing key will be coalesced to prevent
155 /// unnecessary duplicate work on external data sources.
156 ///
157 /// # Arguments
158 /// * `key` - Cache key to retrieve
159 ///
160 /// # Returns
161 /// * `Ok(Some(value))` - Cache hit, value found in L1 or L2
162 /// * `Ok(None)` - Cache miss, value not found in either cache
163 /// * `Err(error)` - Cache operation failed
164 pub async fn get(&self, key: &str) -> Result<Option<serde_json::Value>> {
165 self.total_requests.fetch_add(1, Ordering::Relaxed);
166
167 // Fast path: Try L1 first (no locking needed)
168 if let Some(value) = self.l1_cache.get(key).await {
169 self.l1_hits.fetch_add(1, Ordering::Relaxed);
170 return Ok(Some(value));
171 }
172
173 // L1 miss - implement Cache Stampede protection for L2 lookup
174 let key_owned = key.to_string();
175 let lock_guard = self.in_flight_requests
176 .entry(key_owned.clone())
177 .or_insert_with(|| Arc::new(Mutex::new(())))
178 .clone();
179
180 let _guard = lock_guard.lock().await;
181
182 // RAII cleanup guard - ensures entry is removed even on early return or panic
183 let cleanup_guard = CleanupGuard {
184 map: &self.in_flight_requests,
185 key: key_owned.clone(),
186 };
187
188 // Double-check L1 cache after acquiring lock
189 // (Another concurrent request might have populated it while we were waiting)
190 if let Some(value) = self.l1_cache.get(key).await {
191 self.l1_hits.fetch_add(1, Ordering::Relaxed);
192 // cleanup_guard will auto-remove entry on drop
193 return Ok(Some(value));
194 }
195
196 // Check L2 cache with TTL information
197 if let Some((value, ttl)) = self.l2_cache.get_with_ttl(key).await {
198 self.l2_hits.fetch_add(1, Ordering::Relaxed);
199
200 // Promote to L1 with same TTL as Redis (or default if no TTL)
201 let promotion_ttl = ttl.unwrap_or_else(|| CacheStrategy::Default.to_duration());
202
203 if let Err(_) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
204 // L1 promotion failed, but we still have the data
205 eprintln!("⚠️ Failed to promote key '{}' to L1 cache", key);
206 } else {
207 self.promotions.fetch_add(1, Ordering::Relaxed);
208 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?} (via get)", key, promotion_ttl);
209 }
210
211 // cleanup_guard will auto-remove entry on drop
212 return Ok(Some(value));
213 }
214
215 // Both L1 and L2 miss
216 self.misses.fetch_add(1, Ordering::Relaxed);
217
218 // cleanup_guard will auto-remove entry on drop
219 drop(cleanup_guard);
220
221 Ok(None)
222 }
223
224 /// Get value from cache with fallback computation (enhanced backward compatibility)
225 ///
226 /// This is a convenience method that combines `get()` with optional computation.
227 /// If the value is not found in cache, it will execute the compute function
228 /// and cache the result automatically.
229 ///
230 /// # Arguments
231 /// * `key` - Cache key
232 /// * `compute_fn` - Optional function to compute value if not in cache
233 /// * `strategy` - Cache strategy for storing computed value (default: ShortTerm)
234 ///
235 /// # Returns
236 /// * `Ok(Some(value))` - Value found in cache or computed successfully
237 /// * `Ok(None)` - Value not in cache and no compute function provided
238 /// * `Err(error)` - Cache operation or computation failed
239 ///
240 /// # Example
241 /// ```rust
242 /// // Simple cache get (existing behavior)
243 /// let cached_data = cache_manager.get_with_fallback("my_key", None, None).await?;
244 ///
245 /// // Get with computation fallback (new enhanced behavior)
246 /// let api_data = cache_manager.get_with_fallback(
247 /// "api_response",
248 /// Some(|| async { fetch_data_from_api().await }),
249 /// Some(CacheStrategy::RealTime)
250 /// ).await?;
251 /// ```
252
253 /// Set value with specific cache strategy (both L1 and L2)
254 pub async fn set_with_strategy(&self, key: &str, value: serde_json::Value, strategy: CacheStrategy) -> Result<()> {
255 let ttl = strategy.to_duration();
256
257 // Store in both L1 and L2
258 let l1_result = self.l1_cache.set_with_ttl(key, value.clone(), ttl).await;
259 let l2_result = self.l2_cache.set_with_ttl(key, value, ttl).await;
260
261 // Return success if at least one cache succeeded
262 match (l1_result, l2_result) {
263 (Ok(_), Ok(_)) => {
264 // Both succeeded
265 println!("💾 [L1+L2] Cached '{}' with TTL {:?}", key, ttl);
266 Ok(())
267 }
268 (Ok(_), Err(_)) => {
269 // L1 succeeded, L2 failed
270 eprintln!("⚠️ L2 cache set failed for key '{}', continuing with L1", key);
271 println!("💾 [L1] Cached '{}' with TTL {:?}", key, ttl);
272 Ok(())
273 }
274 (Err(_), Ok(_)) => {
275 // L1 failed, L2 succeeded
276 eprintln!("⚠️ L1 cache set failed for key '{}', continuing with L2", key);
277 println!("💾 [L2] Cached '{}' with TTL {:?}", key, ttl);
278 Ok(())
279 }
280 (Err(e1), Err(_e2)) => {
281 // Both failed
282 Err(anyhow::anyhow!("Both L1 and L2 cache set failed for key '{}': {}", key, e1))
283 }
284 }
285 }
286
287 /// Get or compute value with Cache Stampede protection across L1+L2+Compute
288 ///
289 /// This method provides comprehensive Cache Stampede protection:
290 /// 1. Check L1 cache first (uses Moka's built-in coalescing)
291 /// 2. Check L2 cache with mutex-based coalescing
292 /// 3. Compute fresh data with protection against concurrent computations
293 ///
294 /// # Arguments
295 /// * `key` - Cache key
296 /// * `strategy` - Cache strategy for TTL and storage behavior
297 /// * `compute_fn` - Async function to compute the value if not in any cache
298 ///
299 /// # Example
300 /// ```rust
301 /// let api_data = cache_manager.get_or_compute_with(
302 /// "api_response",
303 /// CacheStrategy::RealTime,
304 /// || async {
305 /// fetch_data_from_api().await
306 /// }
307 /// ).await?;
308 /// ```
309 #[allow(dead_code)]
310 pub async fn get_or_compute_with<F, Fut>(
311 &self,
312 key: &str,
313 strategy: CacheStrategy,
314 compute_fn: F,
315 ) -> Result<serde_json::Value>
316 where
317 F: FnOnce() -> Fut + Send,
318 Fut: Future<Output = Result<serde_json::Value>> + Send,
319 {
320 self.total_requests.fetch_add(1, Ordering::Relaxed);
321
322 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
323 if let Some(value) = self.l1_cache.get(key).await {
324 self.l1_hits.fetch_add(1, Ordering::Relaxed);
325 return Ok(value);
326 }
327
328 // 2. L1 miss - try L2 with Cache Stampede protection
329 let key_owned = key.to_string();
330 let lock_guard = self.in_flight_requests
331 .entry(key_owned.clone())
332 .or_insert_with(|| Arc::new(Mutex::new(())))
333 .clone();
334
335 let _guard = lock_guard.lock().await;
336
337 // RAII cleanup guard - ensures entry is removed even on early return or panic
338 let _cleanup_guard = CleanupGuard {
339 map: &self.in_flight_requests,
340 key: key_owned,
341 };
342
343 // 3. Double-check L1 cache after acquiring lock
344 // (Another request might have populated it while we were waiting)
345 if let Some(value) = self.l1_cache.get(key).await {
346 self.l1_hits.fetch_add(1, Ordering::Relaxed);
347 // _cleanup_guard will auto-remove entry on drop
348 return Ok(value);
349 }
350
351 // 4. Check L2 cache with TTL
352 if let Some((value, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
353 self.l2_hits.fetch_add(1, Ordering::Relaxed);
354
355 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
356 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
357
358 if let Err(e) = self.l1_cache.set_with_ttl(key, value.clone(), promotion_ttl).await {
359 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
360 } else {
361 self.promotions.fetch_add(1, Ordering::Relaxed);
362 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
363 }
364
365 // _cleanup_guard will auto-remove entry on drop
366 return Ok(value);
367 }
368
369 // 5. Both L1 and L2 miss - compute fresh data
370 println!("💻 Computing fresh data for key: '{}' (Cache Stampede protected)", key);
371 let fresh_data = compute_fn().await?;
372
373 // 6. Store in both caches
374 if let Err(e) = self.set_with_strategy(key, fresh_data.clone(), strategy).await {
375 eprintln!("⚠️ Failed to cache computed data for key '{}': {}", key, e);
376 }
377
378 // 7. _cleanup_guard will auto-remove entry on drop
379
380 Ok(fresh_data)
381 }
382
383 /// Get or compute typed value with Cache Stampede protection (Type-Safe Version)
384 ///
385 /// This method provides the same functionality as `get_or_compute_with()` but with
386 /// **type-safe** automatic serialization/deserialization. Perfect for database queries,
387 /// API calls, or any computation that returns structured data.
388 ///
389 /// # Type Safety
390 ///
391 /// - Returns your actual type `T` instead of `serde_json::Value`
392 /// - Compiler enforces Serialize + DeserializeOwned bounds
393 /// - No manual JSON conversion needed
394 ///
395 /// # Cache Flow
396 ///
397 /// 1. Check L1 cache → deserialize if found
398 /// 2. Check L2 cache → deserialize + promote to L1 if found
399 /// 3. Execute compute_fn → serialize → store in L1+L2
400 /// 4. Full stampede protection (only ONE request computes)
401 ///
402 /// # Arguments
403 ///
404 /// * `key` - Cache key
405 /// * `strategy` - Cache strategy for TTL
406 /// * `compute_fn` - Async function returning `Result<T>`
407 ///
408 /// # Example - Database Query
409 ///
410 /// ```rust
411 /// use serde::{Serialize, Deserialize};
412 ///
413 /// #[derive(Serialize, Deserialize)]
414 /// struct User {
415 /// id: i64,
416 /// name: String,
417 /// }
418 ///
419 /// // Type-safe database caching
420 /// let user: User = cache_manager.get_or_compute_typed(
421 /// "user:123",
422 /// CacheStrategy::MediumTerm,
423 /// || async {
424 /// // Your database query here
425 /// sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
426 /// .bind(123)
427 /// .fetch_one(&pool)
428 /// .await
429 /// }
430 /// ).await?;
431 /// ```
432 ///
433 /// # Example - API Call
434 ///
435 /// ```rust
436 /// #[derive(Serialize, Deserialize)]
437 /// struct ApiResponse {
438 /// data: String,
439 /// timestamp: i64,
440 /// }
441 ///
442 /// let response: ApiResponse = cache_manager.get_or_compute_typed(
443 /// "api:endpoint",
444 /// CacheStrategy::RealTime,
445 /// || async {
446 /// reqwest::get("https://api.example.com/data")
447 /// .await?
448 /// .json::<ApiResponse>()
449 /// .await
450 /// }
451 /// ).await?;
452 /// ```
453 ///
454 /// # Performance
455 ///
456 /// - L1 hit: <1ms + deserialization (~10-50μs for small structs)
457 /// - L2 hit: 2-5ms + deserialization + L1 promotion
458 /// - Compute: Your function time + serialization + L1+L2 storage
459 /// - Stampede protection: 99.6% latency reduction under high concurrency
460 ///
461 /// # Errors
462 ///
463 /// Returns error if:
464 /// - Compute function fails
465 /// - Serialization fails (invalid type for JSON)
466 /// - Deserialization fails (cache data doesn't match type T)
467 /// - Cache operations fail (Redis connection issues)
468 pub async fn get_or_compute_typed<T, F, Fut>(
469 &self,
470 key: &str,
471 strategy: CacheStrategy,
472 compute_fn: F,
473 ) -> Result<T>
474 where
475 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
476 F: FnOnce() -> Fut + Send,
477 Fut: Future<Output = Result<T>> + Send,
478 {
479 self.total_requests.fetch_add(1, Ordering::Relaxed);
480
481 // 1. Try L1 cache first (with built-in Moka coalescing for hot data)
482 if let Some(cached_json) = self.l1_cache.get(key).await {
483 self.l1_hits.fetch_add(1, Ordering::Relaxed);
484
485 // Attempt to deserialize from JSON to type T
486 match serde_json::from_value::<T>(cached_json) {
487 Ok(typed_value) => {
488 println!("✅ [L1 HIT] Deserialized '{}' to type {}", key, std::any::type_name::<T>());
489 return Ok(typed_value);
490 }
491 Err(e) => {
492 // Deserialization failed - cache data may be stale or corrupt
493 eprintln!("⚠️ L1 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
494 // Fall through to recompute
495 }
496 }
497 }
498
499 // 2. L1 miss - try L2 with Cache Stampede protection
500 let key_owned = key.to_string();
501 let lock_guard = self.in_flight_requests
502 .entry(key_owned.clone())
503 .or_insert_with(|| Arc::new(Mutex::new(())))
504 .clone();
505
506 let _guard = lock_guard.lock().await;
507
508 // RAII cleanup guard - ensures entry is removed even on early return or panic
509 let _cleanup_guard = CleanupGuard {
510 map: &self.in_flight_requests,
511 key: key_owned,
512 };
513
514 // 3. Double-check L1 cache after acquiring lock
515 // (Another request might have populated it while we were waiting)
516 if let Some(cached_json) = self.l1_cache.get(key).await {
517 self.l1_hits.fetch_add(1, Ordering::Relaxed);
518 if let Ok(typed_value) = serde_json::from_value::<T>(cached_json) {
519 println!("✅ [L1 HIT] Deserialized '{}' after lock acquisition", key);
520 return Ok(typed_value);
521 }
522 }
523
524 // 4. Check L2 cache with TTL
525 if let Some((cached_json, redis_ttl)) = self.l2_cache.get_with_ttl(key).await {
526 self.l2_hits.fetch_add(1, Ordering::Relaxed);
527
528 // Attempt to deserialize
529 match serde_json::from_value::<T>(cached_json.clone()) {
530 Ok(typed_value) => {
531 println!("✅ [L2 HIT] Deserialized '{}' from Redis", key);
532
533 // Promote to L1 using Redis TTL (or strategy TTL as fallback)
534 let promotion_ttl = redis_ttl.unwrap_or_else(|| strategy.to_duration());
535
536 if let Err(e) = self.l1_cache.set_with_ttl(key, cached_json, promotion_ttl).await {
537 eprintln!("⚠️ Failed to promote key '{}' to L1: {}", key, e);
538 } else {
539 self.promotions.fetch_add(1, Ordering::Relaxed);
540 println!("⬆️ Promoted '{}' from L2 to L1 with TTL {:?}", key, promotion_ttl);
541 }
542
543 return Ok(typed_value);
544 }
545 Err(e) => {
546 eprintln!("⚠️ L2 cache deserialization failed for key '{}': {}. Will recompute.", key, e);
547 // Fall through to recompute
548 }
549 }
550 }
551
552 // 5. Both L1 and L2 miss (or deserialization failed) - compute fresh data
553 println!("💻 Computing fresh typed data for key: '{}' (Cache Stampede protected)", key);
554 let typed_value = compute_fn().await?;
555
556 // 6. Serialize to JSON for storage
557 let json_value = serde_json::to_value(&typed_value)
558 .map_err(|e| anyhow::anyhow!("Failed to serialize type {} for caching: {}", std::any::type_name::<T>(), e))?;
559
560 // 7. Store in both L1 and L2 caches
561 if let Err(e) = self.set_with_strategy(key, json_value, strategy).await {
562 eprintln!("⚠️ Failed to cache computed typed data for key '{}': {}", key, e);
563 } else {
564 println!("💾 Cached typed value for '{}' (type: {})", key, std::any::type_name::<T>());
565 }
566
567 // 8. _cleanup_guard will auto-remove entry on drop
568
569 Ok(typed_value)
570 }
571
572 /// Get comprehensive cache statistics
573 #[allow(dead_code)]
574 pub fn get_stats(&self) -> CacheManagerStats {
575 let total_reqs = self.total_requests.load(Ordering::Relaxed);
576 let l1_hits = self.l1_hits.load(Ordering::Relaxed);
577 let l2_hits = self.l2_hits.load(Ordering::Relaxed);
578 let misses = self.misses.load(Ordering::Relaxed);
579
580 CacheManagerStats {
581 total_requests: total_reqs,
582 l1_hits,
583 l2_hits,
584 total_hits: l1_hits + l2_hits,
585 misses,
586 hit_rate: if total_reqs > 0 {
587 ((l1_hits + l2_hits) as f64 / total_reqs as f64) * 100.0
588 } else { 0.0 },
589 l1_hit_rate: if total_reqs > 0 {
590 (l1_hits as f64 / total_reqs as f64) * 100.0
591 } else { 0.0 },
592 promotions: self.promotions.load(Ordering::Relaxed),
593 in_flight_requests: self.in_flight_requests.len(),
594 }
595 }
596
597 // ===== Redis Streams Methods =====
598
599 /// Publish data to Redis Stream
600 ///
601 /// # Arguments
602 /// * `stream_key` - Name of the stream (e.g., "events_stream")
603 /// * `fields` - Field-value pairs to publish
604 /// * `maxlen` - Optional max length for stream trimming
605 ///
606 /// # Returns
607 /// The entry ID generated by Redis
608 ///
609 /// # Errors
610 /// Returns error if streaming backend is not configured
611 pub async fn publish_to_stream(
612 &self,
613 stream_key: &str,
614 fields: Vec<(String, String)>,
615 maxlen: Option<usize>
616 ) -> Result<String> {
617 match &self.streaming_backend {
618 Some(backend) => backend.stream_add(stream_key, fields, maxlen).await,
619 None => Err(anyhow::anyhow!("Streaming backend not configured"))
620 }
621 }
622
623 /// Read latest entries from Redis Stream
624 ///
625 /// # Arguments
626 /// * `stream_key` - Name of the stream
627 /// * `count` - Number of latest entries to retrieve
628 ///
629 /// # Returns
630 /// Vector of (entry_id, fields) tuples (newest first)
631 ///
632 /// # Errors
633 /// Returns error if streaming backend is not configured
634 pub async fn read_stream_latest(
635 &self,
636 stream_key: &str,
637 count: usize
638 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
639 match &self.streaming_backend {
640 Some(backend) => backend.stream_read_latest(stream_key, count).await,
641 None => Err(anyhow::anyhow!("Streaming backend not configured"))
642 }
643 }
644
645 /// Read from Redis Stream with optional blocking
646 ///
647 /// # Arguments
648 /// * `stream_key` - Name of the stream
649 /// * `last_id` - Last ID seen ("0" for start, "$" for new only)
650 /// * `count` - Max entries to retrieve
651 /// * `block_ms` - Optional blocking timeout in ms
652 ///
653 /// # Returns
654 /// Vector of (entry_id, fields) tuples
655 ///
656 /// # Errors
657 /// Returns error if streaming backend is not configured
658 pub async fn read_stream(
659 &self,
660 stream_key: &str,
661 last_id: &str,
662 count: usize,
663 block_ms: Option<usize>
664 ) -> Result<Vec<(String, Vec<(String, String)>)>> {
665 match &self.streaming_backend {
666 Some(backend) => backend.stream_read(stream_key, last_id, count, block_ms).await,
667 None => Err(anyhow::anyhow!("Streaming backend not configured"))
668 }
669 }
670}
671
/// Cache Manager statistics: a point-in-time snapshot produced by
/// `CacheManager::get_stats()`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheManagerStats {
    /// Value of the manager's request counter.
    pub total_requests: u64,
    /// Value of the L1 hit counter.
    pub l1_hits: u64,
    /// Value of the L2 hit counter.
    pub l2_hits: u64,
    /// `l1_hits + l2_hits`.
    pub total_hits: u64,
    /// Value of the miss counter.
    pub misses: u64,
    /// `total_hits` as a percentage of `total_requests`; `0.0` when there were no requests.
    pub hit_rate: f64,
    /// `l1_hits` as a percentage of `total_requests`; `0.0` when there were no requests.
    pub l1_hit_rate: f64,
    /// Value of the L2-to-L1 promotion counter.
    pub promotions: usize,
    /// Number of entries in the in-flight request table when the snapshot was taken.
    pub in_flight_requests: usize,
}