1use super::{CacheEntry, CacheKey, EvictionPolicy};
4use anyhow::Result;
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{debug, info, warn};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CacheConfig {
16 pub max_entries: usize,
18 pub max_memory_bytes: usize,
20 pub default_ttl: Option<Duration>,
22 pub eviction_policy: EvictionPolicy,
24 pub enable_background_cleanup: bool,
26}
27
28impl Default for CacheConfig {
29 fn default() -> Self {
30 Self {
31 max_entries: 10000,
32 max_memory_bytes: 100 * 1024 * 1024, default_ttl: Some(Duration::from_secs(300)), eviction_policy: EvictionPolicy::LRU,
35 enable_background_cleanup: true,
36 }
37 }
38}
39
40#[derive(Debug, Clone, Default)]
42pub struct CacheStats {
43 pub hits: u64,
44 pub misses: u64,
45 pub evictions: u64,
46 pub current_entries: usize,
47 pub current_memory_bytes: usize,
48}
49
50impl CacheStats {
51 pub fn hit_rate(&self) -> f64 {
53 let total = self.hits + self.misses;
54 if total == 0 {
55 0.0
56 } else {
57 (self.hits as f64 / total as f64) * 100.0
58 }
59 }
60}
61
62pub struct CacheManager {
64 config: CacheConfig,
65 cache: Arc<DashMap<CacheKey, CacheEntry>>,
66 hits: Arc<AtomicU64>,
67 misses: Arc<AtomicU64>,
68 evictions: Arc<AtomicU64>,
69 current_memory: Arc<AtomicUsize>,
70}
71
72impl CacheManager {
73 pub async fn new(config: CacheConfig) -> Result<Self> {
75 info!("Initializing cache manager with config: {:?}", config);
76
77 let manager = Self {
78 config: config.clone(),
79 cache: Arc::new(DashMap::new()),
80 hits: Arc::new(AtomicU64::new(0)),
81 misses: Arc::new(AtomicU64::new(0)),
82 evictions: Arc::new(AtomicU64::new(0)),
83 current_memory: Arc::new(AtomicUsize::new(0)),
84 };
85
86 if config.enable_background_cleanup {
88 manager.start_cleanup_task();
89 }
90
91 Ok(manager)
92 }
93
94 pub fn get(&self, key: &CacheKey) -> Option<serde_json::Value> {
96 match self.cache.get_mut(key) {
97 Some(mut entry) => {
98 if entry.is_expired() {
100 drop(entry); self.invalidate(key);
102 self.misses.fetch_add(1, Ordering::Relaxed);
103 debug!("Cache miss (expired): {:?}", key);
104 return None;
105 }
106
107 entry.mark_accessed();
109 let value = entry.value.clone();
110
111 self.hits.fetch_add(1, Ordering::Relaxed);
112 debug!("Cache hit: {:?}", key);
113 Some(value)
114 }
115 None => {
116 self.misses.fetch_add(1, Ordering::Relaxed);
117 debug!("Cache miss (not found): {:?}", key);
118 None
119 }
120 }
121 }
122
123 pub fn put(
125 &self,
126 key: CacheKey,
127 value: serde_json::Value,
128 ttl: Option<Duration>,
129 ) -> Result<()> {
130 let ttl = ttl.or(self.config.default_ttl);
131 let entry = CacheEntry::new(key.clone(), value, ttl);
132 let entry_size = entry.size_bytes;
133
134 self.ensure_capacity(entry_size)?;
136
137 self.current_memory.fetch_add(entry_size, Ordering::Relaxed);
139 self.cache.insert(key.clone(), entry);
140
141 debug!("Cached entry: {:?} ({} bytes)", key, entry_size);
142 Ok(())
143 }
144
145 pub fn invalidate(&self, key: &CacheKey) -> bool {
147 if let Some((_, entry)) = self.cache.remove(key) {
148 self.current_memory
149 .fetch_sub(entry.size_bytes, Ordering::Relaxed);
150 debug!("Invalidated cache entry: {:?}", key);
151 true
152 } else {
153 false
154 }
155 }
156
157 pub fn invalidate_node(&self, node_id: &str) -> usize {
159 let keys_to_remove: Vec<CacheKey> = self
160 .cache
161 .iter()
162 .filter(|entry| entry.key().node_id == node_id)
163 .map(|entry| entry.key().clone())
164 .collect();
165
166 let count = keys_to_remove.len();
167 for key in keys_to_remove {
168 self.invalidate(&key);
169 }
170
171 info!("Invalidated {} entries for node '{}'", count, node_id);
172 count
173 }
174
175 pub fn clear(&self) {
177 let count = self.cache.len();
178 self.cache.clear();
179 self.current_memory.store(0, Ordering::Relaxed);
180 info!("Cleared cache ({} entries)", count);
181 }
182
183 pub fn stats(&self) -> CacheStats {
185 CacheStats {
186 hits: self.hits.load(Ordering::Relaxed),
187 misses: self.misses.load(Ordering::Relaxed),
188 evictions: self.evictions.load(Ordering::Relaxed),
189 current_entries: self.cache.len(),
190 current_memory_bytes: self.current_memory.load(Ordering::Relaxed),
191 }
192 }
193
194 fn ensure_capacity(&self, new_entry_size: usize) -> Result<()> {
196 while self.cache.len() >= self.config.max_entries {
198 self.evict_one()?;
199 }
200
201 while self.current_memory.load(Ordering::Relaxed) + new_entry_size
203 > self.config.max_memory_bytes
204 {
205 self.evict_one()?;
206 }
207
208 Ok(())
209 }
210
211 fn evict_one(&self) -> Result<()> {
213 let key_to_evict = match self.config.eviction_policy {
214 EvictionPolicy::LRU => self.find_lru_key(),
215 EvictionPolicy::FIFO => self.find_fifo_key(),
216 EvictionPolicy::LFU => self.find_lfu_key(),
217 EvictionPolicy::None => {
218 warn!("Eviction needed but policy is None");
219 return Ok(());
220 }
221 };
222
223 if let Some(key) = key_to_evict {
224 self.invalidate(&key);
225 self.evictions.fetch_add(1, Ordering::Relaxed);
226 debug!("Evicted entry: {:?}", key);
227 }
228
229 Ok(())
230 }
231
232 fn find_lru_key(&self) -> Option<CacheKey> {
234 self.cache
235 .iter()
236 .min_by_key(|entry| entry.last_accessed)
237 .map(|entry| entry.key().clone())
238 }
239
240 fn find_fifo_key(&self) -> Option<CacheKey> {
242 self.cache
243 .iter()
244 .min_by_key(|entry| entry.created_at)
245 .map(|entry| entry.key().clone())
246 }
247
248 fn find_lfu_key(&self) -> Option<CacheKey> {
250 self.cache
251 .iter()
252 .min_by_key(|entry| entry.access_count)
253 .map(|entry| entry.key().clone())
254 }
255
256 fn start_cleanup_task(&self) {
258 let cache = Arc::clone(&self.cache);
259 let current_memory = Arc::clone(&self.current_memory);
260
261 tokio::spawn(async move {
262 let mut cleanup_interval = interval(Duration::from_secs(60));
263
264 loop {
265 cleanup_interval.tick().await;
266
267 let expired_keys: Vec<CacheKey> = cache
268 .iter()
269 .filter(|entry| entry.is_expired())
270 .map(|entry| entry.key().clone())
271 .collect();
272
273 if !expired_keys.is_empty() {
274 for key in &expired_keys {
275 if let Some((_, entry)) = cache.remove(key) {
276 current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
277 }
278 }
279 info!("Cleaned up {} expired cache entries", expired_keys.len());
280 }
281 }
282 });
283
284 info!("Started background cache cleanup task");
285 }
286
287 pub fn entries(&self) -> Vec<CacheEntry> {
289 self.cache
290 .iter()
291 .map(|entry| entry.value().clone())
292 .collect()
293 }
294
295 pub fn contains_key(&self, key: &CacheKey) -> bool {
297 self.cache.contains_key(key)
298 }
299
300 pub fn len(&self) -> usize {
302 self.cache.len()
303 }
304
305 pub fn is_empty(&self) -> bool {
307 self.cache.is_empty()
308 }
309}
310
311impl Clone for CacheManager {
312 fn clone(&self) -> Self {
313 Self {
314 config: self.config.clone(),
315 cache: Arc::clone(&self.cache),
316 hits: Arc::clone(&self.hits),
317 misses: Arc::clone(&self.misses),
318 evictions: Arc::clone(&self.evictions),
319 current_memory: Arc::clone(&self.current_memory),
320 }
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use serde_json::json;
328
329 #[tokio::test]
330 async fn test_cache_basic_operations() {
331 let config = CacheConfig {
332 max_entries: 100,
333 max_memory_bytes: 1024 * 1024,
334 default_ttl: None,
335 eviction_policy: EvictionPolicy::LRU,
336 enable_background_cleanup: false,
337 };
338
339 let cache = CacheManager::new(config).await.unwrap();
340
341 let key = CacheKey::new("node1", &json!({"x": 10}));
342 let value = json!({"result": 42});
343
344 assert!(cache.get(&key).is_none());
346
347 cache.put(key.clone(), value.clone(), None).unwrap();
349
350 assert_eq!(cache.get(&key), Some(value));
352
353 let stats = cache.stats();
355 assert_eq!(stats.hits, 1);
356 assert_eq!(stats.misses, 1);
357 }
358
359 #[tokio::test]
360 async fn test_cache_expiration() {
361 let config = CacheConfig {
362 max_entries: 100,
363 max_memory_bytes: 1024 * 1024,
364 default_ttl: Some(Duration::from_millis(50)),
365 eviction_policy: EvictionPolicy::LRU,
366 enable_background_cleanup: false,
367 };
368
369 let cache = CacheManager::new(config).await.unwrap();
370
371 let key = CacheKey::new("node1", &json!({"x": 10}));
372 let value = json!({"result": 42});
373
374 cache.put(key.clone(), value.clone(), None).unwrap();
375
376 assert!(cache.get(&key).is_some());
378
379 tokio::time::sleep(Duration::from_millis(100)).await;
381
382 assert!(cache.get(&key).is_none());
384 }
385
386 #[tokio::test]
387 async fn test_cache_max_entries() {
388 let config = CacheConfig {
389 max_entries: 3,
390 max_memory_bytes: 1024 * 1024,
391 default_ttl: None,
392 eviction_policy: EvictionPolicy::FIFO,
393 enable_background_cleanup: false,
394 };
395
396 let cache = CacheManager::new(config).await.unwrap();
397
398 for i in 0..4 {
400 let key = CacheKey::new(format!("node{}", i), &json!({"x": i}));
401 let value = json!({"result": i});
402 cache.put(key, value, None).unwrap();
403 }
404
405 assert_eq!(cache.len(), 3);
407
408 let first_key = CacheKey::new("node0", &json!({"x": 0}));
410 assert!(!cache.contains_key(&first_key));
411 }
412
413 #[tokio::test]
414 async fn test_cache_invalidation() {
415 let config = CacheConfig::default();
416 let cache = CacheManager::new(config).await.unwrap();
417
418 let key1 = CacheKey::new("node1", &json!({"x": 10}));
419 let key2 = CacheKey::new("node1", &json!({"x": 20}));
420
421 cache.put(key1.clone(), json!({"result": 1}), None).unwrap();
422 cache.put(key2.clone(), json!({"result": 2}), None).unwrap();
423
424 assert_eq!(cache.len(), 2);
425
426 let count = cache.invalidate_node("node1");
428 assert_eq!(count, 2);
429 assert_eq!(cache.len(), 0);
430 }
431
432 #[tokio::test]
433 async fn test_cache_stats() {
434 let config = CacheConfig::default();
435 let cache = CacheManager::new(config).await.unwrap();
436
437 let key = CacheKey::new("node1", &json!({"x": 10}));
438
439 cache.get(&key);
441
442 cache.put(key.clone(), json!({"result": 42}), None).unwrap();
444 cache.get(&key);
445 cache.get(&key);
446
447 let stats = cache.stats();
448 assert_eq!(stats.hits, 2);
449 assert_eq!(stats.misses, 1);
450 assert_eq!(stats.current_entries, 1);
451 assert!(stats.hit_rate() > 0.0);
452 }
453}