grafeo_common/memory/buffer/
manager.rs1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18 pub budget: usize,
20 pub soft_limit_fraction: f64,
22 pub evict_limit_fraction: f64,
24 pub hard_limit_fraction: f64,
26 pub background_eviction: bool,
28 pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33 #[must_use]
37 pub fn detect_system_memory() -> usize {
38 #[cfg(miri)]
40 {
41 return Self::fallback_system_memory();
42 }
43
44 #[cfg(not(miri))]
47 {
48 #[cfg(target_os = "windows")]
49 {
50 Self::fallback_system_memory()
53 }
54
55 #[cfg(target_os = "linux")]
56 {
57 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59 for line in contents.lines() {
60 if line.starts_with("MemTotal:")
61 && let Some(kb_str) = line.split_whitespace().nth(1)
62 && let Ok(kb) = kb_str.parse::<usize>()
63 {
64 return kb * 1024;
65 }
66 }
67 }
68 Self::fallback_system_memory()
69 }
70
71 #[cfg(target_os = "macos")]
72 {
73 Self::fallback_system_memory()
75 }
76
77 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78 {
79 Self::fallback_system_memory()
80 }
81 }
82 }
83
84 fn fallback_system_memory() -> usize {
85 1024 * 1024 * 1024
87 }
88
89 #[must_use]
91 pub fn with_budget(budget: usize) -> Self {
92 Self {
93 budget,
94 ..Default::default()
95 }
96 }
97}
98
99impl Default for BufferManagerConfig {
100 fn default() -> Self {
101 let system_memory = Self::detect_system_memory();
102 Self {
103 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
104 soft_limit_fraction: 0.70,
105 evict_limit_fraction: 0.85,
106 hard_limit_fraction: 0.95,
107 background_eviction: false, spill_path: None,
109 }
110 }
111}
112
113pub struct BufferManager {
118 config: BufferManagerConfig,
120 allocated: AtomicUsize,
122 region_allocated: [AtomicUsize; 4],
124 consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
126 soft_limit: usize,
128 evict_limit: usize,
130 hard_limit: usize,
132 shutdown: AtomicBool,
134}
135
136impl BufferManager {
137 #[must_use]
139 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
140 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
141 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
142 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
143
144 Arc::new(Self {
145 config,
146 allocated: AtomicUsize::new(0),
147 region_allocated: [
148 AtomicUsize::new(0),
149 AtomicUsize::new(0),
150 AtomicUsize::new(0),
151 AtomicUsize::new(0),
152 ],
153 consumers: RwLock::new(Vec::new()),
154 soft_limit,
155 evict_limit,
156 hard_limit,
157 shutdown: AtomicBool::new(false),
158 })
159 }
160
161 #[must_use]
163 pub fn with_defaults() -> Arc<Self> {
164 Self::new(BufferManagerConfig::default())
165 }
166
167 #[must_use]
169 pub fn with_budget(budget: usize) -> Arc<Self> {
170 Self::new(BufferManagerConfig::with_budget(budget))
171 }
172
173 pub fn try_allocate(
178 self: &Arc<Self>,
179 size: usize,
180 region: MemoryRegion,
181 ) -> Option<MemoryGrant> {
182 let current = self.allocated.load(Ordering::Relaxed);
184
185 if current + size > self.hard_limit {
186 self.run_eviction_cycle(true);
188
189 let current = self.allocated.load(Ordering::Relaxed);
191 if current + size > self.hard_limit {
192 return None;
193 }
194 }
195
196 self.allocated.fetch_add(size, Ordering::Relaxed);
198 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
199
200 self.check_pressure();
202
203 Some(MemoryGrant::new(
204 Arc::clone(self) as Arc<dyn GrantReleaser>,
205 size,
206 region,
207 ))
208 }
209
210 #[must_use]
212 pub fn pressure_level(&self) -> PressureLevel {
213 let current = self.allocated.load(Ordering::Relaxed);
214 self.compute_pressure_level(current)
215 }
216
217 #[must_use]
219 pub fn stats(&self) -> BufferStats {
220 let total_allocated = self.allocated.load(Ordering::Relaxed);
221 BufferStats {
222 budget: self.config.budget,
223 total_allocated,
224 region_allocated: [
225 self.region_allocated[0].load(Ordering::Relaxed),
226 self.region_allocated[1].load(Ordering::Relaxed),
227 self.region_allocated[2].load(Ordering::Relaxed),
228 self.region_allocated[3].load(Ordering::Relaxed),
229 ],
230 pressure_level: self.compute_pressure_level(total_allocated),
231 consumer_count: self.consumers.read().len(),
232 }
233 }
234
235 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
237 self.consumers.write().push(consumer);
238 }
239
240 pub fn unregister_consumer(&self, name: &str) {
242 self.consumers.write().retain(|c| c.name() != name);
243 }
244
245 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
249 let current = self.allocated.load(Ordering::Relaxed);
250 if current <= target_bytes {
251 return 0;
252 }
253
254 let to_free = current - target_bytes;
255 self.run_eviction_internal(to_free)
256 }
257
258 #[must_use]
260 pub fn config(&self) -> &BufferManagerConfig {
261 &self.config
262 }
263
264 #[must_use]
266 pub fn budget(&self) -> usize {
267 self.config.budget
268 }
269
270 #[must_use]
272 pub fn allocated(&self) -> usize {
273 self.allocated.load(Ordering::Relaxed)
274 }
275
276 #[must_use]
278 pub fn available(&self) -> usize {
279 self.config
280 .budget
281 .saturating_sub(self.allocated.load(Ordering::Relaxed))
282 }
283
284 pub fn shutdown(&self) {
286 self.shutdown.store(true, Ordering::Relaxed);
287 }
288
289 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
292 if current >= self.hard_limit {
293 PressureLevel::Critical
294 } else if current >= self.evict_limit {
295 PressureLevel::High
296 } else if current >= self.soft_limit {
297 PressureLevel::Moderate
298 } else {
299 PressureLevel::Normal
300 }
301 }
302
303 fn check_pressure(&self) {
304 let level = self.pressure_level();
305 if level.requires_eviction() {
306 let aggressive = level >= PressureLevel::High;
309 self.run_eviction_cycle(aggressive);
310 }
311 }
312
313 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
314 let target = if aggressive {
315 self.soft_limit
316 } else {
317 self.evict_limit
318 };
319
320 let current = self.allocated.load(Ordering::Relaxed);
321 if current <= target {
322 return 0;
323 }
324
325 let to_free = current - target;
326 self.run_eviction_internal(to_free)
327 }
328
329 fn run_eviction_internal(&self, to_free: usize) -> usize {
330 let consumers = self.consumers.read();
331
332 let mut sorted: Vec<_> = consumers.iter().collect();
334 sorted.sort_by_key(|c| c.eviction_priority());
335
336 let mut total_freed = 0;
337 for consumer in sorted {
338 if total_freed >= to_free {
339 break;
340 }
341
342 let remaining = to_free - total_freed;
343 let consumer_usage = consumer.memory_usage();
344
345 let target_evict = remaining.min(consumer_usage / 2);
347 if target_evict > 0 {
348 let freed = consumer.evict(target_evict);
349 total_freed += freed;
350 }
353 }
354
355 total_freed
356 }
357}
358
359impl GrantReleaser for BufferManager {
360 fn release(&self, size: usize, region: MemoryRegion) {
361 self.allocated.fetch_sub(size, Ordering::Relaxed);
362 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
363 }
364
365 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
366 let current = self.allocated.load(Ordering::Relaxed);
367
368 if current + size > self.hard_limit {
369 self.run_eviction_cycle(true);
371
372 let current = self.allocated.load(Ordering::Relaxed);
373 if current + size > self.hard_limit {
374 return false;
375 }
376 }
377
378 self.allocated.fetch_add(size, Ordering::Relaxed);
379 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
380 true
381 }
382}
383
384impl Drop for BufferManager {
385 fn drop(&mut self) {
386 self.shutdown.store(true, Ordering::Relaxed);
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use crate::memory::buffer::consumer::priorities;
394 use std::sync::atomic::AtomicUsize;
395
396 struct TestConsumer {
397 name: String,
398 usage: AtomicUsize,
399 priority: u8,
400 region: MemoryRegion,
401 evicted: AtomicUsize,
402 }
403
404 impl TestConsumer {
405 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
406 Arc::new(Self {
407 name: name.to_string(),
408 usage: AtomicUsize::new(usage),
409 priority,
410 region,
411 evicted: AtomicUsize::new(0),
412 })
413 }
414 }
415
416 impl MemoryConsumer for TestConsumer {
417 fn name(&self) -> &str {
418 &self.name
419 }
420
421 fn memory_usage(&self) -> usize {
422 self.usage.load(Ordering::Relaxed)
423 }
424
425 fn eviction_priority(&self) -> u8 {
426 self.priority
427 }
428
429 fn region(&self) -> MemoryRegion {
430 self.region
431 }
432
433 fn evict(&self, target_bytes: usize) -> usize {
434 let current = self.usage.load(Ordering::Relaxed);
435 let to_evict = target_bytes.min(current);
436 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
437 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
438 to_evict
439 }
440 }
441
442 #[test]
443 fn test_basic_allocation() {
444 let config = BufferManagerConfig {
445 budget: 1024 * 1024, ..Default::default()
447 };
448 let manager = BufferManager::new(config);
449
450 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
451 assert!(grant.is_some());
452 assert_eq!(manager.stats().total_allocated, 1024);
453 }
454
455 #[test]
456 fn test_grant_raii_release() {
457 let config = BufferManagerConfig {
458 budget: 1024,
459 ..Default::default()
460 };
461 let manager = BufferManager::new(config);
462
463 {
464 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
465 assert_eq!(manager.stats().total_allocated, 512);
466 }
467
468 assert_eq!(manager.stats().total_allocated, 0);
470 }
471
472 #[test]
473 fn test_pressure_levels() {
474 let config = BufferManagerConfig {
475 budget: 1000,
476 soft_limit_fraction: 0.70,
477 evict_limit_fraction: 0.85,
478 hard_limit_fraction: 0.95,
479 background_eviction: false,
480 spill_path: None,
481 };
482 let manager = BufferManager::new(config);
483
484 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
485
486 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
488 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
489
490 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
492 assert_eq!(manager.pressure_level(), PressureLevel::High);
493
494 }
496
497 #[test]
498 fn test_region_tracking() {
499 let config = BufferManagerConfig {
500 budget: 10000,
501 ..Default::default()
502 };
503 let manager = BufferManager::new(config);
504
505 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
506 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
507 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
508
509 let stats = manager.stats();
510 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
511 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
512 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
513 assert_eq!(stats.total_allocated, 600);
514 }
515
516 #[test]
517 fn test_consumer_registration() {
518 let manager = BufferManager::with_budget(10000);
519
520 let consumer = TestConsumer::new(
521 "test",
522 1000,
523 priorities::INDEX_BUFFERS,
524 MemoryRegion::IndexBuffers,
525 );
526
527 manager.register_consumer(consumer);
528 assert_eq!(manager.stats().consumer_count, 1);
529
530 manager.unregister_consumer("test");
531 assert_eq!(manager.stats().consumer_count, 0);
532 }
533
534 #[test]
535 fn test_eviction_ordering() {
536 let manager = BufferManager::with_budget(10000);
537
538 let low_priority = TestConsumer::new(
540 "low",
541 500,
542 priorities::SPILL_STAGING,
543 MemoryRegion::SpillStaging,
544 );
545
546 let high_priority = TestConsumer::new(
548 "high",
549 500,
550 priorities::ACTIVE_TRANSACTION,
551 MemoryRegion::ExecutionBuffers,
552 );
553
554 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
555 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
556
557 manager.allocated.store(1000, Ordering::Relaxed);
560
561 let freed = manager.evict_to_target(700);
563
564 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
566 assert!(freed > 0);
567 }
568
569 #[test]
570 fn test_hard_limit_blocking() {
571 let config = BufferManagerConfig {
572 budget: 1000,
573 soft_limit_fraction: 0.70,
574 evict_limit_fraction: 0.85,
575 hard_limit_fraction: 0.95,
576 background_eviction: false,
577 spill_path: None,
578 };
579 let manager = BufferManager::new(config);
580
581 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
583
584 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
586 assert!(g2.is_none());
587 }
588
589 #[test]
590 fn test_available_memory() {
591 let manager = BufferManager::with_budget(1000);
592
593 assert_eq!(manager.available(), 1000);
594
595 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
596 assert_eq!(manager.available(), 700);
597 }
598}