warpdrive_proxy/cache/
coordinator.rs

1//! Cache coordinator with distributed + local fallback strategy
2//!
3//! This module provides a high-level cache coordinator that manages both
4//! distributed (Redis) and local (Memory) caches with automatic fallback.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────┐
10//! │ Coordinator │
11//! └──────┬──────┘
12//!        │
13//!    ┌───┴───┐
14//!    │       │
15//!    ▼       ▼
16//! ┌─────┐ ┌────────┐
17//! │Redis│ │ Memory │
18//! └─────┘ └────────┘
19//!   (L2)     (L1)
20//! ```
21//!
22//! **Strategy**:
//! - GET: L1 → L2; an L2 hit is copied back into L1, and on a full miss the caller fetches from upstream and stores the result via SET
24//! - SET: L1 + L2 (write-through)
25//! - DELETE: L1 + L2 (invalidate both)
26//!
27//! **Fallback**:
28//! - If Redis unavailable, use Memory only
29//! - If Redis operation fails, continue with Memory
30//! - Log degraded mode for monitoring
31//!
32//! # Example
33//!
34//! ```no_run
35//! use warpdrive::cache::coordinator::CacheCoordinator;
36//! use warpdrive::config::Config;
37//!
38//! # async fn example() -> anyhow::Result<()> {
39//! let config = Config::from_env()?;
40//! let cache = CacheCoordinator::from_config(&config).await?;
41//!
42//! // Use cache (automatic Redis → Memory fallback)
43//! cache.set("key", b"value", 60).await?;
44//! let value = cache.get("key").await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use async_trait::async_trait;
50use std::sync::Arc;
51use tracing::{debug, warn};
52
53use crate::cache::Cache;
54use crate::cache::memory::MemoryCache;
55use crate::cache::redis::RedisCache;
56use crate::config::Config;
57
/// Which cache tiers a coordinator is operating with.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheBackend {
    /// Both tiers are active: Redis as the distributed tier plus the local
    /// memory tier.
    RedisMemory,
    /// Only the local memory tier is active (Redis is unavailable or was
    /// never configured).
    MemoryOnly,
}
66
67/// Cache coordinator managing distributed and local caches
68///
69/// Provides automatic fallback from Redis (L2) to Memory (L1) on errors.
70/// Memory cache acts as both L1 fast-path and fallback for Redis failures.
71#[derive(Clone)]
72pub struct CacheCoordinator {
73    /// Local memory cache (L1) - always present
74    memory: Arc<MemoryCache>,
75
76    /// Distributed Redis cache (L2) - optional
77    redis: Option<Arc<RedisCache>>,
78
79    /// Current backend mode
80    backend: CacheBackend,
81}
82
83impl CacheCoordinator {
84    /// Create cache coordinator from config
85    ///
86    /// Initializes both Redis and Memory caches based on configuration:
87    /// - If `redis_url` is set, initializes Redis (degraded mode if fails)
88    /// - Always initializes Memory cache with configured size
89    ///
90    /// # Errors
91    ///
92    /// Returns error only if Memory cache creation fails (should never happen).
93    /// Redis connection failures are logged but don't fail initialization.
94    pub async fn from_config(config: &Config) -> anyhow::Result<Self> {
95        // Always create memory cache
96        let memory = Arc::new(MemoryCache::new(
97            config.cache_size_bytes,
98            config.max_cache_item_size_bytes,
99        ));
100
101        debug!(
102            cache_size_mb = config.cache_size_bytes / crate::config::MB,
103            max_item_size_kb = config.max_cache_item_size_bytes / crate::config::KB,
104            "Initialized memory cache (L1)"
105        );
106
107        // Try to create Redis cache if configured
108        let redis = if let Some(ref redis_url) = config.redis_url {
109            match RedisCache::from_url(redis_url, "warpdrive:cache:".to_string()).await {
110                Ok(cache) => {
111                    debug!(redis_url = %redis_url, "Initialized Redis cache (L2)");
112                    Some(Arc::new(cache))
113                }
114                Err(e) => {
115                    warn!(
116                        error = %e,
117                        "Failed to connect to Redis, running in memory-only mode"
118                    );
119                    None
120                }
121            }
122        } else {
123            debug!("No Redis URL configured, running in memory-only mode");
124            None
125        };
126
127        let backend = if redis.is_some() {
128            CacheBackend::RedisMemory
129        } else {
130            CacheBackend::MemoryOnly
131        };
132
133        Ok(CacheCoordinator {
134            memory,
135            redis,
136            backend,
137        })
138    }
139
140    /// Get current cache backend mode
141    pub fn backend(&self) -> CacheBackend {
142        self.backend
143    }
144
145    /// Get memory cache statistics
146    ///
147    /// Returns (current_size, capacity, item_count)
148    pub fn memory_stats(&self) -> (usize, usize, usize) {
149        self.memory.stats()
150    }
151
152    /// Clear memory cache (for testing/admin operations)
153    pub fn clear_memory(&self) {
154        self.memory.clear()
155    }
156
157    /// Get Redis cache for advanced operations
158    ///
159    /// Returns `None` if Redis is not configured or unavailable.
160    pub fn redis(&self) -> Option<&Arc<RedisCache>> {
161        self.redis.as_ref()
162    }
163}
164
165#[async_trait]
166impl Cache for CacheCoordinator {
167    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
168        // Try L1 (Memory) first
169        if let Some(value) = self.memory.get(key).await? {
170            debug!(key = %key, backend = "memory", "Cache hit (L1)");
171            crate::metrics::CACHE_HITS
172                .with_label_values(&["memory"])
173                .inc();
174            return Ok(Some(value));
175        }
176
177        // Try L2 (Redis) if available
178        if let Some(ref redis) = self.redis {
179            match redis.get(key).await {
180                Ok(Some(value)) => {
181                    debug!(key = %key, backend = "redis", "Cache hit (L2)");
182                    crate::metrics::CACHE_HITS
183                        .with_label_values(&["redis"])
184                        .inc();
185
186                    // Populate L1 for next access (fire and forget)
187                    let memory = Arc::clone(&self.memory);
188                    let key = key.to_string();
189                    let value_clone = value.clone();
190                    tokio::spawn(async move {
191                        // Use default TTL of 60s for L1 backfill
192                        let _ = memory.set(&key, &value_clone, 60).await;
193                    });
194
195                    return Ok(Some(value));
196                }
197                Ok(None) => {
198                    debug!(key = %key, backend = "redis", "Cache miss (L2)");
199                    crate::metrics::CACHE_MISSES
200                        .with_label_values(&["redis"])
201                        .inc();
202                }
203                Err(e) => {
204                    warn!(
205                        key = %key,
206                        error = %e,
207                        "Redis get failed, continuing with memory-only"
208                    );
209                    crate::metrics::CACHE_ERRORS
210                        .with_label_values(&["redis", "get"])
211                        .inc();
212                }
213            }
214        }
215
216        // Miss on both L1 and L2
217        crate::metrics::CACHE_MISSES
218            .with_label_values(&["memory"])
219            .inc();
220        Ok(None)
221    }
222
223    async fn set(&self, key: &str, value: &[u8], ttl_seconds: u64) -> anyhow::Result<()> {
224        // Write to L1 (Memory) - always succeeds
225        self.memory.set(key, value, ttl_seconds).await?;
226        debug!(key = %key, ttl = ttl_seconds, backend = "memory", "Cache set (L1)");
227
228        // Write to L2 (Redis) if available
229        if let Some(ref redis) = self.redis {
230            match redis.set(key, value, ttl_seconds).await {
231                Ok(_) => {
232                    debug!(key = %key, ttl = ttl_seconds, backend = "redis", "Cache set (L2)");
233                }
234                Err(e) => {
235                    warn!(
236                        key = %key,
237                        error = %e,
238                        "Redis set failed, data only in L1 cache"
239                    );
240                    crate::metrics::CACHE_ERRORS
241                        .with_label_values(&["redis", "set"])
242                        .inc();
243                }
244            }
245        }
246
247        Ok(())
248    }
249
250    async fn delete(&self, key: &str) -> anyhow::Result<()> {
251        // Delete from L1 (Memory)
252        self.memory.delete(key).await?;
253        debug!(key = %key, backend = "memory", "Cache delete (L1)");
254
255        // Delete from L2 (Redis) if available
256        if let Some(ref redis) = self.redis {
257            match redis.delete(key).await {
258                Ok(_) => {
259                    debug!(key = %key, backend = "redis", "Cache delete (L2)");
260                }
261                Err(e) => {
262                    warn!(
263                        key = %key,
264                        error = %e,
265                        "Redis delete failed, L1 cleared but L2 may still have stale data"
266                    );
267                    crate::metrics::CACHE_ERRORS
268                        .with_label_values(&["redis", "delete"])
269                        .inc();
270                }
271            }
272        }
273
274        crate::metrics::CACHE_INVALIDATIONS
275            .with_label_values(&["explicit"])
276            .inc();
277        Ok(())
278    }
279
280    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
281        // Check L1 first
282        if self.memory.exists(key).await? {
283            return Ok(true);
284        }
285
286        // Check L2 if available
287        if let Some(ref redis) = self.redis {
288            match redis.exists(key).await {
289                Ok(exists) => return Ok(exists),
290                Err(e) => {
291                    warn!(
292                        key = %key,
293                        error = %e,
294                        "Redis exists check failed"
295                    );
296                    crate::metrics::CACHE_ERRORS
297                        .with_label_values(&["redis", "exists"])
298                        .inc();
299                }
300            }
301        }
302
303        Ok(false)
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration shared by every test: no Redis, 1 MiB of memory cache,
    /// and a 1 KiB per-item limit.
    fn memory_only_config() -> Config {
        Config {
            redis_url: None,
            cache_size_bytes: 1024 * 1024,
            max_cache_item_size_bytes: 1024,
            ..Default::default()
        }
    }

    /// Build a coordinator from [`memory_only_config`].
    async fn memory_only_cache() -> CacheCoordinator {
        let config = memory_only_config();
        CacheCoordinator::from_config(&config).await.unwrap()
    }

    #[tokio::test]
    async fn test_memory_only_mode() {
        let coordinator = memory_only_cache().await;

        assert_eq!(coordinator.backend(), CacheBackend::MemoryOnly);
        assert!(coordinator.redis().is_none());
    }

    #[tokio::test]
    async fn test_set_and_get_memory_only() {
        let coordinator = memory_only_cache().await;

        coordinator.set("test", b"value", 60).await.unwrap();

        let fetched = coordinator.get("test").await.unwrap();
        assert_eq!(fetched, Some(b"value".to_vec()));
    }

    #[tokio::test]
    async fn test_delete() {
        let coordinator = memory_only_cache().await;

        // The key is visible once stored...
        coordinator.set("test", b"value", 60).await.unwrap();
        assert!(coordinator.exists("test").await.unwrap());

        // ...and gone again after an explicit delete.
        coordinator.delete("test").await.unwrap();
        assert!(!coordinator.exists("test").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_stats() {
        let coordinator = memory_only_cache().await;

        // A fresh cache is empty and reports the configured capacity.
        let (size_before, capacity, count_before) = coordinator.memory_stats();
        assert_eq!(size_before, 0);
        assert_eq!(count_before, 0);
        assert_eq!(capacity, 1024 * 1024);

        coordinator.set("test", b"value", 60).await.unwrap();

        // One stored item must be reflected in both size and count.
        let (size_after, _, count_after) = coordinator.memory_stats();
        assert!(size_after > 0);
        assert_eq!(count_after, 1);
    }
}