grafeo_common/memory/buffer/
manager.rs1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18 pub budget: usize,
20 pub soft_limit_fraction: f64,
22 pub evict_limit_fraction: f64,
24 pub hard_limit_fraction: f64,
26 pub background_eviction: bool,
28 pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33 #[must_use]
37 pub fn detect_system_memory() -> usize {
38 #[cfg(miri)]
40 {
41 return Self::fallback_system_memory();
42 }
43
44 #[cfg(not(miri))]
47 {
48 #[cfg(target_os = "windows")]
49 {
50 Self::fallback_system_memory()
53 }
54
55 #[cfg(target_os = "linux")]
56 {
57 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59 for line in contents.lines() {
60 if line.starts_with("MemTotal:") {
61 if let Some(kb_str) = line.split_whitespace().nth(1) {
62 if let Ok(kb) = kb_str.parse::<usize>() {
63 return kb * 1024;
64 }
65 }
66 }
67 }
68 }
69 Self::fallback_system_memory()
70 }
71
72 #[cfg(target_os = "macos")]
73 {
74 Self::fallback_system_memory()
76 }
77
78 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
79 {
80 Self::fallback_system_memory()
81 }
82 }
83 }
84
85 fn fallback_system_memory() -> usize {
86 1024 * 1024 * 1024
88 }
89
90 #[must_use]
92 pub fn with_budget(budget: usize) -> Self {
93 Self {
94 budget,
95 ..Default::default()
96 }
97 }
98}
99
100impl Default for BufferManagerConfig {
101 fn default() -> Self {
102 let system_memory = Self::detect_system_memory();
103 Self {
104 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
105 soft_limit_fraction: 0.70,
106 evict_limit_fraction: 0.85,
107 hard_limit_fraction: 0.95,
108 background_eviction: false, spill_path: None,
110 }
111 }
112}
113
114pub struct BufferManager {
119 config: BufferManagerConfig,
121 allocated: AtomicUsize,
123 region_allocated: [AtomicUsize; 4],
125 consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
127 soft_limit: usize,
129 evict_limit: usize,
131 hard_limit: usize,
133 shutdown: AtomicBool,
135}
136
137impl BufferManager {
138 #[must_use]
140 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
141 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
142 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
143 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
144
145 Arc::new(Self {
146 config,
147 allocated: AtomicUsize::new(0),
148 region_allocated: [
149 AtomicUsize::new(0),
150 AtomicUsize::new(0),
151 AtomicUsize::new(0),
152 AtomicUsize::new(0),
153 ],
154 consumers: RwLock::new(Vec::new()),
155 soft_limit,
156 evict_limit,
157 hard_limit,
158 shutdown: AtomicBool::new(false),
159 })
160 }
161
162 #[must_use]
164 pub fn with_defaults() -> Arc<Self> {
165 Self::new(BufferManagerConfig::default())
166 }
167
168 #[must_use]
170 pub fn with_budget(budget: usize) -> Arc<Self> {
171 Self::new(BufferManagerConfig::with_budget(budget))
172 }
173
174 pub fn try_allocate(
179 self: &Arc<Self>,
180 size: usize,
181 region: MemoryRegion,
182 ) -> Option<MemoryGrant> {
183 let current = self.allocated.load(Ordering::Relaxed);
185
186 if current + size > self.hard_limit {
187 self.run_eviction_cycle(true);
189
190 let current = self.allocated.load(Ordering::Relaxed);
192 if current + size > self.hard_limit {
193 return None;
194 }
195 }
196
197 self.allocated.fetch_add(size, Ordering::Relaxed);
199 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
200
201 self.check_pressure();
203
204 Some(MemoryGrant::new(
205 Arc::clone(self) as Arc<dyn GrantReleaser>,
206 size,
207 region,
208 ))
209 }
210
211 #[must_use]
213 pub fn pressure_level(&self) -> PressureLevel {
214 let current = self.allocated.load(Ordering::Relaxed);
215 self.compute_pressure_level(current)
216 }
217
218 #[must_use]
220 pub fn stats(&self) -> BufferStats {
221 let total_allocated = self.allocated.load(Ordering::Relaxed);
222 BufferStats {
223 budget: self.config.budget,
224 total_allocated,
225 region_allocated: [
226 self.region_allocated[0].load(Ordering::Relaxed),
227 self.region_allocated[1].load(Ordering::Relaxed),
228 self.region_allocated[2].load(Ordering::Relaxed),
229 self.region_allocated[3].load(Ordering::Relaxed),
230 ],
231 pressure_level: self.compute_pressure_level(total_allocated),
232 consumer_count: self.consumers.read().len(),
233 }
234 }
235
236 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
238 self.consumers.write().push(consumer);
239 }
240
241 pub fn unregister_consumer(&self, name: &str) {
243 self.consumers.write().retain(|c| c.name() != name);
244 }
245
246 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
250 let current = self.allocated.load(Ordering::Relaxed);
251 if current <= target_bytes {
252 return 0;
253 }
254
255 let to_free = current - target_bytes;
256 self.run_eviction_internal(to_free)
257 }
258
259 #[must_use]
261 pub fn config(&self) -> &BufferManagerConfig {
262 &self.config
263 }
264
265 #[must_use]
267 pub fn budget(&self) -> usize {
268 self.config.budget
269 }
270
271 #[must_use]
273 pub fn allocated(&self) -> usize {
274 self.allocated.load(Ordering::Relaxed)
275 }
276
277 #[must_use]
279 pub fn available(&self) -> usize {
280 self.config
281 .budget
282 .saturating_sub(self.allocated.load(Ordering::Relaxed))
283 }
284
285 pub fn shutdown(&self) {
287 self.shutdown.store(true, Ordering::Relaxed);
288 }
289
290 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
293 if current >= self.hard_limit {
294 PressureLevel::Critical
295 } else if current >= self.evict_limit {
296 PressureLevel::High
297 } else if current >= self.soft_limit {
298 PressureLevel::Moderate
299 } else {
300 PressureLevel::Normal
301 }
302 }
303
304 fn check_pressure(&self) {
305 let level = self.pressure_level();
306 if level.requires_eviction() {
307 let aggressive = level >= PressureLevel::High;
310 self.run_eviction_cycle(aggressive);
311 }
312 }
313
314 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
315 let target = if aggressive {
316 self.soft_limit
317 } else {
318 self.evict_limit
319 };
320
321 let current = self.allocated.load(Ordering::Relaxed);
322 if current <= target {
323 return 0;
324 }
325
326 let to_free = current - target;
327 self.run_eviction_internal(to_free)
328 }
329
330 fn run_eviction_internal(&self, to_free: usize) -> usize {
331 let consumers = self.consumers.read();
332
333 let mut sorted: Vec<_> = consumers.iter().collect();
335 sorted.sort_by_key(|c| c.eviction_priority());
336
337 let mut total_freed = 0;
338 for consumer in sorted {
339 if total_freed >= to_free {
340 break;
341 }
342
343 let remaining = to_free - total_freed;
344 let consumer_usage = consumer.memory_usage();
345
346 let target_evict = remaining.min(consumer_usage / 2);
348 if target_evict > 0 {
349 let freed = consumer.evict(target_evict);
350 total_freed += freed;
351 }
354 }
355
356 total_freed
357 }
358}
359
360impl GrantReleaser for BufferManager {
361 fn release(&self, size: usize, region: MemoryRegion) {
362 self.allocated.fetch_sub(size, Ordering::Relaxed);
363 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
364 }
365
366 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
367 let current = self.allocated.load(Ordering::Relaxed);
368
369 if current + size > self.hard_limit {
370 self.run_eviction_cycle(true);
372
373 let current = self.allocated.load(Ordering::Relaxed);
374 if current + size > self.hard_limit {
375 return false;
376 }
377 }
378
379 self.allocated.fetch_add(size, Ordering::Relaxed);
380 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
381 true
382 }
383}
384
385impl Drop for BufferManager {
386 fn drop(&mut self) {
387 self.shutdown.store(true, Ordering::Relaxed);
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394 use crate::memory::buffer::consumer::priorities;
395 use std::sync::atomic::AtomicUsize;
396
397 struct TestConsumer {
398 name: String,
399 usage: AtomicUsize,
400 priority: u8,
401 region: MemoryRegion,
402 evicted: AtomicUsize,
403 }
404
405 impl TestConsumer {
406 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
407 Arc::new(Self {
408 name: name.to_string(),
409 usage: AtomicUsize::new(usage),
410 priority,
411 region,
412 evicted: AtomicUsize::new(0),
413 })
414 }
415 }
416
417 impl MemoryConsumer for TestConsumer {
418 fn name(&self) -> &str {
419 &self.name
420 }
421
422 fn memory_usage(&self) -> usize {
423 self.usage.load(Ordering::Relaxed)
424 }
425
426 fn eviction_priority(&self) -> u8 {
427 self.priority
428 }
429
430 fn region(&self) -> MemoryRegion {
431 self.region
432 }
433
434 fn evict(&self, target_bytes: usize) -> usize {
435 let current = self.usage.load(Ordering::Relaxed);
436 let to_evict = target_bytes.min(current);
437 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
438 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
439 to_evict
440 }
441 }
442
443 #[test]
444 fn test_basic_allocation() {
445 let config = BufferManagerConfig {
446 budget: 1024 * 1024, ..Default::default()
448 };
449 let manager = BufferManager::new(config);
450
451 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
452 assert!(grant.is_some());
453 assert_eq!(manager.stats().total_allocated, 1024);
454 }
455
456 #[test]
457 fn test_grant_raii_release() {
458 let config = BufferManagerConfig {
459 budget: 1024,
460 ..Default::default()
461 };
462 let manager = BufferManager::new(config);
463
464 {
465 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
466 assert_eq!(manager.stats().total_allocated, 512);
467 }
468
469 assert_eq!(manager.stats().total_allocated, 0);
471 }
472
473 #[test]
474 fn test_pressure_levels() {
475 let config = BufferManagerConfig {
476 budget: 1000,
477 soft_limit_fraction: 0.70,
478 evict_limit_fraction: 0.85,
479 hard_limit_fraction: 0.95,
480 background_eviction: false,
481 spill_path: None,
482 };
483 let manager = BufferManager::new(config);
484
485 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
486
487 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
489 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
490
491 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
493 assert_eq!(manager.pressure_level(), PressureLevel::High);
494
495 }
497
498 #[test]
499 fn test_region_tracking() {
500 let config = BufferManagerConfig {
501 budget: 10000,
502 ..Default::default()
503 };
504 let manager = BufferManager::new(config);
505
506 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
507 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
508 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
509
510 let stats = manager.stats();
511 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
512 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
513 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
514 assert_eq!(stats.total_allocated, 600);
515 }
516
517 #[test]
518 fn test_consumer_registration() {
519 let manager = BufferManager::with_budget(10000);
520
521 let consumer = TestConsumer::new(
522 "test",
523 1000,
524 priorities::INDEX_BUFFERS,
525 MemoryRegion::IndexBuffers,
526 );
527
528 manager.register_consumer(consumer);
529 assert_eq!(manager.stats().consumer_count, 1);
530
531 manager.unregister_consumer("test");
532 assert_eq!(manager.stats().consumer_count, 0);
533 }
534
535 #[test]
536 fn test_eviction_ordering() {
537 let manager = BufferManager::with_budget(10000);
538
539 let low_priority = TestConsumer::new(
541 "low",
542 500,
543 priorities::SPILL_STAGING,
544 MemoryRegion::SpillStaging,
545 );
546
547 let high_priority = TestConsumer::new(
549 "high",
550 500,
551 priorities::ACTIVE_TRANSACTION,
552 MemoryRegion::ExecutionBuffers,
553 );
554
555 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
556 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
557
558 manager.allocated.store(1000, Ordering::Relaxed);
561
562 let freed = manager.evict_to_target(700);
564
565 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
567 assert!(freed > 0);
568 }
569
570 #[test]
571 fn test_hard_limit_blocking() {
572 let config = BufferManagerConfig {
573 budget: 1000,
574 soft_limit_fraction: 0.70,
575 evict_limit_fraction: 0.85,
576 hard_limit_fraction: 0.95,
577 background_eviction: false,
578 spill_path: None,
579 };
580 let manager = BufferManager::new(config);
581
582 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
584
585 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
587 assert!(g2.is_none());
588 }
589
590 #[test]
591 fn test_available_memory() {
592 let manager = BufferManager::with_budget(1000);
593
594 assert_eq!(manager.available(), 1000);
595
596 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
597 assert_eq!(manager.available(), 700);
598 }
599}