codeprysm_core/lazy/
cache.rs

1//! Memory Budget Cache
2//!
3//! Provides LRU-based cache management with byte-level memory tracking
4//! for partition eviction in the lazy graph manager.
5//!
6//! Thread-safe via interior mutability using parking_lot::Mutex.
7
8use lru::LruCache;
9use parking_lot::Mutex;
10use std::num::NonZeroUsize;
11
/// Estimated bytes per node: the `Node` struct plus typical string data.
const NODE_BASE_SIZE: usize = 512;
/// Estimated bytes per edge: `EdgeData` plus indices.
const EDGE_BASE_SIZE: usize = 128;
/// Multiplier applied to the raw estimate for petgraph's internal overhead.
const GRAPH_OVERHEAD: f64 = 1.4;

/// Memory budget used when none is specified (512 MB).
const DEFAULT_MEMORY_BUDGET: usize = 512 * 1024 * 1024;

/// Default floor on tracked partitions, so eviction does not thrash.
const DEFAULT_MIN_PARTITIONS: usize = 2;
22
/// Size statistics for a single loaded partition.
///
/// Plain data: the node and edge counts plus the byte estimate derived
/// from them. Comparable with `==` so callers and tests can check stats
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionStats {
    /// Number of nodes in the partition
    pub node_count: usize,
    /// Number of edges in the partition
    pub edge_count: usize,
    /// Estimated memory footprint in bytes
    pub estimated_bytes: usize,
}
33
34impl PartitionStats {
35    /// Create stats for a partition
36    pub fn new(node_count: usize, edge_count: usize) -> Self {
37        let estimated_bytes = estimate_memory(node_count, edge_count);
38        Self {
39            node_count,
40            edge_count,
41            estimated_bytes,
42        }
43    }
44}
45
/// Cache metrics for monitoring
///
/// All counters saturate at their maximum value instead of overflowing,
/// so long-running processes never panic (debug builds) or wrap around
/// (release builds) while recording.
#[derive(Debug, Clone, Default)]
pub struct CacheMetrics {
    /// Number of cache hits (partition already loaded)
    pub hits: u64,
    /// Number of cache misses (partition needed loading)
    pub misses: u64,
    /// Number of partitions evicted
    pub evictions: u64,
    /// Total bytes evicted
    pub bytes_evicted: usize,
}

impl CacheMetrics {
    /// Get the hit rate as a fraction in the range `0.0..=1.0`
    ///
    /// Returns `0.0` when no hits or misses have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        // Sum in u128 so two large u64 counters cannot overflow.
        let total = u128::from(self.hits) + u128::from(self.misses);
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }

    /// Record a cache hit
    pub fn record_hit(&mut self) {
        self.hits = self.hits.saturating_add(1);
    }

    /// Record a cache miss
    pub fn record_miss(&mut self) {
        self.misses = self.misses.saturating_add(1);
    }

    /// Record an eviction that freed `bytes` bytes
    pub fn record_eviction(&mut self, bytes: usize) {
        self.evictions = self.evictions.saturating_add(1);
        // `usize` is only 32 bits wide on some targets, so this total can
        // realistically reach its maximum; clamp rather than wrap.
        self.bytes_evicted = self.bytes_evicted.saturating_add(bytes);
    }
}
86
87/// Inner state for MemoryBudgetCache (protected by Mutex)
88struct CacheState {
89    /// Current estimated memory usage in bytes
90    current_memory_bytes: usize,
91
92    /// LRU cache mapping partition ID to its stats
93    /// Most recently used partitions are at the "front"
94    partition_lru: LruCache<String, PartitionStats>,
95
96    /// Cache metrics
97    metrics: CacheMetrics,
98}
99
100/// Memory budget cache with LRU eviction
101///
102/// Tracks loaded partitions and their memory footprint, evicting
103/// least-recently-used partitions when the memory budget is exceeded.
104///
105/// Thread-safe: All methods take `&self` and use interior mutability
106/// via parking_lot::Mutex for concurrent access.
107pub struct MemoryBudgetCache {
108    /// Maximum memory budget in bytes (immutable after construction)
109    max_memory_bytes: usize,
110
111    /// Minimum partitions to keep (immutable after construction)
112    min_partitions: usize,
113
114    /// Mutable state protected by Mutex
115    state: Mutex<CacheState>,
116}
117
118impl MemoryBudgetCache {
119    /// Create a new cache with the given memory budget
120    pub fn new(max_memory_bytes: usize) -> Self {
121        Self {
122            max_memory_bytes,
123            min_partitions: DEFAULT_MIN_PARTITIONS,
124            state: Mutex::new(CacheState {
125                current_memory_bytes: 0,
126                // Use a large cap - we manage eviction ourselves based on bytes
127                partition_lru: LruCache::new(NonZeroUsize::new(10000).unwrap()),
128                metrics: CacheMetrics::default(),
129            }),
130        }
131    }
132
133    /// Create a cache with default memory budget (512 MB)
134    pub fn with_default_budget() -> Self {
135        Self::new(DEFAULT_MEMORY_BUDGET)
136    }
137
138    /// Set the minimum number of partitions to keep
139    pub fn with_min_partitions(mut self, min: usize) -> Self {
140        self.min_partitions = min;
141        self
142    }
143
144    /// Get the memory budget in bytes
145    pub fn max_memory_bytes(&self) -> usize {
146        self.max_memory_bytes
147    }
148
149    /// Get current memory usage in bytes
150    pub fn current_memory_bytes(&self) -> usize {
151        self.state.lock().current_memory_bytes
152    }
153
154    /// Get memory usage as a percentage (0.0 - 1.0)
155    pub fn memory_usage_ratio(&self) -> f64 {
156        if self.max_memory_bytes == 0 {
157            0.0
158        } else {
159            let current = self.state.lock().current_memory_bytes;
160            current as f64 / self.max_memory_bytes as f64
161        }
162    }
163
164    /// Get the number of partitions currently tracked
165    pub fn partition_count(&self) -> usize {
166        self.state.lock().partition_lru.len()
167    }
168
169    /// Get a snapshot of cache metrics
170    pub fn metrics(&self) -> CacheMetrics {
171        self.state.lock().metrics.clone()
172    }
173
174    /// Reset cache metrics
175    pub fn reset_metrics(&self) {
176        self.state.lock().metrics = CacheMetrics::default();
177    }
178
179    /// Check if a partition is in the cache
180    pub fn contains(&self, partition_id: &str) -> bool {
181        self.state.lock().partition_lru.contains(partition_id)
182    }
183
184    /// Mark a partition as accessed (updates LRU order)
185    ///
186    /// Returns true if the partition was found, false otherwise.
187    pub fn touch(&self, partition_id: &str) -> bool {
188        let mut state = self.state.lock();
189        if state.partition_lru.get(partition_id).is_some() {
190            state.metrics.record_hit();
191            true
192        } else {
193            state.metrics.record_miss();
194            false
195        }
196    }
197
198    /// Record that a partition has been loaded
199    ///
200    /// Adds the partition to the cache and updates memory tracking.
201    pub fn record_loaded(&self, partition_id: String, stats: PartitionStats) {
202        let mut state = self.state.lock();
203        state.current_memory_bytes += stats.estimated_bytes;
204        state.partition_lru.put(partition_id, stats);
205    }
206
207    /// Get stats for a partition (if tracked)
208    ///
209    /// Returns a clone of the stats to avoid holding the lock.
210    pub fn get_stats(&self, partition_id: &str) -> Option<PartitionStats> {
211        self.state.lock().partition_lru.peek(partition_id).cloned()
212    }
213
214    /// Remove a partition from the cache
215    ///
216    /// Returns the stats if the partition was tracked, None otherwise.
217    pub fn remove(&self, partition_id: &str) -> Option<PartitionStats> {
218        let mut state = self.state.lock();
219        if let Some(stats) = state.partition_lru.pop(partition_id) {
220            state.current_memory_bytes = state
221                .current_memory_bytes
222                .saturating_sub(stats.estimated_bytes);
223            state.metrics.record_eviction(stats.estimated_bytes);
224            Some(stats)
225        } else {
226            None
227        }
228    }
229
230    /// Check if memory budget is exceeded
231    pub fn is_over_budget(&self) -> bool {
232        self.state.lock().current_memory_bytes > self.max_memory_bytes
233    }
234
235    /// Get partition IDs that should be evicted to make room
236    ///
237    /// Returns partition IDs in LRU order (least recently used first).
238    /// Respects `min_partitions` to avoid thrashing.
239    pub fn get_eviction_candidates(&self) -> Vec<String> {
240        let state = self.state.lock();
241        if state.current_memory_bytes <= self.max_memory_bytes {
242            return vec![];
243        }
244
245        let mut candidates = Vec::new();
246        let mut projected_bytes = state.current_memory_bytes;
247        let mut remaining_partitions = state.partition_lru.len();
248
249        // LruCache::iter() returns MRU first, so .rev() gives us LRU first
250        for (partition_id, stats) in state.partition_lru.iter().rev() {
251            if projected_bytes <= self.max_memory_bytes {
252                break;
253            }
254            if remaining_partitions <= self.min_partitions {
255                break;
256            }
257
258            candidates.push(partition_id.clone());
259            projected_bytes = projected_bytes.saturating_sub(stats.estimated_bytes);
260            remaining_partitions -= 1;
261        }
262
263        candidates
264    }
265
266    /// Get the amount of memory needed to be freed to accommodate a new load
267    pub fn memory_needed_for(&self, additional_bytes: usize) -> usize {
268        let current = self.state.lock().current_memory_bytes;
269        let projected = current + additional_bytes;
270        projected.saturating_sub(self.max_memory_bytes)
271    }
272
273    /// Get partitions to evict to make room for a specific amount of memory
274    ///
275    /// Returns partition IDs in LRU order.
276    pub fn get_eviction_candidates_for(&self, needed_bytes: usize) -> Vec<String> {
277        if needed_bytes == 0 {
278            return vec![];
279        }
280
281        let state = self.state.lock();
282        let mut candidates = Vec::new();
283        let mut bytes_freed = 0usize;
284        let mut remaining_partitions = state.partition_lru.len();
285
286        // LruCache::iter() returns MRU first, so .rev() gives us LRU first
287        for (partition_id, stats) in state.partition_lru.iter().rev() {
288            if bytes_freed >= needed_bytes {
289                break;
290            }
291            if remaining_partitions <= self.min_partitions {
292                break;
293            }
294
295            candidates.push(partition_id.clone());
296            bytes_freed += stats.estimated_bytes;
297            remaining_partitions -= 1;
298        }
299
300        candidates
301    }
302
303    /// Clear all tracked partitions
304    pub fn clear(&self) {
305        let mut state = self.state.lock();
306        state.partition_lru.clear();
307        state.current_memory_bytes = 0;
308    }
309}
310
311impl Default for MemoryBudgetCache {
312    fn default() -> Self {
313        Self::with_default_budget()
314    }
315}
316
317/// Estimate memory footprint for a partition
318///
319/// Uses conservative estimates for Node and EdgeData sizes.
320pub fn estimate_memory(node_count: usize, edge_count: usize) -> usize {
321    let base = node_count * NODE_BASE_SIZE + edge_count * EDGE_BASE_SIZE;
322    (base as f64 * GRAPH_OVERHEAD) as usize
323}
324
/// Estimate memory footprint with custom size factors
///
/// Computes `node_count * node_size + edge_count * edge_size` and scales
/// the sum by `overhead`.
///
/// The integer arithmetic saturates at `usize::MAX` instead of
/// overflowing. The final float-to-integer cast also saturates: a result
/// too large for `usize` becomes `usize::MAX`, and a negative or NaN
/// result becomes `0`.
pub fn estimate_memory_custom(
    node_count: usize,
    edge_count: usize,
    node_size: usize,
    edge_size: usize,
    overhead: f64,
) -> usize {
    let node_bytes = node_count.saturating_mul(node_size);
    let edge_bytes = edge_count.saturating_mul(edge_size);
    let base = node_bytes.saturating_add(edge_bytes);
    (base as f64 * overhead) as usize
}
336
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_estimation() {
        // 1000 nodes, 500 edges
        let bytes = estimate_memory(1000, 500);
        // Expected: (1000 * 512 + 500 * 128) * 1.4 = (512000 + 64000) * 1.4 = 806400
        assert_eq!(bytes, 806400);
    }

    #[test]
    fn test_cache_new() {
        let cache = MemoryBudgetCache::new(1024 * 1024); // 1 MB
        assert_eq!(cache.max_memory_bytes(), 1024 * 1024);
        assert_eq!(cache.current_memory_bytes(), 0);
        assert_eq!(cache.partition_count(), 0);
    }

    #[test]
    fn test_cache_record_loaded() {
        let cache = MemoryBudgetCache::new(10_000_000); // 10 MB

        let stats = PartitionStats::new(100, 50);
        let expected_bytes = stats.estimated_bytes;

        cache.record_loaded("partition1".to_string(), stats);

        assert_eq!(cache.partition_count(), 1);
        assert_eq!(cache.current_memory_bytes(), expected_bytes);
        assert!(cache.contains("partition1"));
    }

    #[test]
    fn test_cache_touch() {
        let cache = MemoryBudgetCache::new(10_000_000);

        cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
        cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));

        // Touch should update LRU order and record hit
        assert!(cache.touch("p1"));
        assert_eq!(cache.metrics().hits, 1);

        // Miss for non-existent partition
        assert!(!cache.touch("p3"));
        assert_eq!(cache.metrics().misses, 1);
    }

    #[test]
    fn test_cache_remove() {
        let cache = MemoryBudgetCache::new(10_000_000);

        let stats = PartitionStats::new(100, 50);
        let bytes = stats.estimated_bytes;

        cache.record_loaded("p1".to_string(), stats);
        assert_eq!(cache.current_memory_bytes(), bytes);

        let removed = cache.remove("p1");
        assert!(removed.is_some());
        assert_eq!(cache.current_memory_bytes(), 0);
        assert!(!cache.contains("p1"));
        assert_eq!(cache.metrics().evictions, 1);
        assert_eq!(cache.metrics().bytes_evicted, bytes);

        // Removing an untracked partition is a no-op
        assert!(cache.remove("p1").is_none());
        assert_eq!(cache.metrics().evictions, 1);
    }

    #[test]
    fn test_cache_over_budget() {
        let cache = MemoryBudgetCache::new(100_000); // 100 KB budget

        // Add partition that's under budget
        // (50 * 512 + 25 * 128) * 1.4 = 40320 bytes (~40KB)
        cache.record_loaded("p1".to_string(), PartitionStats::new(50, 25));
        assert!(!cache.is_over_budget());

        // Add partition that pushes over budget
        // (100 * 512 + 50 * 128) * 1.4 = 80640 bytes (~80KB); total ~121KB
        cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));
        assert!(cache.is_over_budget());
    }

    #[test]
    fn test_cache_eviction_candidates() {
        // Each partition is ~24KB (30*512 + 15*128) * 1.4 = ~24192 bytes
        // Set budget to 50KB so 3 partitions (~72KB) will exceed it
        let cache = MemoryBudgetCache::new(50_000).with_min_partitions(1);

        // Add several partitions (in order: p1 oldest, p2, p3 newest)
        cache.record_loaded("p1".to_string(), PartitionStats::new(30, 15));
        cache.record_loaded("p2".to_string(), PartitionStats::new(30, 15));
        cache.record_loaded("p3".to_string(), PartitionStats::new(30, 15));

        // Should be over budget
        assert!(cache.is_over_budget());

        // Touch p1 to make it most recently used
        cache.touch("p1");

        // Now LRU order is: p2 (oldest/LRU), p3, p1 (newest/MRU)
        // Evicting p2 alone brings usage to ~48KB, which fits the budget,
        // so it must be the only candidate.
        let candidates = cache.get_eviction_candidates();
        assert_eq!(candidates, vec!["p2".to_string()]);
    }

    #[test]
    fn test_cache_eviction_candidates_for() {
        let cache = MemoryBudgetCache::new(1_000_000).with_min_partitions(1);

        // Add partitions with known sizes (~80KB each)
        cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
        cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));

        // Need to free 50KB: the LRU partition (p1) alone covers it
        let candidates = cache.get_eviction_candidates_for(50_000);
        assert_eq!(candidates, vec!["p1".to_string()]);

        // Nothing needed means nothing to evict
        assert!(cache.get_eviction_candidates_for(0).is_empty());
    }

    #[test]
    fn test_cache_min_partitions() {
        let cache = MemoryBudgetCache::new(10_000).with_min_partitions(2);

        // Add 3 partitions that exceed budget
        cache.record_loaded("p1".to_string(), PartitionStats::new(50, 25));
        cache.record_loaded("p2".to_string(), PartitionStats::new(50, 25));
        cache.record_loaded("p3".to_string(), PartitionStats::new(50, 25));

        // Should be over budget
        assert!(cache.is_over_budget());

        // But should only suggest evicting down to min_partitions (2)
        let candidates = cache.get_eviction_candidates();
        assert_eq!(candidates.len(), 1); // Can only evict 1 (3 - 2 = 1)
        assert_eq!(candidates[0], "p1"); // and it is the least recently used
    }

    #[test]
    fn test_cache_metrics() {
        let cache = MemoryBudgetCache::new(10_000_000);

        cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));

        // Record some hits and misses
        cache.touch("p1"); // hit
        cache.touch("p1"); // hit
        cache.touch("p2"); // miss

        let metrics = cache.metrics();
        assert_eq!(metrics.hits, 2);
        assert_eq!(metrics.misses, 1);
        assert!((metrics.hit_rate() - 2.0 / 3.0).abs() < 1e-9);

        // Remove and check eviction metrics
        cache.remove("p1");
        assert_eq!(cache.metrics().evictions, 1);

        // Resetting returns every counter to zero
        cache.reset_metrics();
        let metrics = cache.metrics();
        assert_eq!(metrics.hits, 0);
        assert_eq!(metrics.misses, 0);
        assert_eq!(metrics.evictions, 0);
        assert_eq!(metrics.bytes_evicted, 0);
    }

    #[test]
    fn test_cache_clear() {
        let cache = MemoryBudgetCache::new(10_000_000);

        cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
        cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));

        cache.clear();

        assert_eq!(cache.partition_count(), 0);
        assert_eq!(cache.current_memory_bytes(), 0);
    }

    #[test]
    fn test_memory_usage_ratio() {
        let cache = MemoryBudgetCache::new(1_000_000); // 1 MB

        assert_eq!(cache.memory_usage_ratio(), 0.0);

        // Add ~400KB worth: (500 * 512 + 250 * 128) * 1.4 = 403200 bytes
        cache.record_loaded("p1".to_string(), PartitionStats::new(500, 250));

        let ratio = cache.memory_usage_ratio();
        assert!(ratio > 0.0 && ratio < 1.0);
        assert!((ratio - 0.4032).abs() < 1e-6);
    }
}