1use lru::LruCache;
9use parking_lot::Mutex;
10use std::num::NonZeroUsize;
11
12const NODE_BASE_SIZE: usize = 512; const EDGE_BASE_SIZE: usize = 128; const GRAPH_OVERHEAD: f64 = 1.4; const DEFAULT_MEMORY_BUDGET: usize = 512 * 1024 * 1024;
19
20const DEFAULT_MIN_PARTITIONS: usize = 2;
22
23#[derive(Debug, Clone)]
25pub struct PartitionStats {
26 pub node_count: usize,
28 pub edge_count: usize,
30 pub estimated_bytes: usize,
32}
33
34impl PartitionStats {
35 pub fn new(node_count: usize, edge_count: usize) -> Self {
37 let estimated_bytes = estimate_memory(node_count, edge_count);
38 Self {
39 node_count,
40 edge_count,
41 estimated_bytes,
42 }
43 }
44}
45
46#[derive(Debug, Clone, Default)]
48pub struct CacheMetrics {
49 pub hits: u64,
51 pub misses: u64,
53 pub evictions: u64,
55 pub bytes_evicted: usize,
57}
58
59impl CacheMetrics {
60 pub fn hit_rate(&self) -> f64 {
62 let total = self.hits + self.misses;
63 if total == 0 {
64 0.0
65 } else {
66 self.hits as f64 / total as f64
67 }
68 }
69
70 pub fn record_hit(&mut self) {
72 self.hits += 1;
73 }
74
75 pub fn record_miss(&mut self) {
77 self.misses += 1;
78 }
79
80 pub fn record_eviction(&mut self, bytes: usize) {
82 self.evictions += 1;
83 self.bytes_evicted += bytes;
84 }
85}
86
87struct CacheState {
89 current_memory_bytes: usize,
91
92 partition_lru: LruCache<String, PartitionStats>,
95
96 metrics: CacheMetrics,
98}
99
100pub struct MemoryBudgetCache {
108 max_memory_bytes: usize,
110
111 min_partitions: usize,
113
114 state: Mutex<CacheState>,
116}
117
118impl MemoryBudgetCache {
119 pub fn new(max_memory_bytes: usize) -> Self {
121 Self {
122 max_memory_bytes,
123 min_partitions: DEFAULT_MIN_PARTITIONS,
124 state: Mutex::new(CacheState {
125 current_memory_bytes: 0,
126 partition_lru: LruCache::new(NonZeroUsize::new(10000).unwrap()),
128 metrics: CacheMetrics::default(),
129 }),
130 }
131 }
132
133 pub fn with_default_budget() -> Self {
135 Self::new(DEFAULT_MEMORY_BUDGET)
136 }
137
138 pub fn with_min_partitions(mut self, min: usize) -> Self {
140 self.min_partitions = min;
141 self
142 }
143
144 pub fn max_memory_bytes(&self) -> usize {
146 self.max_memory_bytes
147 }
148
149 pub fn current_memory_bytes(&self) -> usize {
151 self.state.lock().current_memory_bytes
152 }
153
154 pub fn memory_usage_ratio(&self) -> f64 {
156 if self.max_memory_bytes == 0 {
157 0.0
158 } else {
159 let current = self.state.lock().current_memory_bytes;
160 current as f64 / self.max_memory_bytes as f64
161 }
162 }
163
164 pub fn partition_count(&self) -> usize {
166 self.state.lock().partition_lru.len()
167 }
168
169 pub fn metrics(&self) -> CacheMetrics {
171 self.state.lock().metrics.clone()
172 }
173
174 pub fn reset_metrics(&self) {
176 self.state.lock().metrics = CacheMetrics::default();
177 }
178
179 pub fn contains(&self, partition_id: &str) -> bool {
181 self.state.lock().partition_lru.contains(partition_id)
182 }
183
184 pub fn touch(&self, partition_id: &str) -> bool {
188 let mut state = self.state.lock();
189 if state.partition_lru.get(partition_id).is_some() {
190 state.metrics.record_hit();
191 true
192 } else {
193 state.metrics.record_miss();
194 false
195 }
196 }
197
198 pub fn record_loaded(&self, partition_id: String, stats: PartitionStats) {
202 let mut state = self.state.lock();
203 state.current_memory_bytes += stats.estimated_bytes;
204 state.partition_lru.put(partition_id, stats);
205 }
206
207 pub fn get_stats(&self, partition_id: &str) -> Option<PartitionStats> {
211 self.state.lock().partition_lru.peek(partition_id).cloned()
212 }
213
214 pub fn remove(&self, partition_id: &str) -> Option<PartitionStats> {
218 let mut state = self.state.lock();
219 if let Some(stats) = state.partition_lru.pop(partition_id) {
220 state.current_memory_bytes = state
221 .current_memory_bytes
222 .saturating_sub(stats.estimated_bytes);
223 state.metrics.record_eviction(stats.estimated_bytes);
224 Some(stats)
225 } else {
226 None
227 }
228 }
229
230 pub fn is_over_budget(&self) -> bool {
232 self.state.lock().current_memory_bytes > self.max_memory_bytes
233 }
234
235 pub fn get_eviction_candidates(&self) -> Vec<String> {
240 let state = self.state.lock();
241 if state.current_memory_bytes <= self.max_memory_bytes {
242 return vec![];
243 }
244
245 let mut candidates = Vec::new();
246 let mut projected_bytes = state.current_memory_bytes;
247 let mut remaining_partitions = state.partition_lru.len();
248
249 for (partition_id, stats) in state.partition_lru.iter().rev() {
251 if projected_bytes <= self.max_memory_bytes {
252 break;
253 }
254 if remaining_partitions <= self.min_partitions {
255 break;
256 }
257
258 candidates.push(partition_id.clone());
259 projected_bytes = projected_bytes.saturating_sub(stats.estimated_bytes);
260 remaining_partitions -= 1;
261 }
262
263 candidates
264 }
265
266 pub fn memory_needed_for(&self, additional_bytes: usize) -> usize {
268 let current = self.state.lock().current_memory_bytes;
269 let projected = current + additional_bytes;
270 projected.saturating_sub(self.max_memory_bytes)
271 }
272
273 pub fn get_eviction_candidates_for(&self, needed_bytes: usize) -> Vec<String> {
277 if needed_bytes == 0 {
278 return vec![];
279 }
280
281 let state = self.state.lock();
282 let mut candidates = Vec::new();
283 let mut bytes_freed = 0usize;
284 let mut remaining_partitions = state.partition_lru.len();
285
286 for (partition_id, stats) in state.partition_lru.iter().rev() {
288 if bytes_freed >= needed_bytes {
289 break;
290 }
291 if remaining_partitions <= self.min_partitions {
292 break;
293 }
294
295 candidates.push(partition_id.clone());
296 bytes_freed += stats.estimated_bytes;
297 remaining_partitions -= 1;
298 }
299
300 candidates
301 }
302
303 pub fn clear(&self) {
305 let mut state = self.state.lock();
306 state.partition_lru.clear();
307 state.current_memory_bytes = 0;
308 }
309}
310
311impl Default for MemoryBudgetCache {
312 fn default() -> Self {
313 Self::with_default_budget()
314 }
315}
316
317pub fn estimate_memory(node_count: usize, edge_count: usize) -> usize {
321 let base = node_count * NODE_BASE_SIZE + edge_count * EDGE_BASE_SIZE;
322 (base as f64 * GRAPH_OVERHEAD) as usize
323}
324
325pub fn estimate_memory_custom(
327 node_count: usize,
328 edge_count: usize,
329 node_size: usize,
330 edge_size: usize,
331 overhead: f64,
332) -> usize {
333 let base = node_count * node_size + edge_count * edge_size;
334 (base as f64 * overhead) as usize
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn test_memory_estimation() {
343 let bytes = estimate_memory(1000, 500);
345 assert_eq!(bytes, 806400);
347 }
348
349 #[test]
350 fn test_cache_new() {
351 let cache = MemoryBudgetCache::new(1024 * 1024); assert_eq!(cache.max_memory_bytes(), 1024 * 1024);
353 assert_eq!(cache.current_memory_bytes(), 0);
354 assert_eq!(cache.partition_count(), 0);
355 }
356
357 #[test]
358 fn test_cache_record_loaded() {
359 let cache = MemoryBudgetCache::new(10_000_000); let stats = PartitionStats::new(100, 50);
362 let expected_bytes = stats.estimated_bytes;
363
364 cache.record_loaded("partition1".to_string(), stats);
365
366 assert_eq!(cache.partition_count(), 1);
367 assert_eq!(cache.current_memory_bytes(), expected_bytes);
368 assert!(cache.contains("partition1"));
369 }
370
371 #[test]
372 fn test_cache_touch() {
373 let cache = MemoryBudgetCache::new(10_000_000);
374
375 cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
376 cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));
377
378 assert!(cache.touch("p1"));
380 assert_eq!(cache.metrics().hits, 1);
381
382 assert!(!cache.touch("p3"));
384 assert_eq!(cache.metrics().misses, 1);
385 }
386
387 #[test]
388 fn test_cache_remove() {
389 let cache = MemoryBudgetCache::new(10_000_000);
390
391 let stats = PartitionStats::new(100, 50);
392 let bytes = stats.estimated_bytes;
393
394 cache.record_loaded("p1".to_string(), stats);
395 assert_eq!(cache.current_memory_bytes(), bytes);
396
397 let removed = cache.remove("p1");
398 assert!(removed.is_some());
399 assert_eq!(cache.current_memory_bytes(), 0);
400 assert!(!cache.contains("p1"));
401 assert_eq!(cache.metrics().evictions, 1);
402 }
403
404 #[test]
405 fn test_cache_over_budget() {
406 let cache = MemoryBudgetCache::new(100_000); cache.record_loaded("p1".to_string(), PartitionStats::new(50, 25)); assert!(!cache.is_over_budget());
411
412 cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50)); assert!(cache.is_over_budget());
415 }
416
417 #[test]
418 fn test_cache_eviction_candidates() {
419 let cache = MemoryBudgetCache::new(50_000).with_min_partitions(1);
422
423 cache.record_loaded("p1".to_string(), PartitionStats::new(30, 15));
425 cache.record_loaded("p2".to_string(), PartitionStats::new(30, 15));
426 cache.record_loaded("p3".to_string(), PartitionStats::new(30, 15));
427
428 assert!(cache.is_over_budget());
430
431 cache.touch("p1");
433
434 let candidates = cache.get_eviction_candidates();
437 assert!(!candidates.is_empty());
438 assert_eq!(candidates[0], "p2");
440 }
441
442 #[test]
443 fn test_cache_eviction_candidates_for() {
444 let cache = MemoryBudgetCache::new(1_000_000).with_min_partitions(1);
445
446 let stats1 = PartitionStats::new(100, 50); let stats2 = PartitionStats::new(100, 50);
449
450 cache.record_loaded("p1".to_string(), stats1.clone());
451 cache.record_loaded("p2".to_string(), stats2.clone());
452
453 let candidates = cache.get_eviction_candidates_for(50_000);
455 assert!(!candidates.is_empty());
456 }
457
458 #[test]
459 fn test_cache_min_partitions() {
460 let cache = MemoryBudgetCache::new(10_000).with_min_partitions(2);
461
462 cache.record_loaded("p1".to_string(), PartitionStats::new(50, 25));
464 cache.record_loaded("p2".to_string(), PartitionStats::new(50, 25));
465 cache.record_loaded("p3".to_string(), PartitionStats::new(50, 25));
466
467 assert!(cache.is_over_budget());
469
470 let candidates = cache.get_eviction_candidates();
472 assert_eq!(candidates.len(), 1); }
474
475 #[test]
476 fn test_cache_metrics() {
477 let cache = MemoryBudgetCache::new(10_000_000);
478
479 cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
480
481 cache.touch("p1"); cache.touch("p1"); cache.touch("p2"); assert_eq!(cache.metrics().hits, 2);
487 assert_eq!(cache.metrics().misses, 1);
488 assert!((cache.metrics().hit_rate() - 0.666).abs() < 0.01);
489
490 cache.remove("p1");
492 assert_eq!(cache.metrics().evictions, 1);
493 }
494
495 #[test]
496 fn test_cache_clear() {
497 let cache = MemoryBudgetCache::new(10_000_000);
498
499 cache.record_loaded("p1".to_string(), PartitionStats::new(100, 50));
500 cache.record_loaded("p2".to_string(), PartitionStats::new(100, 50));
501
502 cache.clear();
503
504 assert_eq!(cache.partition_count(), 0);
505 assert_eq!(cache.current_memory_bytes(), 0);
506 }
507
508 #[test]
509 fn test_memory_usage_ratio() {
510 let cache = MemoryBudgetCache::new(1_000_000); assert_eq!(cache.memory_usage_ratio(), 0.0);
513
514 cache.record_loaded("p1".to_string(), PartitionStats::new(500, 250));
516
517 let ratio = cache.memory_usage_ratio();
518 assert!(ratio > 0.0 && ratio < 1.0);
519 }
520}