// rust_logic_graph/cache/cache_manager.rs

1//! Cache manager implementation
2
3use super::{CacheEntry, CacheKey, EvictionPolicy};
4use anyhow::Result;
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{debug, info, warn};
12
13/// Configuration for the cache manager
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
    /// Maximum number of entries in the cache before eviction kicks in
    pub max_entries: usize,
    /// Maximum memory usage in bytes (approximate — tracked via each
    /// entry's `size_bytes`, not actual allocator usage)
    pub max_memory_bytes: usize,
    /// Default TTL applied when `put` is called without one (None = no expiration)
    pub default_ttl: Option<Duration>,
    /// Eviction policy used when either limit above is reached
    pub eviction_policy: EvictionPolicy,
    /// Enable a background tokio task that sweeps expired entries every 60s
    pub enable_background_cleanup: bool,
}
27
28impl Default for CacheConfig {
29    fn default() -> Self {
30        Self {
31            max_entries: 10000,
32            max_memory_bytes: 100 * 1024 * 1024,         // 100MB
33            default_ttl: Some(Duration::from_secs(300)), // 5 minutes
34            eviction_policy: EvictionPolicy::LRU,
35            enable_background_cleanup: true,
36        }
37    }
38}
39
40/// Statistics about cache usage
/// Statistics about cache usage
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of successful lookups.
    pub hits: u64,
    /// Number of failed lookups (key absent or entry expired).
    pub misses: u64,
    /// Number of entries removed to make room for new ones.
    pub evictions: u64,
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Approximate memory currently used by stored entries, in bytes.
    pub current_memory_bytes: usize,
}

impl CacheStats {
    /// Hit rate as a percentage of all lookups.
    ///
    /// Returns `0.0` when no lookups have been recorded yet, avoiding a
    /// division by zero.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => (self.hits as f64 / total as f64) * 100.0,
        }
    }
}
61
62/// Thread-safe cache manager for node execution results
pub struct CacheManager {
    // Capacity limits, TTL default, and eviction policy (fixed at creation).
    config: CacheConfig,
    // Concurrent entry map; shared with the background cleanup task.
    cache: Arc<DashMap<CacheKey, CacheEntry>>,
    // Lookup counters, updated with relaxed atomics (stats are best-effort).
    hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    evictions: Arc<AtomicU64>,
    // Approximate sum of `size_bytes` over all stored entries.
    current_memory: Arc<AtomicUsize>,
}
71
72impl CacheManager {
73    /// Create a new cache manager with the given configuration
74    pub async fn new(config: CacheConfig) -> Result<Self> {
75        info!("Initializing cache manager with config: {:?}", config);
76
77        let manager = Self {
78            config: config.clone(),
79            cache: Arc::new(DashMap::new()),
80            hits: Arc::new(AtomicU64::new(0)),
81            misses: Arc::new(AtomicU64::new(0)),
82            evictions: Arc::new(AtomicU64::new(0)),
83            current_memory: Arc::new(AtomicUsize::new(0)),
84        };
85
86        // Start background cleanup task if enabled
87        if config.enable_background_cleanup {
88            manager.start_cleanup_task();
89        }
90
91        Ok(manager)
92    }
93
94    /// Get a value from the cache
95    pub fn get(&self, key: &CacheKey) -> Option<serde_json::Value> {
96        match self.cache.get_mut(key) {
97            Some(mut entry) => {
98                // Check if expired
99                if entry.is_expired() {
100                    drop(entry); // Release the lock before removing
101                    self.invalidate(key);
102                    self.misses.fetch_add(1, Ordering::Relaxed);
103                    debug!("Cache miss (expired): {:?}", key);
104                    return None;
105                }
106
107                // Update access metadata
108                entry.mark_accessed();
109                let value = entry.value.clone();
110
111                self.hits.fetch_add(1, Ordering::Relaxed);
112                debug!("Cache hit: {:?}", key);
113                Some(value)
114            }
115            None => {
116                self.misses.fetch_add(1, Ordering::Relaxed);
117                debug!("Cache miss (not found): {:?}", key);
118                None
119            }
120        }
121    }
122
123    /// Put a value into the cache
124    pub fn put(
125        &self,
126        key: CacheKey,
127        value: serde_json::Value,
128        ttl: Option<Duration>,
129    ) -> Result<()> {
130        let ttl = ttl.or(self.config.default_ttl);
131        let entry = CacheEntry::new(key.clone(), value, ttl);
132        let entry_size = entry.size_bytes;
133
134        // Check if we need to evict entries
135        self.ensure_capacity(entry_size)?;
136
137        // Insert the new entry
138        self.current_memory.fetch_add(entry_size, Ordering::Relaxed);
139        self.cache.insert(key.clone(), entry);
140
141        debug!("Cached entry: {:?} ({} bytes)", key, entry_size);
142        Ok(())
143    }
144
145    /// Invalidate (remove) a specific cache entry
146    pub fn invalidate(&self, key: &CacheKey) -> bool {
147        if let Some((_, entry)) = self.cache.remove(key) {
148            self.current_memory
149                .fetch_sub(entry.size_bytes, Ordering::Relaxed);
150            debug!("Invalidated cache entry: {:?}", key);
151            true
152        } else {
153            false
154        }
155    }
156
157    /// Invalidate all entries for a specific node
158    pub fn invalidate_node(&self, node_id: &str) -> usize {
159        let keys_to_remove: Vec<CacheKey> = self
160            .cache
161            .iter()
162            .filter(|entry| entry.key().node_id == node_id)
163            .map(|entry| entry.key().clone())
164            .collect();
165
166        let count = keys_to_remove.len();
167        for key in keys_to_remove {
168            self.invalidate(&key);
169        }
170
171        info!("Invalidated {} entries for node '{}'", count, node_id);
172        count
173    }
174
175    /// Clear all cache entries
176    pub fn clear(&self) {
177        let count = self.cache.len();
178        self.cache.clear();
179        self.current_memory.store(0, Ordering::Relaxed);
180        info!("Cleared cache ({} entries)", count);
181    }
182
183    /// Get cache statistics
184    pub fn stats(&self) -> CacheStats {
185        CacheStats {
186            hits: self.hits.load(Ordering::Relaxed),
187            misses: self.misses.load(Ordering::Relaxed),
188            evictions: self.evictions.load(Ordering::Relaxed),
189            current_entries: self.cache.len(),
190            current_memory_bytes: self.current_memory.load(Ordering::Relaxed),
191        }
192    }
193
194    /// Ensure there's capacity for a new entry
195    fn ensure_capacity(&self, new_entry_size: usize) -> Result<()> {
196        // Check entry count limit
197        while self.cache.len() >= self.config.max_entries {
198            self.evict_one()?;
199        }
200
201        // Check memory limit
202        while self.current_memory.load(Ordering::Relaxed) + new_entry_size
203            > self.config.max_memory_bytes
204        {
205            self.evict_one()?;
206        }
207
208        Ok(())
209    }
210
211    /// Evict one entry based on the configured policy
212    fn evict_one(&self) -> Result<()> {
213        let key_to_evict = match self.config.eviction_policy {
214            EvictionPolicy::LRU => self.find_lru_key(),
215            EvictionPolicy::FIFO => self.find_fifo_key(),
216            EvictionPolicy::LFU => self.find_lfu_key(),
217            EvictionPolicy::None => {
218                warn!("Eviction needed but policy is None");
219                return Ok(());
220            }
221        };
222
223        if let Some(key) = key_to_evict {
224            self.invalidate(&key);
225            self.evictions.fetch_add(1, Ordering::Relaxed);
226            debug!("Evicted entry: {:?}", key);
227        }
228
229        Ok(())
230    }
231
232    /// Find the least recently used entry
233    fn find_lru_key(&self) -> Option<CacheKey> {
234        self.cache
235            .iter()
236            .min_by_key(|entry| entry.last_accessed)
237            .map(|entry| entry.key().clone())
238    }
239
240    /// Find the oldest entry (FIFO)
241    fn find_fifo_key(&self) -> Option<CacheKey> {
242        self.cache
243            .iter()
244            .min_by_key(|entry| entry.created_at)
245            .map(|entry| entry.key().clone())
246    }
247
248    /// Find the least frequently used entry
249    fn find_lfu_key(&self) -> Option<CacheKey> {
250        self.cache
251            .iter()
252            .min_by_key(|entry| entry.access_count)
253            .map(|entry| entry.key().clone())
254    }
255
256    /// Start background task to clean up expired entries
257    fn start_cleanup_task(&self) {
258        let cache = Arc::clone(&self.cache);
259        let current_memory = Arc::clone(&self.current_memory);
260
261        tokio::spawn(async move {
262            let mut cleanup_interval = interval(Duration::from_secs(60));
263
264            loop {
265                cleanup_interval.tick().await;
266
267                let expired_keys: Vec<CacheKey> = cache
268                    .iter()
269                    .filter(|entry| entry.is_expired())
270                    .map(|entry| entry.key().clone())
271                    .collect();
272
273                if !expired_keys.is_empty() {
274                    for key in &expired_keys {
275                        if let Some((_, entry)) = cache.remove(key) {
276                            current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
277                        }
278                    }
279                    info!("Cleaned up {} expired cache entries", expired_keys.len());
280                }
281            }
282        });
283
284        info!("Started background cache cleanup task");
285    }
286
287    /// Get all cache entries (useful for testing/debugging)
288    pub fn entries(&self) -> Vec<CacheEntry> {
289        self.cache
290            .iter()
291            .map(|entry| entry.value().clone())
292            .collect()
293    }
294
295    /// Check if cache contains a key
296    pub fn contains_key(&self, key: &CacheKey) -> bool {
297        self.cache.contains_key(key)
298    }
299
300    /// Get current cache size
301    pub fn len(&self) -> usize {
302        self.cache.len()
303    }
304
305    /// Check if cache is empty
306    pub fn is_empty(&self) -> bool {
307        self.cache.is_empty()
308    }
309}
310
impl Clone for CacheManager {
    /// Cheap clone: the config is copied, but the entry map and all
    /// counters are shared via `Arc`, so every clone observes (and
    /// mutates) the same cache contents and statistics.
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            cache: Arc::clone(&self.cache),
            hits: Arc::clone(&self.hits),
            misses: Arc::clone(&self.misses),
            evictions: Arc::clone(&self.evictions),
            current_memory: Arc::clone(&self.current_memory),
        }
    }
}
323
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Miss -> put -> hit round trip, with hit/miss counters verified.
    #[tokio::test]
    async fn test_cache_basic_operations() {
        let config = CacheConfig {
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
            default_ttl: None,
            eviction_policy: EvictionPolicy::LRU,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        let value = json!({"result": 42});

        // Should be a miss initially
        assert!(cache.get(&key).is_none());

        // Put value
        cache.put(key.clone(), value.clone(), None).unwrap();

        // Should be a hit now
        assert_eq!(cache.get(&key), Some(value));

        // Check stats
        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    // An entry stored with the 50ms default TTL must be gone after 100ms.
    #[tokio::test]
    async fn test_cache_expiration() {
        let config = CacheConfig {
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
            default_ttl: Some(Duration::from_millis(50)),
            eviction_policy: EvictionPolicy::LRU,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        let value = json!({"result": 42});

        cache.put(key.clone(), value.clone(), None).unwrap();

        // Should exist initially
        assert!(cache.get(&key).is_some());

        // Wait for expiration
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Should be expired now
        assert!(cache.get(&key).is_none());
    }

    // With max_entries = 3 and FIFO policy, the fourth insert must evict
    // the oldest (first) entry.
    #[tokio::test]
    async fn test_cache_max_entries() {
        let config = CacheConfig {
            max_entries: 3,
            max_memory_bytes: 1024 * 1024,
            default_ttl: None,
            eviction_policy: EvictionPolicy::FIFO,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        // Add 4 entries (should evict the first one)
        for i in 0..4 {
            let key = CacheKey::new(format!("node{}", i), &json!({"x": i}));
            let value = json!({"result": i});
            cache.put(key, value, None).unwrap();
        }

        // Should have exactly 3 entries
        assert_eq!(cache.len(), 3);

        // First entry should be evicted
        let first_key = CacheKey::new("node0", &json!({"x": 0}));
        assert!(!cache.contains_key(&first_key));
    }

    // invalidate_node removes every entry keyed to that node id, even when
    // they have different input hashes.
    #[tokio::test]
    async fn test_cache_invalidation() {
        let config = CacheConfig::default();
        let cache = CacheManager::new(config).await.unwrap();

        let key1 = CacheKey::new("node1", &json!({"x": 10}));
        let key2 = CacheKey::new("node1", &json!({"x": 20}));

        cache.put(key1.clone(), json!({"result": 1}), None).unwrap();
        cache.put(key2.clone(), json!({"result": 2}), None).unwrap();

        assert_eq!(cache.len(), 2);

        // Invalidate by node
        let count = cache.invalidate_node("node1");
        assert_eq!(count, 2);
        assert_eq!(cache.len(), 0);
    }

    // Counters and hit_rate reflect one miss followed by two hits.
    #[tokio::test]
    async fn test_cache_stats() {
        let config = CacheConfig::default();
        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));

        // Miss
        cache.get(&key);

        // Put and hit
        cache.put(key.clone(), json!({"result": 42}), None).unwrap();
        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.current_entries, 1);
        assert!(stats.hit_rate() > 0.0);
    }
}