1use async_trait::async_trait;
50use std::sync::Arc;
51use tracing::{debug, warn};
52
53use crate::cache::Cache;
54use crate::cache::memory::MemoryCache;
55use crate::cache::redis::RedisCache;
56use crate::config::Config;
57
/// Identifies which cache layers a coordinator is running with.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheBackend {
    /// A Redis cache is attached in addition to the in-memory cache.
    RedisMemory,
    /// Only the in-memory cache is in use.
    MemoryOnly,
}
66
67#[derive(Clone)]
72pub struct CacheCoordinator {
73 memory: Arc<MemoryCache>,
75
76 redis: Option<Arc<RedisCache>>,
78
79 backend: CacheBackend,
81}
82
83impl CacheCoordinator {
84 pub async fn from_config(config: &Config) -> anyhow::Result<Self> {
95 let memory = Arc::new(MemoryCache::new(
97 config.cache_size_bytes,
98 config.max_cache_item_size_bytes,
99 ));
100
101 debug!(
102 cache_size_mb = config.cache_size_bytes / crate::config::MB,
103 max_item_size_kb = config.max_cache_item_size_bytes / crate::config::KB,
104 "Initialized memory cache (L1)"
105 );
106
107 let redis = if let Some(ref redis_url) = config.redis_url {
109 match RedisCache::from_url(redis_url, "warpdrive:cache:".to_string()).await {
110 Ok(cache) => {
111 debug!(redis_url = %redis_url, "Initialized Redis cache (L2)");
112 Some(Arc::new(cache))
113 }
114 Err(e) => {
115 warn!(
116 error = %e,
117 "Failed to connect to Redis, running in memory-only mode"
118 );
119 None
120 }
121 }
122 } else {
123 debug!("No Redis URL configured, running in memory-only mode");
124 None
125 };
126
127 let backend = if redis.is_some() {
128 CacheBackend::RedisMemory
129 } else {
130 CacheBackend::MemoryOnly
131 };
132
133 Ok(CacheCoordinator {
134 memory,
135 redis,
136 backend,
137 })
138 }
139
140 pub fn backend(&self) -> CacheBackend {
142 self.backend
143 }
144
145 pub fn memory_stats(&self) -> (usize, usize, usize) {
149 self.memory.stats()
150 }
151
152 pub fn clear_memory(&self) {
154 self.memory.clear()
155 }
156
157 pub fn redis(&self) -> Option<&Arc<RedisCache>> {
161 self.redis.as_ref()
162 }
163}
164
165#[async_trait]
166impl Cache for CacheCoordinator {
167 async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
168 if let Some(value) = self.memory.get(key).await? {
170 debug!(key = %key, backend = "memory", "Cache hit (L1)");
171 crate::metrics::CACHE_HITS
172 .with_label_values(&["memory"])
173 .inc();
174 return Ok(Some(value));
175 }
176
177 if let Some(ref redis) = self.redis {
179 match redis.get(key).await {
180 Ok(Some(value)) => {
181 debug!(key = %key, backend = "redis", "Cache hit (L2)");
182 crate::metrics::CACHE_HITS
183 .with_label_values(&["redis"])
184 .inc();
185
186 let memory = Arc::clone(&self.memory);
188 let key = key.to_string();
189 let value_clone = value.clone();
190 tokio::spawn(async move {
191 let _ = memory.set(&key, &value_clone, 60).await;
193 });
194
195 return Ok(Some(value));
196 }
197 Ok(None) => {
198 debug!(key = %key, backend = "redis", "Cache miss (L2)");
199 crate::metrics::CACHE_MISSES
200 .with_label_values(&["redis"])
201 .inc();
202 }
203 Err(e) => {
204 warn!(
205 key = %key,
206 error = %e,
207 "Redis get failed, continuing with memory-only"
208 );
209 crate::metrics::CACHE_ERRORS
210 .with_label_values(&["redis", "get"])
211 .inc();
212 }
213 }
214 }
215
216 crate::metrics::CACHE_MISSES
218 .with_label_values(&["memory"])
219 .inc();
220 Ok(None)
221 }
222
223 async fn set(&self, key: &str, value: &[u8], ttl_seconds: u64) -> anyhow::Result<()> {
224 self.memory.set(key, value, ttl_seconds).await?;
226 debug!(key = %key, ttl = ttl_seconds, backend = "memory", "Cache set (L1)");
227
228 if let Some(ref redis) = self.redis {
230 match redis.set(key, value, ttl_seconds).await {
231 Ok(_) => {
232 debug!(key = %key, ttl = ttl_seconds, backend = "redis", "Cache set (L2)");
233 }
234 Err(e) => {
235 warn!(
236 key = %key,
237 error = %e,
238 "Redis set failed, data only in L1 cache"
239 );
240 crate::metrics::CACHE_ERRORS
241 .with_label_values(&["redis", "set"])
242 .inc();
243 }
244 }
245 }
246
247 Ok(())
248 }
249
250 async fn delete(&self, key: &str) -> anyhow::Result<()> {
251 self.memory.delete(key).await?;
253 debug!(key = %key, backend = "memory", "Cache delete (L1)");
254
255 if let Some(ref redis) = self.redis {
257 match redis.delete(key).await {
258 Ok(_) => {
259 debug!(key = %key, backend = "redis", "Cache delete (L2)");
260 }
261 Err(e) => {
262 warn!(
263 key = %key,
264 error = %e,
265 "Redis delete failed, L1 cleared but L2 may still have stale data"
266 );
267 crate::metrics::CACHE_ERRORS
268 .with_label_values(&["redis", "delete"])
269 .inc();
270 }
271 }
272 }
273
274 crate::metrics::CACHE_INVALIDATIONS
275 .with_label_values(&["explicit"])
276 .inc();
277 Ok(())
278 }
279
280 async fn exists(&self, key: &str) -> anyhow::Result<bool> {
281 if self.memory.exists(key).await? {
283 return Ok(true);
284 }
285
286 if let Some(ref redis) = self.redis {
288 match redis.exists(key).await {
289 Ok(exists) => return Ok(exists),
290 Err(e) => {
291 warn!(
292 key = %key,
293 error = %e,
294 "Redis exists check failed"
295 );
296 crate::metrics::CACHE_ERRORS
297 .with_label_values(&["redis", "exists"])
298 .inc();
299 }
300 }
301 }
302
303 Ok(false)
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the in-memory cache used by every test in this module.
    const TEST_CACHE_BYTES: usize = 1024 * 1024;

    /// Configuration with Redis disabled, a 1 MiB memory cache and a
    /// 1 KiB per-item limit.
    fn memory_only_config() -> Config {
        Config {
            redis_url: None,
            cache_size_bytes: 1024 * 1024,
            max_cache_item_size_bytes: 1024,
            ..Default::default()
        }
    }

    /// Builds a coordinator from [`memory_only_config`].
    async fn memory_only_cache() -> CacheCoordinator {
        let config = memory_only_config();
        CacheCoordinator::from_config(&config).await.unwrap()
    }

    #[tokio::test]
    async fn test_memory_only_mode() {
        let coordinator = memory_only_cache().await;

        assert_eq!(coordinator.backend(), CacheBackend::MemoryOnly);
        assert!(coordinator.redis().is_none());
    }

    #[tokio::test]
    async fn test_set_and_get_memory_only() {
        let coordinator = memory_only_cache().await;

        coordinator.set("test", b"value", 60).await.unwrap();

        let fetched = coordinator.get("test").await.unwrap();
        assert_eq!(fetched, Some(b"value".to_vec()));
    }

    #[tokio::test]
    async fn test_delete() {
        let coordinator = memory_only_cache().await;

        coordinator.set("test", b"value", 60).await.unwrap();
        assert!(coordinator.exists("test").await.unwrap());

        coordinator.delete("test").await.unwrap();
        assert!(!coordinator.exists("test").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_stats() {
        let coordinator = memory_only_cache().await;

        // A fresh cache is empty and reports the configured capacity.
        let (size_before, capacity, count_before) = coordinator.memory_stats();
        assert_eq!(size_before, 0);
        assert_eq!(count_before, 0);
        assert_eq!(capacity, TEST_CACHE_BYTES);

        coordinator.set("test", b"value", 60).await.unwrap();

        // One stored entry must show up in both size and count.
        let (size_after, _, count_after) = coordinator.memory_stats();
        assert!(size_after > 0);
        assert_eq!(count_after, 1);
    }
}