1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18 pub budget: usize,
20 pub soft_limit_fraction: f64,
22 pub evict_limit_fraction: f64,
24 pub hard_limit_fraction: f64,
26 pub background_eviction: bool,
28 pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33 #[must_use]
37 pub fn detect_system_memory() -> usize {
38 #[cfg(miri)]
40 {
41 return Self::fallback_system_memory();
42 }
43
44 #[cfg(not(miri))]
47 {
48 #[cfg(target_os = "windows")]
49 {
50 Self::fallback_system_memory()
53 }
54
55 #[cfg(target_os = "linux")]
56 {
57 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59 for line in contents.lines() {
60 if line.starts_with("MemTotal:")
61 && let Some(kb_str) = line.split_whitespace().nth(1)
62 && let Ok(kb) = kb_str.parse::<usize>()
63 {
64 return kb * 1024;
65 }
66 }
67 }
68 Self::fallback_system_memory()
69 }
70
71 #[cfg(target_os = "macos")]
72 {
73 Self::fallback_system_memory()
75 }
76
77 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78 {
79 Self::fallback_system_memory()
80 }
81 }
82 }
83
84 fn fallback_system_memory() -> usize {
85 1024 * 1024 * 1024
87 }
88
89 #[must_use]
91 pub fn with_budget(budget: usize) -> Self {
92 Self {
93 budget,
94 ..Default::default()
95 }
96 }
97}
98
99impl Default for BufferManagerConfig {
100 fn default() -> Self {
101 let system_memory = Self::detect_system_memory();
102 Self {
103 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
105 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
106 soft_limit_fraction: 0.70,
107 evict_limit_fraction: 0.85,
108 hard_limit_fraction: 0.95,
109 background_eviction: false, spill_path: None,
111 }
112 }
113}
114
115pub struct BufferManager {
120 config: BufferManagerConfig,
122 allocated: AtomicUsize,
124 region_allocated: [AtomicUsize; 4],
126 consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
128 soft_limit: usize,
130 evict_limit: usize,
132 hard_limit: usize,
134 shutdown: AtomicBool,
136}
137
138impl BufferManager {
139 #[must_use]
141 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
142 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
144 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
145 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
147 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
148 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
150 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
151
152 Arc::new(Self {
153 config,
154 allocated: AtomicUsize::new(0),
155 region_allocated: [
156 AtomicUsize::new(0),
157 AtomicUsize::new(0),
158 AtomicUsize::new(0),
159 AtomicUsize::new(0),
160 ],
161 consumers: RwLock::new(Vec::new()),
162 soft_limit,
163 evict_limit,
164 hard_limit,
165 shutdown: AtomicBool::new(false),
166 })
167 }
168
169 #[must_use]
171 pub fn with_defaults() -> Arc<Self> {
172 Self::new(BufferManagerConfig::default())
173 }
174
175 #[must_use]
177 pub fn with_budget(budget: usize) -> Arc<Self> {
178 Self::new(BufferManagerConfig::with_budget(budget))
179 }
180
181 pub fn try_allocate(
186 self: &Arc<Self>,
187 size: usize,
188 region: MemoryRegion,
189 ) -> Option<MemoryGrant> {
190 let current = self.allocated.load(Ordering::Relaxed);
192
193 if current + size > self.hard_limit {
194 self.run_eviction_cycle(true);
196
197 let current = self.allocated.load(Ordering::Relaxed);
199 if current + size > self.hard_limit {
200 return None;
201 }
202 }
203
204 self.allocated.fetch_add(size, Ordering::Relaxed);
206 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
207
208 self.check_pressure();
210
211 Some(MemoryGrant::new(
212 Arc::clone(self) as Arc<dyn GrantReleaser>,
213 size,
214 region,
215 ))
216 }
217
218 #[must_use]
220 pub fn pressure_level(&self) -> PressureLevel {
221 let current = self.allocated.load(Ordering::Relaxed);
222 self.compute_pressure_level(current)
223 }
224
225 #[must_use]
227 pub fn stats(&self) -> BufferStats {
228 let total_allocated = self.allocated.load(Ordering::Relaxed);
229 BufferStats {
230 budget: self.config.budget,
231 total_allocated,
232 region_allocated: [
233 self.region_allocated[0].load(Ordering::Relaxed),
234 self.region_allocated[1].load(Ordering::Relaxed),
235 self.region_allocated[2].load(Ordering::Relaxed),
236 self.region_allocated[3].load(Ordering::Relaxed),
237 ],
238 pressure_level: self.compute_pressure_level(total_allocated),
239 consumer_count: self.consumers.read().len(),
240 }
241 }
242
243 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
245 self.consumers.write().push(consumer);
246 }
247
248 pub fn unregister_consumer(&self, name: &str) {
250 self.consumers.write().retain(|c| c.name() != name);
251 }
252
253 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
257 let current = self.allocated.load(Ordering::Relaxed);
258 if current <= target_bytes {
259 return 0;
260 }
261
262 let to_free = current - target_bytes;
263 self.run_eviction_internal(to_free)
264 }
265
266 pub fn spill_all(&self) -> usize {
270 let consumers = self.consumers.read();
271 let mut total_freed = 0;
272 for consumer in consumers.iter() {
273 if consumer.can_spill()
274 && let Ok(freed) = consumer.spill(usize::MAX)
275 {
276 total_freed += freed;
277 }
278 }
279 total_freed
280 }
281
282 #[must_use]
284 pub fn config(&self) -> &BufferManagerConfig {
285 &self.config
286 }
287
288 #[must_use]
290 pub fn budget(&self) -> usize {
291 self.config.budget
292 }
293
294 #[must_use]
296 pub fn allocated(&self) -> usize {
297 self.allocated.load(Ordering::Relaxed)
298 }
299
300 #[must_use]
302 pub fn available(&self) -> usize {
303 self.config
304 .budget
305 .saturating_sub(self.allocated.load(Ordering::Relaxed))
306 }
307
308 pub fn shutdown(&self) {
310 self.shutdown.store(true, Ordering::Relaxed);
311 }
312
313 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
316 if current >= self.hard_limit {
317 PressureLevel::Critical
318 } else if current >= self.evict_limit {
319 PressureLevel::High
320 } else if current >= self.soft_limit {
321 PressureLevel::Moderate
322 } else {
323 PressureLevel::Normal
324 }
325 }
326
327 fn check_pressure(&self) {
328 let level = self.pressure_level();
329 if level.requires_eviction() {
330 let aggressive = level >= PressureLevel::High;
333 self.run_eviction_cycle(aggressive);
334 }
335 }
336
337 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
338 let target = if aggressive {
339 self.soft_limit
340 } else {
341 self.evict_limit
342 };
343
344 let current = self.allocated.load(Ordering::Relaxed);
345 if current <= target {
346 return 0;
347 }
348
349 let to_free = current - target;
350 self.run_eviction_internal(to_free)
351 }
352
353 fn run_eviction_internal(&self, to_free: usize) -> usize {
354 let consumers = self.consumers.read();
355
356 let mut sorted: Vec<_> = consumers.iter().collect();
358 sorted.sort_by_key(|c| c.eviction_priority());
359
360 let mut total_freed = 0;
361 for consumer in &sorted {
362 if total_freed >= to_free {
363 break;
364 }
365
366 let remaining = to_free - total_freed;
367 let consumer_usage = consumer.memory_usage();
368
369 let target_evict = remaining.min(consumer_usage / 2);
371 if target_evict > 0 {
372 let freed = consumer.evict(target_evict);
373 total_freed += freed;
374 }
375 }
376
377 if total_freed < to_free {
380 for consumer in &sorted {
381 if total_freed >= to_free {
382 break;
383 }
384 if !consumer.can_spill() {
385 continue;
386 }
387 let remaining = to_free - total_freed;
388 match consumer.spill(remaining) {
389 Ok(freed) => total_freed += freed,
390 Err(_) => continue,
391 }
392 }
393 }
394
395 total_freed
396 }
397}
398
399impl GrantReleaser for BufferManager {
400 fn release(&self, size: usize, region: MemoryRegion) {
401 self.allocated.fetch_sub(size, Ordering::Relaxed);
402 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
403 }
404
405 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
406 let current = self.allocated.load(Ordering::Relaxed);
407
408 if current + size > self.hard_limit {
409 self.run_eviction_cycle(true);
411
412 let current = self.allocated.load(Ordering::Relaxed);
413 if current + size > self.hard_limit {
414 return false;
415 }
416 }
417
418 self.allocated.fetch_add(size, Ordering::Relaxed);
419 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
420 true
421 }
422}
423
424impl Drop for BufferManager {
425 fn drop(&mut self) {
426 self.shutdown.store(true, Ordering::Relaxed);
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433 use crate::memory::buffer::consumer::priorities;
434 use std::sync::atomic::AtomicUsize;
435
436 struct TestConsumer {
437 name: String,
438 usage: AtomicUsize,
439 priority: u8,
440 region: MemoryRegion,
441 evicted: AtomicUsize,
442 }
443
444 impl TestConsumer {
445 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
446 Arc::new(Self {
447 name: name.to_string(),
448 usage: AtomicUsize::new(usage),
449 priority,
450 region,
451 evicted: AtomicUsize::new(0),
452 })
453 }
454 }
455
456 impl MemoryConsumer for TestConsumer {
457 fn name(&self) -> &str {
458 &self.name
459 }
460
461 fn memory_usage(&self) -> usize {
462 self.usage.load(Ordering::Relaxed)
463 }
464
465 fn eviction_priority(&self) -> u8 {
466 self.priority
467 }
468
469 fn region(&self) -> MemoryRegion {
470 self.region
471 }
472
473 fn evict(&self, target_bytes: usize) -> usize {
474 let current = self.usage.load(Ordering::Relaxed);
475 let to_evict = target_bytes.min(current);
476 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
477 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
478 to_evict
479 }
480 }
481
482 #[test]
483 fn test_basic_allocation() {
484 let config = BufferManagerConfig {
485 budget: 1024 * 1024, ..Default::default()
487 };
488 let manager = BufferManager::new(config);
489
490 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
491 assert!(grant.is_some());
492 assert_eq!(manager.stats().total_allocated, 1024);
493 }
494
495 #[test]
496 fn test_grant_raii_release() {
497 let config = BufferManagerConfig {
498 budget: 1024,
499 ..Default::default()
500 };
501 let manager = BufferManager::new(config);
502
503 {
504 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
505 assert_eq!(manager.stats().total_allocated, 512);
506 }
507
508 assert_eq!(manager.stats().total_allocated, 0);
510 }
511
512 #[test]
513 fn test_pressure_levels() {
514 let config = BufferManagerConfig {
515 budget: 1000,
516 soft_limit_fraction: 0.70,
517 evict_limit_fraction: 0.85,
518 hard_limit_fraction: 0.95,
519 background_eviction: false,
520 spill_path: None,
521 };
522 let manager = BufferManager::new(config);
523
524 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
525
526 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
528 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
529
530 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
532 assert_eq!(manager.pressure_level(), PressureLevel::High);
533
534 }
536
537 #[test]
538 fn test_region_tracking() {
539 let config = BufferManagerConfig {
540 budget: 10000,
541 ..Default::default()
542 };
543 let manager = BufferManager::new(config);
544
545 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
546 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
547 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
548
549 let stats = manager.stats();
550 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
551 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
552 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
553 assert_eq!(stats.total_allocated, 600);
554 }
555
556 #[test]
557 fn test_consumer_registration() {
558 let manager = BufferManager::with_budget(10000);
559
560 let consumer = TestConsumer::new(
561 "test",
562 1000,
563 priorities::INDEX_BUFFERS,
564 MemoryRegion::IndexBuffers,
565 );
566
567 manager.register_consumer(consumer);
568 assert_eq!(manager.stats().consumer_count, 1);
569
570 manager.unregister_consumer("test");
571 assert_eq!(manager.stats().consumer_count, 0);
572 }
573
574 #[test]
575 fn test_eviction_ordering() {
576 let manager = BufferManager::with_budget(10000);
577
578 let low_priority = TestConsumer::new(
580 "low",
581 500,
582 priorities::SPILL_STAGING,
583 MemoryRegion::SpillStaging,
584 );
585
586 let high_priority = TestConsumer::new(
588 "high",
589 500,
590 priorities::ACTIVE_TRANSACTION,
591 MemoryRegion::ExecutionBuffers,
592 );
593
594 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
595 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
596
597 manager.allocated.store(1000, Ordering::Relaxed);
600
601 let freed = manager.evict_to_target(700);
603
604 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
606 assert!(freed > 0);
607 }
608
609 #[test]
610 fn test_hard_limit_blocking() {
611 let config = BufferManagerConfig {
612 budget: 1000,
613 soft_limit_fraction: 0.70,
614 evict_limit_fraction: 0.85,
615 hard_limit_fraction: 0.95,
616 background_eviction: false,
617 spill_path: None,
618 };
619 let manager = BufferManager::new(config);
620
621 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
623
624 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
626 assert!(g2.is_none());
627 }
628
629 #[test]
630 fn test_available_memory() {
631 let manager = BufferManager::with_budget(1000);
632
633 assert_eq!(manager.available(), 1000);
634
635 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
636 assert_eq!(manager.available(), 700);
637 }
638
639 struct SpillableConsumer {
642 name: String,
643 usage: AtomicUsize,
644 priority: u8,
645 region: MemoryRegion,
646 evicted: AtomicUsize,
647 spilled: AtomicUsize,
648 spillable: bool,
649 evict_returns_zero: bool,
650 }
651
652 impl SpillableConsumer {
653 fn new(
654 name: &str,
655 usage: usize,
656 priority: u8,
657 region: MemoryRegion,
658 spillable: bool,
659 ) -> Arc<Self> {
660 Arc::new(Self {
661 name: name.to_string(),
662 usage: AtomicUsize::new(usage),
663 priority,
664 region,
665 evicted: AtomicUsize::new(0),
666 spilled: AtomicUsize::new(0),
667 spillable,
668 evict_returns_zero: false,
669 })
670 }
671
672 fn new_evict_fails(
673 name: &str,
674 usage: usize,
675 priority: u8,
676 region: MemoryRegion,
677 spillable: bool,
678 ) -> Arc<Self> {
679 Arc::new(Self {
680 name: name.to_string(),
681 usage: AtomicUsize::new(usage),
682 priority,
683 region,
684 evicted: AtomicUsize::new(0),
685 spilled: AtomicUsize::new(0),
686 spillable,
687 evict_returns_zero: true,
688 })
689 }
690 }
691
692 impl MemoryConsumer for SpillableConsumer {
693 fn name(&self) -> &str {
694 &self.name
695 }
696
697 fn memory_usage(&self) -> usize {
698 self.usage.load(Ordering::Relaxed)
699 }
700
701 fn eviction_priority(&self) -> u8 {
702 self.priority
703 }
704
705 fn region(&self) -> MemoryRegion {
706 self.region
707 }
708
709 fn evict(&self, target_bytes: usize) -> usize {
710 if self.evict_returns_zero {
711 return 0;
712 }
713 let current = self.usage.load(Ordering::Relaxed);
714 let to_evict = target_bytes.min(current);
715 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
716 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
717 to_evict
718 }
719
720 fn can_spill(&self) -> bool {
721 self.spillable
722 }
723
724 fn spill(
725 &self,
726 target_bytes: usize,
727 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
728 if !self.spillable {
729 return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
730 }
731 let current = self.usage.load(Ordering::Relaxed);
732 let to_spill = target_bytes.min(current);
733 self.usage.fetch_sub(to_spill, Ordering::Relaxed);
734 self.spilled.fetch_add(to_spill, Ordering::Relaxed);
735 Ok(to_spill)
736 }
737 }
738
739 #[test]
740 fn test_spill_all_calls_spillable_consumers() {
741 let manager = BufferManager::with_budget(10000);
742 let spillable = SpillableConsumer::new(
743 "spillable",
744 500,
745 priorities::QUERY_CACHE,
746 MemoryRegion::ExecutionBuffers,
747 true,
748 );
749 let non_spillable = SpillableConsumer::new(
750 "non_spillable",
751 500,
752 priorities::QUERY_CACHE,
753 MemoryRegion::ExecutionBuffers,
754 false,
755 );
756 manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
757 manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
758
759 let freed = manager.spill_all();
760 assert_eq!(freed, 500);
761 assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
762 assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
763 }
764
765 #[test]
766 fn test_spill_all_skips_non_spillable() {
767 let manager = BufferManager::with_budget(10000);
768 let consumer = SpillableConsumer::new(
769 "no_spill",
770 1000,
771 priorities::INDEX_BUFFERS,
772 MemoryRegion::IndexBuffers,
773 false,
774 );
775 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
776
777 assert_eq!(manager.spill_all(), 0);
778 assert_eq!(consumer.memory_usage(), 1000);
779 }
780
781 #[test]
782 fn test_eviction_falls_back_to_spill() {
783 let manager = BufferManager::with_budget(10000);
784 let consumer = SpillableConsumer::new_evict_fails(
785 "spill_fallback",
786 1000,
787 priorities::QUERY_CACHE,
788 MemoryRegion::ExecutionBuffers,
789 true,
790 );
791 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
792 manager.allocated.store(2000, Ordering::Relaxed);
793
794 let freed = manager.evict_to_target(1500);
795 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
796 assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
797 assert!(freed > 0);
798 }
799
800 #[test]
801 fn test_eviction_no_spill_when_sufficient() {
802 let manager = BufferManager::with_budget(10000);
803 let consumer = SpillableConsumer::new(
804 "eviction_enough",
805 1000,
806 priorities::QUERY_CACHE,
807 MemoryRegion::ExecutionBuffers,
808 true,
809 );
810 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
811 manager.allocated.store(1200, Ordering::Relaxed);
812
813 let freed = manager.evict_to_target(1000);
814 assert_eq!(freed, 200);
815 assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
816 }
817
818 #[test]
819 fn test_eviction_spill_skips_non_spillable() {
820 let manager = BufferManager::with_budget(10000);
821 let consumer = SpillableConsumer::new_evict_fails(
822 "no_spill",
823 1000,
824 priorities::QUERY_CACHE,
825 MemoryRegion::ExecutionBuffers,
826 false,
827 );
828 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
829 manager.allocated.store(2000, Ordering::Relaxed);
830
831 let freed = manager.evict_to_target(1500);
832 assert_eq!(freed, 0);
833 assert_eq!(consumer.memory_usage(), 1000);
834 }
835
836 #[test]
837 fn alix_with_defaults_creates_manager() {
838 let manager = BufferManager::with_defaults();
839 assert!(manager.budget() > 0);
841 assert_eq!(manager.allocated(), 0);
842 assert_eq!(manager.available(), manager.budget());
843 }
844
845 #[test]
846 fn gus_config_accessor_returns_budget() {
847 let manager = BufferManager::with_budget(4096);
848 let config = manager.config();
849 assert_eq!(config.budget, 4096);
850 assert!(!config.background_eviction);
851 assert!(config.spill_path.is_none());
852 }
853
854 #[test]
855 fn vincent_shutdown_sets_flag() {
856 let manager = BufferManager::with_budget(1000);
857 manager.shutdown();
858 assert_eq!(manager.allocated(), 0);
861 }
862
863 #[test]
864 fn jules_critical_pressure_level() {
865 let config = BufferManagerConfig {
866 budget: 1000,
867 soft_limit_fraction: 0.70,
868 evict_limit_fraction: 0.85,
869 hard_limit_fraction: 0.95,
870 background_eviction: false,
871 spill_path: None,
872 };
873 let manager = BufferManager::new(config);
874
875 manager.allocated.store(960, Ordering::Relaxed);
877 assert_eq!(manager.pressure_level(), PressureLevel::Critical);
878 }
879
880 #[test]
881 fn mia_evict_to_target_already_below() {
882 let manager = BufferManager::with_budget(10000);
883 let freed = manager.evict_to_target(5000);
885 assert_eq!(freed, 0);
886 }
887
888 #[test]
889 fn butch_try_allocate_raw_success() {
890 let config = BufferManagerConfig {
891 budget: 1000,
892 soft_limit_fraction: 0.70,
893 evict_limit_fraction: 0.85,
894 hard_limit_fraction: 0.95,
895 background_eviction: false,
896 spill_path: None,
897 };
898 let manager = BufferManager::new(config);
899
900 let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
902 assert!(success);
903 assert_eq!(manager.allocated(), 100);
904 assert_eq!(
905 manager.stats().region_usage(MemoryRegion::GraphStorage),
906 100
907 );
908 }
909
910 #[test]
911 fn django_try_allocate_raw_fails_at_hard_limit() {
912 let config = BufferManagerConfig {
913 budget: 1000,
914 soft_limit_fraction: 0.70,
915 evict_limit_fraction: 0.85,
916 hard_limit_fraction: 0.95,
917 background_eviction: false,
918 spill_path: None,
919 };
920 let manager = BufferManager::new(config);
921
922 manager.allocated.store(940, Ordering::Relaxed);
924
925 let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
927 assert!(!success);
928 }
929
930 #[test]
931 fn shosanna_drop_sets_shutdown() {
932 let manager = BufferManager::with_budget(512);
934 drop(manager);
935 }
937
938 #[test]
939 fn hans_eviction_with_zero_usage_consumer() {
940 let manager = BufferManager::with_budget(10000);
941 let consumer = TestConsumer::new(
943 "empty",
944 0,
945 priorities::SPILL_STAGING,
946 MemoryRegion::SpillStaging,
947 );
948 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
949 manager.allocated.store(500, Ordering::Relaxed);
950
951 let freed = manager.evict_to_target(200);
952 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
954 assert_eq!(freed, 0);
955 }
956
957 #[test]
958 fn beatrix_grant_releaser_release_decrements() {
959 let config = BufferManagerConfig {
960 budget: 1000,
961 soft_limit_fraction: 0.70,
962 evict_limit_fraction: 0.85,
963 hard_limit_fraction: 0.95,
964 background_eviction: false,
965 spill_path: None,
966 };
967 let manager = BufferManager::new(config);
968
969 assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
971 assert_eq!(manager.allocated(), 200);
972
973 manager.release(200, MemoryRegion::IndexBuffers);
974 assert_eq!(manager.allocated(), 0);
975 assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
976 }
977
978 struct FailingSpillConsumer {
980 name: String,
981 usage: AtomicUsize,
982 priority: u8,
983 region: MemoryRegion,
984 }
985
986 impl FailingSpillConsumer {
987 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
988 Arc::new(Self {
989 name: name.to_string(),
990 usage: AtomicUsize::new(usage),
991 priority,
992 region,
993 })
994 }
995 }
996
997 impl MemoryConsumer for FailingSpillConsumer {
998 fn name(&self) -> &str {
999 &self.name
1000 }
1001
1002 fn memory_usage(&self) -> usize {
1003 self.usage.load(Ordering::Relaxed)
1004 }
1005
1006 fn eviction_priority(&self) -> u8 {
1007 self.priority
1008 }
1009
1010 fn region(&self) -> MemoryRegion {
1011 self.region
1012 }
1013
1014 fn evict(&self, _target_bytes: usize) -> usize {
1015 0 }
1017
1018 fn can_spill(&self) -> bool {
1019 true
1020 }
1021
1022 fn spill(
1023 &self,
1024 _target_bytes: usize,
1025 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1026 Err(crate::memory::buffer::consumer::SpillError::IoError(
1027 "disk full".to_string(),
1028 ))
1029 }
1030 }
1031
1032 #[test]
1033 fn vincent_spill_error_continues_to_next_consumer() {
1034 let manager = BufferManager::with_budget(10000);
1035
1036 let failing = FailingSpillConsumer::new(
1038 "failing_spill",
1039 500,
1040 priorities::SPILL_STAGING,
1041 MemoryRegion::SpillStaging,
1042 );
1043
1044 let working = SpillableConsumer::new_evict_fails(
1046 "working_spill",
1047 500,
1048 priorities::QUERY_CACHE,
1049 MemoryRegion::ExecutionBuffers,
1050 true,
1051 );
1052
1053 manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1054 manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1055 manager.allocated.store(2000, Ordering::Relaxed);
1056
1057 let freed = manager.evict_to_target(1500);
1058 assert!(working.spilled.load(Ordering::Relaxed) > 0);
1060 assert!(freed > 0);
1061 }
1062
1063 #[test]
1064 fn django_detect_system_memory_returns_positive() {
1065 let mem = BufferManagerConfig::detect_system_memory();
1066 assert!(mem > 0);
1067 }
1068
1069 #[test]
1070 fn shosanna_spill_path_config() {
1071 let config = BufferManagerConfig {
1072 budget: 1024,
1073 spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1074 ..Default::default()
1075 };
1076 assert_eq!(
1077 config.spill_path.as_ref().unwrap().to_str().unwrap(),
1078 "/tmp/grafeo-spill"
1079 );
1080 let manager = BufferManager::new(config);
1081 assert!(manager.config().spill_path.is_some());
1082 }
1083}