1use super::{CacheEntry, CacheKey, EvictionPolicy};
4use anyhow::Result;
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{debug, info, warn};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CacheConfig {
16 pub max_entries: usize,
18 pub max_memory_bytes: usize,
20 pub default_ttl: Option<Duration>,
22 pub eviction_policy: EvictionPolicy,
24 pub enable_background_cleanup: bool,
26}
27
28impl Default for CacheConfig {
29 fn default() -> Self {
30 Self {
31 max_entries: 10000,
32 max_memory_bytes: 100 * 1024 * 1024, default_ttl: Some(Duration::from_secs(300)), eviction_policy: EvictionPolicy::LRU,
35 enable_background_cleanup: true,
36 }
37 }
38}
39
/// Point-in-time snapshot of the cache counters.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of lookups that returned a value.
    pub hits: u64,
    /// Number of lookups that returned nothing (absent or expired).
    pub misses: u64,
    /// Number of entries removed to make room for new ones.
    pub evictions: u64,
    /// Number of entries held when the snapshot was taken.
    pub current_entries: usize,
    /// Accounted size in bytes of all entries when the snapshot was taken.
    pub current_memory_bytes: usize,
}

impl CacheStats {
    /// Returns the share of lookups that were hits, as a percentage in
    /// `0.0..=100.0`.
    ///
    /// Returns `0.0` when no lookup has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        // Sum in u128: `hits + misses` in u64 would panic in overflow-checked
        // builds (and wrap otherwise) once the counters get large enough.
        let total = u128::from(self.hits) + u128::from(self.misses);
        if total == 0 {
            return 0.0;
        }

        (self.hits as f64 / total as f64) * 100.0
    }
}
61
62pub struct CacheManager {
64 config: CacheConfig,
65 cache: Arc<DashMap<CacheKey, CacheEntry>>,
66 hits: Arc<AtomicU64>,
67 misses: Arc<AtomicU64>,
68 evictions: Arc<AtomicU64>,
69 current_memory: Arc<AtomicUsize>,
70}
71
72impl CacheManager {
73 pub async fn new(config: CacheConfig) -> Result<Self> {
75 info!("Initializing cache manager with config: {:?}", config);
76
77 let manager = Self {
78 config: config.clone(),
79 cache: Arc::new(DashMap::new()),
80 hits: Arc::new(AtomicU64::new(0)),
81 misses: Arc::new(AtomicU64::new(0)),
82 evictions: Arc::new(AtomicU64::new(0)),
83 current_memory: Arc::new(AtomicUsize::new(0)),
84 };
85
86 if config.enable_background_cleanup {
88 manager.start_cleanup_task();
89 }
90
91 Ok(manager)
92 }
93
94 pub fn get(&self, key: &CacheKey) -> Option<serde_json::Value> {
96 match self.cache.get_mut(key) {
97 Some(mut entry) => {
98 if entry.is_expired() {
100 drop(entry); self.invalidate(key);
102 self.misses.fetch_add(1, Ordering::Relaxed);
103 debug!("Cache miss (expired): {:?}", key);
104 return None;
105 }
106
107 entry.mark_accessed();
109 let value = entry.value.clone();
110
111 self.hits.fetch_add(1, Ordering::Relaxed);
112 debug!("Cache hit: {:?}", key);
113 Some(value)
114 }
115 None => {
116 self.misses.fetch_add(1, Ordering::Relaxed);
117 debug!("Cache miss (not found): {:?}", key);
118 None
119 }
120 }
121 }
122
123 pub fn put(
125 &self,
126 key: CacheKey,
127 value: serde_json::Value,
128 ttl: Option<Duration>,
129 ) -> Result<()> {
130 let ttl = ttl.or(self.config.default_ttl);
131 let entry = CacheEntry::new(key.clone(), value, ttl);
132 let entry_size = entry.size_bytes;
133
134 self.ensure_capacity(entry_size)?;
136
137 self.current_memory.fetch_add(entry_size, Ordering::Relaxed);
139 self.cache.insert(key.clone(), entry);
140
141 debug!("Cached entry: {:?} ({} bytes)", key, entry_size);
142 Ok(())
143 }
144
145 pub fn invalidate(&self, key: &CacheKey) -> bool {
147 if let Some((_, entry)) = self.cache.remove(key) {
148 self.current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
149 debug!("Invalidated cache entry: {:?}", key);
150 true
151 } else {
152 false
153 }
154 }
155
156 pub fn invalidate_node(&self, node_id: &str) -> usize {
158 let keys_to_remove: Vec<CacheKey> = self
159 .cache
160 .iter()
161 .filter(|entry| entry.key().node_id == node_id)
162 .map(|entry| entry.key().clone())
163 .collect();
164
165 let count = keys_to_remove.len();
166 for key in keys_to_remove {
167 self.invalidate(&key);
168 }
169
170 info!("Invalidated {} entries for node '{}'", count, node_id);
171 count
172 }
173
174 pub fn clear(&self) {
176 let count = self.cache.len();
177 self.cache.clear();
178 self.current_memory.store(0, Ordering::Relaxed);
179 info!("Cleared cache ({} entries)", count);
180 }
181
182 pub fn stats(&self) -> CacheStats {
184 CacheStats {
185 hits: self.hits.load(Ordering::Relaxed),
186 misses: self.misses.load(Ordering::Relaxed),
187 evictions: self.evictions.load(Ordering::Relaxed),
188 current_entries: self.cache.len(),
189 current_memory_bytes: self.current_memory.load(Ordering::Relaxed),
190 }
191 }
192
193 fn ensure_capacity(&self, new_entry_size: usize) -> Result<()> {
195 while self.cache.len() >= self.config.max_entries {
197 self.evict_one()?;
198 }
199
200 while self.current_memory.load(Ordering::Relaxed) + new_entry_size
202 > self.config.max_memory_bytes
203 {
204 self.evict_one()?;
205 }
206
207 Ok(())
208 }
209
210 fn evict_one(&self) -> Result<()> {
212 let key_to_evict = match self.config.eviction_policy {
213 EvictionPolicy::LRU => self.find_lru_key(),
214 EvictionPolicy::FIFO => self.find_fifo_key(),
215 EvictionPolicy::LFU => self.find_lfu_key(),
216 EvictionPolicy::None => {
217 warn!("Eviction needed but policy is None");
218 return Ok(());
219 }
220 };
221
222 if let Some(key) = key_to_evict {
223 self.invalidate(&key);
224 self.evictions.fetch_add(1, Ordering::Relaxed);
225 debug!("Evicted entry: {:?}", key);
226 }
227
228 Ok(())
229 }
230
231 fn find_lru_key(&self) -> Option<CacheKey> {
233 self.cache
234 .iter()
235 .min_by_key(|entry| entry.last_accessed)
236 .map(|entry| entry.key().clone())
237 }
238
239 fn find_fifo_key(&self) -> Option<CacheKey> {
241 self.cache
242 .iter()
243 .min_by_key(|entry| entry.created_at)
244 .map(|entry| entry.key().clone())
245 }
246
247 fn find_lfu_key(&self) -> Option<CacheKey> {
249 self.cache
250 .iter()
251 .min_by_key(|entry| entry.access_count)
252 .map(|entry| entry.key().clone())
253 }
254
255 fn start_cleanup_task(&self) {
257 let cache = Arc::clone(&self.cache);
258 let current_memory = Arc::clone(&self.current_memory);
259
260 tokio::spawn(async move {
261 let mut cleanup_interval = interval(Duration::from_secs(60));
262
263 loop {
264 cleanup_interval.tick().await;
265
266 let expired_keys: Vec<CacheKey> = cache
267 .iter()
268 .filter(|entry| entry.is_expired())
269 .map(|entry| entry.key().clone())
270 .collect();
271
272 if !expired_keys.is_empty() {
273 for key in &expired_keys {
274 if let Some((_, entry)) = cache.remove(key) {
275 current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
276 }
277 }
278 info!("Cleaned up {} expired cache entries", expired_keys.len());
279 }
280 }
281 });
282
283 info!("Started background cache cleanup task");
284 }
285
286 pub fn entries(&self) -> Vec<CacheEntry> {
288 self.cache
289 .iter()
290 .map(|entry| entry.value().clone())
291 .collect()
292 }
293
294 pub fn contains_key(&self, key: &CacheKey) -> bool {
296 self.cache.contains_key(key)
297 }
298
299 pub fn len(&self) -> usize {
301 self.cache.len()
302 }
303
304 pub fn is_empty(&self) -> bool {
306 self.cache.is_empty()
307 }
308}
309
310impl Clone for CacheManager {
311 fn clone(&self) -> Self {
312 Self {
313 config: self.config.clone(),
314 cache: Arc::clone(&self.cache),
315 hits: Arc::clone(&self.hits),
316 misses: Arc::clone(&self.misses),
317 evictions: Arc::clone(&self.evictions),
318 current_memory: Arc::clone(&self.current_memory),
319 }
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a 1 MiB config with the background cleanup task disabled.
    fn config_without_cleanup(
        max_entries: usize,
        default_ttl: Option<Duration>,
        eviction_policy: EvictionPolicy,
    ) -> CacheConfig {
        CacheConfig {
            max_entries,
            max_memory_bytes: 1024 * 1024,
            default_ttl,
            eviction_policy,
            enable_background_cleanup: false,
        }
    }

    /// A miss, a put and a hit are reflected in the returned value and stats.
    #[tokio::test]
    async fn test_cache_basic_operations() {
        let manager = CacheManager::new(config_without_cleanup(100, None, EvictionPolicy::LRU))
            .await
            .unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        let expected = json!({"result": 42});

        assert!(cache_miss(&manager, &key));

        manager.put(key.clone(), expected.clone(), None).unwrap();

        assert_eq!(manager.get(&key), Some(expected));

        let snapshot = manager.stats();
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
    }

    /// Returns `true` when looking up `key` yields nothing.
    fn cache_miss(manager: &CacheManager, key: &CacheKey) -> bool {
        manager.get(key).is_none()
    }

    /// An entry is served before its TTL elapses and gone afterwards.
    #[tokio::test]
    async fn test_cache_expiration() {
        let ttl = Duration::from_millis(50);
        let manager =
            CacheManager::new(config_without_cleanup(100, Some(ttl), EvictionPolicy::LRU))
                .await
                .unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        manager
            .put(key.clone(), json!({"result": 42}), None)
            .unwrap();

        assert!(manager.get(&key).is_some());

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(manager.get(&key).is_none());
    }

    /// With FIFO eviction and room for three entries, the fourth insert
    /// pushes out the first.
    #[tokio::test]
    async fn test_cache_max_entries() {
        let manager = CacheManager::new(config_without_cleanup(3, None, EvictionPolicy::FIFO))
            .await
            .unwrap();

        for index in 0..4 {
            let key = CacheKey::new(format!("node{}", index), &json!({"x": index}));
            manager.put(key, json!({"result": index}), None).unwrap();
        }

        assert_eq!(manager.len(), 3);

        let oldest = CacheKey::new("node0", &json!({"x": 0}));
        assert!(!manager.contains_key(&oldest));
    }

    /// Invalidating a node removes every entry keyed under it.
    #[tokio::test]
    async fn test_cache_invalidation() {
        let manager = CacheManager::new(CacheConfig::default()).await.unwrap();

        let first = CacheKey::new("node1", &json!({"x": 10}));
        let second = CacheKey::new("node1", &json!({"x": 20}));

        manager
            .put(first.clone(), json!({"result": 1}), None)
            .unwrap();
        manager
            .put(second.clone(), json!({"result": 2}), None)
            .unwrap();

        assert_eq!(manager.len(), 2);

        let removed = manager.invalidate_node("node1");
        assert_eq!(removed, 2);
        assert_eq!(manager.len(), 0);
    }

    /// Hits, misses and the entry count are tracked across lookups.
    #[tokio::test]
    async fn test_cache_stats() {
        let manager = CacheManager::new(CacheConfig::default()).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));

        // One miss before the value exists.
        manager.get(&key);

        manager
            .put(key.clone(), json!({"result": 42}), None)
            .unwrap();

        // Two hits afterwards.
        manager.get(&key);
        manager.get(&key);

        let snapshot = manager.stats();
        assert_eq!(snapshot.hits, 2);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.current_entries, 1);
        assert!(snapshot.hit_rate() > 0.0);
    }
}