// rust_logic_graph/cache/cache_manager.rs

//! Cache manager implementation

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use tokio::time::interval;
use tracing::{debug, info, warn};

use super::{CacheEntry, CacheKey, EvictionPolicy};

13/// Configuration for the cache manager
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CacheConfig {
16    /// Maximum number of entries in the cache
17    pub max_entries: usize,
18    /// Maximum memory usage in bytes (approximate)
19    pub max_memory_bytes: usize,
20    /// Default TTL for cache entries (None = no expiration)
21    pub default_ttl: Option<Duration>,
22    /// Eviction policy when limits are reached
23    pub eviction_policy: EvictionPolicy,
24    /// Enable background cleanup task for expired entries
25    pub enable_background_cleanup: bool,
26}
27
28impl Default for CacheConfig {
29    fn default() -> Self {
30        Self {
31            max_entries: 10000,
32            max_memory_bytes: 100 * 1024 * 1024, // 100MB
33            default_ttl: Some(Duration::from_secs(300)), // 5 minutes
34            eviction_policy: EvictionPolicy::LRU,
35            enable_background_cleanup: true,
36        }
37    }
38}
39
/// Point-in-time snapshot of cache usage counters.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    pub hits: u64,
    pub misses: u64,
    pub evictions: u64,
    pub current_entries: usize,
    pub current_memory_bytes: usize,
}

impl CacheStats {
    /// Hit rate as a percentage of all lookups; 0.0 when there have been
    /// no lookups at all.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => (self.hits as f64 / lookups as f64) * 100.0,
        }
    }
}

/// Thread-safe cache manager for node execution results
pub struct CacheManager {
    config: CacheConfig,
    /// Shared concurrent map holding the cached entries.
    cache: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Lookup counters (relaxed atomics; values are approximate under
    /// concurrent access).
    hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
    /// Entries removed due to capacity eviction (not TTL expiry).
    evictions: Arc<AtomicU64>,
    /// Approximate total size of cached values, in bytes.
    current_memory: Arc<AtomicUsize>,
}

72impl CacheManager {
73    /// Create a new cache manager with the given configuration
74    pub async fn new(config: CacheConfig) -> Result<Self> {
75        info!("Initializing cache manager with config: {:?}", config);
76
77        let manager = Self {
78            config: config.clone(),
79            cache: Arc::new(DashMap::new()),
80            hits: Arc::new(AtomicU64::new(0)),
81            misses: Arc::new(AtomicU64::new(0)),
82            evictions: Arc::new(AtomicU64::new(0)),
83            current_memory: Arc::new(AtomicUsize::new(0)),
84        };
85
86        // Start background cleanup task if enabled
87        if config.enable_background_cleanup {
88            manager.start_cleanup_task();
89        }
90
91        Ok(manager)
92    }
93
94    /// Get a value from the cache
95    pub fn get(&self, key: &CacheKey) -> Option<serde_json::Value> {
96        match self.cache.get_mut(key) {
97            Some(mut entry) => {
98                // Check if expired
99                if entry.is_expired() {
100                    drop(entry); // Release the lock before removing
101                    self.invalidate(key);
102                    self.misses.fetch_add(1, Ordering::Relaxed);
103                    debug!("Cache miss (expired): {:?}", key);
104                    return None;
105                }
106
107                // Update access metadata
108                entry.mark_accessed();
109                let value = entry.value.clone();
110                
111                self.hits.fetch_add(1, Ordering::Relaxed);
112                debug!("Cache hit: {:?}", key);
113                Some(value)
114            }
115            None => {
116                self.misses.fetch_add(1, Ordering::Relaxed);
117                debug!("Cache miss (not found): {:?}", key);
118                None
119            }
120        }
121    }
122
123    /// Put a value into the cache
124    pub fn put(
125        &self,
126        key: CacheKey,
127        value: serde_json::Value,
128        ttl: Option<Duration>,
129    ) -> Result<()> {
130        let ttl = ttl.or(self.config.default_ttl);
131        let entry = CacheEntry::new(key.clone(), value, ttl);
132        let entry_size = entry.size_bytes;
133
134        // Check if we need to evict entries
135        self.ensure_capacity(entry_size)?;
136
137        // Insert the new entry
138        self.current_memory.fetch_add(entry_size, Ordering::Relaxed);
139        self.cache.insert(key.clone(), entry);
140
141        debug!("Cached entry: {:?} ({} bytes)", key, entry_size);
142        Ok(())
143    }
144
145    /// Invalidate (remove) a specific cache entry
146    pub fn invalidate(&self, key: &CacheKey) -> bool {
147        if let Some((_, entry)) = self.cache.remove(key) {
148            self.current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
149            debug!("Invalidated cache entry: {:?}", key);
150            true
151        } else {
152            false
153        }
154    }
155
156    /// Invalidate all entries for a specific node
157    pub fn invalidate_node(&self, node_id: &str) -> usize {
158        let keys_to_remove: Vec<CacheKey> = self
159            .cache
160            .iter()
161            .filter(|entry| entry.key().node_id == node_id)
162            .map(|entry| entry.key().clone())
163            .collect();
164
165        let count = keys_to_remove.len();
166        for key in keys_to_remove {
167            self.invalidate(&key);
168        }
169
170        info!("Invalidated {} entries for node '{}'", count, node_id);
171        count
172    }
173
174    /// Clear all cache entries
175    pub fn clear(&self) {
176        let count = self.cache.len();
177        self.cache.clear();
178        self.current_memory.store(0, Ordering::Relaxed);
179        info!("Cleared cache ({} entries)", count);
180    }
181
182    /// Get cache statistics
183    pub fn stats(&self) -> CacheStats {
184        CacheStats {
185            hits: self.hits.load(Ordering::Relaxed),
186            misses: self.misses.load(Ordering::Relaxed),
187            evictions: self.evictions.load(Ordering::Relaxed),
188            current_entries: self.cache.len(),
189            current_memory_bytes: self.current_memory.load(Ordering::Relaxed),
190        }
191    }
192
193    /// Ensure there's capacity for a new entry
194    fn ensure_capacity(&self, new_entry_size: usize) -> Result<()> {
195        // Check entry count limit
196        while self.cache.len() >= self.config.max_entries {
197            self.evict_one()?;
198        }
199
200        // Check memory limit
201        while self.current_memory.load(Ordering::Relaxed) + new_entry_size
202            > self.config.max_memory_bytes
203        {
204            self.evict_one()?;
205        }
206
207        Ok(())
208    }
209
210    /// Evict one entry based on the configured policy
211    fn evict_one(&self) -> Result<()> {
212        let key_to_evict = match self.config.eviction_policy {
213            EvictionPolicy::LRU => self.find_lru_key(),
214            EvictionPolicy::FIFO => self.find_fifo_key(),
215            EvictionPolicy::LFU => self.find_lfu_key(),
216            EvictionPolicy::None => {
217                warn!("Eviction needed but policy is None");
218                return Ok(());
219            }
220        };
221
222        if let Some(key) = key_to_evict {
223            self.invalidate(&key);
224            self.evictions.fetch_add(1, Ordering::Relaxed);
225            debug!("Evicted entry: {:?}", key);
226        }
227
228        Ok(())
229    }
230
231    /// Find the least recently used entry
232    fn find_lru_key(&self) -> Option<CacheKey> {
233        self.cache
234            .iter()
235            .min_by_key(|entry| entry.last_accessed)
236            .map(|entry| entry.key().clone())
237    }
238
239    /// Find the oldest entry (FIFO)
240    fn find_fifo_key(&self) -> Option<CacheKey> {
241        self.cache
242            .iter()
243            .min_by_key(|entry| entry.created_at)
244            .map(|entry| entry.key().clone())
245    }
246
247    /// Find the least frequently used entry
248    fn find_lfu_key(&self) -> Option<CacheKey> {
249        self.cache
250            .iter()
251            .min_by_key(|entry| entry.access_count)
252            .map(|entry| entry.key().clone())
253    }
254
255    /// Start background task to clean up expired entries
256    fn start_cleanup_task(&self) {
257        let cache = Arc::clone(&self.cache);
258        let current_memory = Arc::clone(&self.current_memory);
259
260        tokio::spawn(async move {
261            let mut cleanup_interval = interval(Duration::from_secs(60));
262
263            loop {
264                cleanup_interval.tick().await;
265
266                let expired_keys: Vec<CacheKey> = cache
267                    .iter()
268                    .filter(|entry| entry.is_expired())
269                    .map(|entry| entry.key().clone())
270                    .collect();
271
272                if !expired_keys.is_empty() {
273                    for key in &expired_keys {
274                        if let Some((_, entry)) = cache.remove(key) {
275                            current_memory.fetch_sub(entry.size_bytes, Ordering::Relaxed);
276                        }
277                    }
278                    info!("Cleaned up {} expired cache entries", expired_keys.len());
279                }
280            }
281        });
282
283        info!("Started background cache cleanup task");
284    }
285
286    /// Get all cache entries (useful for testing/debugging)
287    pub fn entries(&self) -> Vec<CacheEntry> {
288        self.cache
289            .iter()
290            .map(|entry| entry.value().clone())
291            .collect()
292    }
293
294    /// Check if cache contains a key
295    pub fn contains_key(&self, key: &CacheKey) -> bool {
296        self.cache.contains_key(key)
297    }
298
299    /// Get current cache size
300    pub fn len(&self) -> usize {
301        self.cache.len()
302    }
303
304    /// Check if cache is empty
305    pub fn is_empty(&self) -> bool {
306        self.cache.is_empty()
307    }
308}
309
/// Clones share the same underlying map, counters, and memory gauge via
/// `Arc`; only the `CacheConfig` is deep-copied. This makes `CacheManager`
/// cheap to hand out across tasks.
impl Clone for CacheManager {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            cache: Arc::clone(&self.cache),
            hits: Arc::clone(&self.hits),
            misses: Arc::clone(&self.misses),
            evictions: Arc::clone(&self.evictions),
            current_memory: Arc::clone(&self.current_memory),
        }
    }
}

// Unit tests. Most configs disable the background cleanup task so each
// test controls expiry timing deterministically.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn test_cache_basic_operations() {
        let config = CacheConfig {
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
            default_ttl: None,
            eviction_policy: EvictionPolicy::LRU,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        let value = json!({"result": 42});

        // Should be a miss initially
        assert!(cache.get(&key).is_none());

        // Put value
        cache.put(key.clone(), value.clone(), None).unwrap();

        // Should be a hit now
        assert_eq!(cache.get(&key), Some(value));

        // Check stats
        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    #[tokio::test]
    async fn test_cache_expiration() {
        let config = CacheConfig {
            max_entries: 100,
            max_memory_bytes: 1024 * 1024,
            // Short TTL so the test can observe expiry quickly.
            default_ttl: Some(Duration::from_millis(50)),
            eviction_policy: EvictionPolicy::LRU,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));
        let value = json!({"result": 42});

        cache.put(key.clone(), value.clone(), None).unwrap();

        // Should exist initially
        assert!(cache.get(&key).is_some());

        // Wait for expiration
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Should be expired now
        assert!(cache.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_cache_max_entries() {
        let config = CacheConfig {
            max_entries: 3,
            max_memory_bytes: 1024 * 1024,
            default_ttl: None,
            eviction_policy: EvictionPolicy::FIFO,
            enable_background_cleanup: false,
        };

        let cache = CacheManager::new(config).await.unwrap();

        // Add 4 entries (should evict the first one)
        for i in 0..4 {
            let key = CacheKey::new(format!("node{}", i), &json!({"x": i}));
            let value = json!({"result": i});
            cache.put(key, value, None).unwrap();
        }

        // Should have exactly 3 entries
        assert_eq!(cache.len(), 3);

        // First entry should be evicted (FIFO removes the oldest insert)
        let first_key = CacheKey::new("node0", &json!({"x": 0}));
        assert!(!cache.contains_key(&first_key));
    }

    #[tokio::test]
    async fn test_cache_invalidation() {
        let config = CacheConfig::default();
        let cache = CacheManager::new(config).await.unwrap();

        // Two distinct keys under the same node id.
        let key1 = CacheKey::new("node1", &json!({"x": 10}));
        let key2 = CacheKey::new("node1", &json!({"x": 20}));

        cache.put(key1.clone(), json!({"result": 1}), None).unwrap();
        cache.put(key2.clone(), json!({"result": 2}), None).unwrap();

        assert_eq!(cache.len(), 2);

        // Invalidate by node
        let count = cache.invalidate_node("node1");
        assert_eq!(count, 2);
        assert_eq!(cache.len(), 0);
    }

    #[tokio::test]
    async fn test_cache_stats() {
        let config = CacheConfig::default();
        let cache = CacheManager::new(config).await.unwrap();

        let key = CacheKey::new("node1", &json!({"x": 10}));

        // Miss
        cache.get(&key);

        // Put and hit
        cache.put(key.clone(), json!({"result": 42}), None).unwrap();
        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.current_entries, 1);
        assert!(stats.hit_rate() > 0.0);
    }
}