grafeo_common/memory/buffer/
manager.rs1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18 pub budget: usize,
20 pub soft_limit_fraction: f64,
22 pub evict_limit_fraction: f64,
24 pub hard_limit_fraction: f64,
26 pub background_eviction: bool,
28 pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33 #[must_use]
37 pub fn detect_system_memory() -> usize {
38 #[cfg(target_os = "windows")]
41 {
42 Self::fallback_system_memory()
45 }
46
47 #[cfg(target_os = "linux")]
48 {
49 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
51 for line in contents.lines() {
52 if line.starts_with("MemTotal:") {
53 if let Some(kb_str) = line.split_whitespace().nth(1) {
54 if let Ok(kb) = kb_str.parse::<usize>() {
55 return kb * 1024;
56 }
57 }
58 }
59 }
60 }
61 Self::fallback_system_memory()
62 }
63
64 #[cfg(target_os = "macos")]
65 {
66 Self::fallback_system_memory()
68 }
69
70 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
71 {
72 Self::fallback_system_memory()
73 }
74 }
75
76 fn fallback_system_memory() -> usize {
77 1024 * 1024 * 1024
79 }
80
81 #[must_use]
83 pub fn with_budget(budget: usize) -> Self {
84 Self {
85 budget,
86 ..Default::default()
87 }
88 }
89}
90
91impl Default for BufferManagerConfig {
92 fn default() -> Self {
93 let system_memory = Self::detect_system_memory();
94 Self {
95 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
96 soft_limit_fraction: 0.70,
97 evict_limit_fraction: 0.85,
98 hard_limit_fraction: 0.95,
99 background_eviction: false, spill_path: None,
101 }
102 }
103}
104
105pub struct BufferManager {
110 config: BufferManagerConfig,
112 allocated: AtomicUsize,
114 region_allocated: [AtomicUsize; 4],
116 consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
118 soft_limit: usize,
120 evict_limit: usize,
122 hard_limit: usize,
124 shutdown: AtomicBool,
126}
127
128impl BufferManager {
129 #[must_use]
131 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
132 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
133 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
134 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
135
136 Arc::new(Self {
137 config,
138 allocated: AtomicUsize::new(0),
139 region_allocated: [
140 AtomicUsize::new(0),
141 AtomicUsize::new(0),
142 AtomicUsize::new(0),
143 AtomicUsize::new(0),
144 ],
145 consumers: RwLock::new(Vec::new()),
146 soft_limit,
147 evict_limit,
148 hard_limit,
149 shutdown: AtomicBool::new(false),
150 })
151 }
152
153 #[must_use]
155 pub fn with_defaults() -> Arc<Self> {
156 Self::new(BufferManagerConfig::default())
157 }
158
159 #[must_use]
161 pub fn with_budget(budget: usize) -> Arc<Self> {
162 Self::new(BufferManagerConfig::with_budget(budget))
163 }
164
165 pub fn try_allocate(
170 self: &Arc<Self>,
171 size: usize,
172 region: MemoryRegion,
173 ) -> Option<MemoryGrant> {
174 let current = self.allocated.load(Ordering::Relaxed);
176
177 if current + size > self.hard_limit {
178 self.run_eviction_cycle(true);
180
181 let current = self.allocated.load(Ordering::Relaxed);
183 if current + size > self.hard_limit {
184 return None;
185 }
186 }
187
188 self.allocated.fetch_add(size, Ordering::Relaxed);
190 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
191
192 self.check_pressure();
194
195 Some(MemoryGrant::new(
196 Arc::clone(self) as Arc<dyn GrantReleaser>,
197 size,
198 region,
199 ))
200 }
201
202 #[must_use]
204 pub fn pressure_level(&self) -> PressureLevel {
205 let current = self.allocated.load(Ordering::Relaxed);
206 self.compute_pressure_level(current)
207 }
208
209 #[must_use]
211 pub fn stats(&self) -> BufferStats {
212 let total_allocated = self.allocated.load(Ordering::Relaxed);
213 BufferStats {
214 budget: self.config.budget,
215 total_allocated,
216 region_allocated: [
217 self.region_allocated[0].load(Ordering::Relaxed),
218 self.region_allocated[1].load(Ordering::Relaxed),
219 self.region_allocated[2].load(Ordering::Relaxed),
220 self.region_allocated[3].load(Ordering::Relaxed),
221 ],
222 pressure_level: self.compute_pressure_level(total_allocated),
223 consumer_count: self.consumers.read().len(),
224 }
225 }
226
227 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
229 self.consumers.write().push(consumer);
230 }
231
232 pub fn unregister_consumer(&self, name: &str) {
234 self.consumers.write().retain(|c| c.name() != name);
235 }
236
237 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
241 let current = self.allocated.load(Ordering::Relaxed);
242 if current <= target_bytes {
243 return 0;
244 }
245
246 let to_free = current - target_bytes;
247 self.run_eviction_internal(to_free)
248 }
249
250 #[must_use]
252 pub fn config(&self) -> &BufferManagerConfig {
253 &self.config
254 }
255
256 #[must_use]
258 pub fn budget(&self) -> usize {
259 self.config.budget
260 }
261
262 #[must_use]
264 pub fn allocated(&self) -> usize {
265 self.allocated.load(Ordering::Relaxed)
266 }
267
268 #[must_use]
270 pub fn available(&self) -> usize {
271 self.config
272 .budget
273 .saturating_sub(self.allocated.load(Ordering::Relaxed))
274 }
275
276 pub fn shutdown(&self) {
278 self.shutdown.store(true, Ordering::Relaxed);
279 }
280
281 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
284 if current >= self.hard_limit {
285 PressureLevel::Critical
286 } else if current >= self.evict_limit {
287 PressureLevel::High
288 } else if current >= self.soft_limit {
289 PressureLevel::Moderate
290 } else {
291 PressureLevel::Normal
292 }
293 }
294
295 fn check_pressure(&self) {
296 let level = self.pressure_level();
297 if level.requires_eviction() {
298 let aggressive = level >= PressureLevel::High;
301 self.run_eviction_cycle(aggressive);
302 }
303 }
304
305 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
306 let target = if aggressive {
307 self.soft_limit
308 } else {
309 self.evict_limit
310 };
311
312 let current = self.allocated.load(Ordering::Relaxed);
313 if current <= target {
314 return 0;
315 }
316
317 let to_free = current - target;
318 self.run_eviction_internal(to_free)
319 }
320
321 fn run_eviction_internal(&self, to_free: usize) -> usize {
322 let consumers = self.consumers.read();
323
324 let mut sorted: Vec<_> = consumers.iter().collect();
326 sorted.sort_by_key(|c| c.eviction_priority());
327
328 let mut total_freed = 0;
329 for consumer in sorted {
330 if total_freed >= to_free {
331 break;
332 }
333
334 let remaining = to_free - total_freed;
335 let consumer_usage = consumer.memory_usage();
336
337 let target_evict = remaining.min(consumer_usage / 2);
339 if target_evict > 0 {
340 let freed = consumer.evict(target_evict);
341 total_freed += freed;
342 }
345 }
346
347 total_freed
348 }
349}
350
351impl GrantReleaser for BufferManager {
352 fn release(&self, size: usize, region: MemoryRegion) {
353 self.allocated.fetch_sub(size, Ordering::Relaxed);
354 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
355 }
356
357 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
358 let current = self.allocated.load(Ordering::Relaxed);
359
360 if current + size > self.hard_limit {
361 self.run_eviction_cycle(true);
363
364 let current = self.allocated.load(Ordering::Relaxed);
365 if current + size > self.hard_limit {
366 return false;
367 }
368 }
369
370 self.allocated.fetch_add(size, Ordering::Relaxed);
371 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
372 true
373 }
374}
375
376impl Drop for BufferManager {
377 fn drop(&mut self) {
378 self.shutdown.store(true, Ordering::Relaxed);
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use crate::memory::buffer::consumer::priorities;
386 use std::sync::atomic::AtomicUsize;
387
388 struct TestConsumer {
389 name: String,
390 usage: AtomicUsize,
391 priority: u8,
392 region: MemoryRegion,
393 evicted: AtomicUsize,
394 }
395
396 impl TestConsumer {
397 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
398 Arc::new(Self {
399 name: name.to_string(),
400 usage: AtomicUsize::new(usage),
401 priority,
402 region,
403 evicted: AtomicUsize::new(0),
404 })
405 }
406 }
407
408 impl MemoryConsumer for TestConsumer {
409 fn name(&self) -> &str {
410 &self.name
411 }
412
413 fn memory_usage(&self) -> usize {
414 self.usage.load(Ordering::Relaxed)
415 }
416
417 fn eviction_priority(&self) -> u8 {
418 self.priority
419 }
420
421 fn region(&self) -> MemoryRegion {
422 self.region
423 }
424
425 fn evict(&self, target_bytes: usize) -> usize {
426 let current = self.usage.load(Ordering::Relaxed);
427 let to_evict = target_bytes.min(current);
428 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
429 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
430 to_evict
431 }
432 }
433
434 #[test]
435 fn test_basic_allocation() {
436 let config = BufferManagerConfig {
437 budget: 1024 * 1024, ..Default::default()
439 };
440 let manager = BufferManager::new(config);
441
442 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
443 assert!(grant.is_some());
444 assert_eq!(manager.stats().total_allocated, 1024);
445 }
446
447 #[test]
448 fn test_grant_raii_release() {
449 let config = BufferManagerConfig {
450 budget: 1024,
451 ..Default::default()
452 };
453 let manager = BufferManager::new(config);
454
455 {
456 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
457 assert_eq!(manager.stats().total_allocated, 512);
458 }
459
460 assert_eq!(manager.stats().total_allocated, 0);
462 }
463
464 #[test]
465 fn test_pressure_levels() {
466 let config = BufferManagerConfig {
467 budget: 1000,
468 soft_limit_fraction: 0.70,
469 evict_limit_fraction: 0.85,
470 hard_limit_fraction: 0.95,
471 background_eviction: false,
472 spill_path: None,
473 };
474 let manager = BufferManager::new(config);
475
476 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
477
478 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
480 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
481
482 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
484 assert_eq!(manager.pressure_level(), PressureLevel::High);
485
486 }
488
489 #[test]
490 fn test_region_tracking() {
491 let config = BufferManagerConfig {
492 budget: 10000,
493 ..Default::default()
494 };
495 let manager = BufferManager::new(config);
496
497 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
498 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
499 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
500
501 let stats = manager.stats();
502 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
503 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
504 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
505 assert_eq!(stats.total_allocated, 600);
506 }
507
508 #[test]
509 fn test_consumer_registration() {
510 let manager = BufferManager::with_budget(10000);
511
512 let consumer = TestConsumer::new(
513 "test",
514 1000,
515 priorities::INDEX_BUFFERS,
516 MemoryRegion::IndexBuffers,
517 );
518
519 manager.register_consumer(consumer);
520 assert_eq!(manager.stats().consumer_count, 1);
521
522 manager.unregister_consumer("test");
523 assert_eq!(manager.stats().consumer_count, 0);
524 }
525
526 #[test]
527 fn test_eviction_ordering() {
528 let manager = BufferManager::with_budget(10000);
529
530 let low_priority = TestConsumer::new(
532 "low",
533 500,
534 priorities::SPILL_STAGING,
535 MemoryRegion::SpillStaging,
536 );
537
538 let high_priority = TestConsumer::new(
540 "high",
541 500,
542 priorities::ACTIVE_TRANSACTION,
543 MemoryRegion::ExecutionBuffers,
544 );
545
546 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
547 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
548
549 manager.allocated.store(1000, Ordering::Relaxed);
552
553 let freed = manager.evict_to_target(700);
555
556 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
558 assert!(freed > 0);
559 }
560
561 #[test]
562 fn test_hard_limit_blocking() {
563 let config = BufferManagerConfig {
564 budget: 1000,
565 soft_limit_fraction: 0.70,
566 evict_limit_fraction: 0.85,
567 hard_limit_fraction: 0.95,
568 background_eviction: false,
569 spill_path: None,
570 };
571 let manager = BufferManager::new(config);
572
573 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
575
576 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
578 assert!(g2.is_none());
579 }
580
581 #[test]
582 fn test_available_memory() {
583 let manager = BufferManager::with_budget(1000);
584
585 assert_eq!(manager.available(), 1000);
586
587 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
588 assert_eq!(manager.available(), 700);
589 }
590}