1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18 pub budget: usize,
20 pub soft_limit_fraction: f64,
22 pub evict_limit_fraction: f64,
24 pub hard_limit_fraction: f64,
26 pub background_eviction: bool,
28 pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33 #[must_use]
37 pub fn detect_system_memory() -> usize {
38 #[cfg(miri)]
40 {
41 return Self::fallback_system_memory();
42 }
43
44 #[cfg(not(miri))]
47 {
48 #[cfg(target_os = "windows")]
49 {
50 Self::fallback_system_memory()
53 }
54
55 #[cfg(target_os = "linux")]
56 {
57 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59 for line in contents.lines() {
60 if line.starts_with("MemTotal:")
61 && let Some(kb_str) = line.split_whitespace().nth(1)
62 && let Ok(kb) = kb_str.parse::<usize>()
63 {
64 return kb * 1024;
65 }
66 }
67 }
68 Self::fallback_system_memory()
69 }
70
71 #[cfg(target_os = "macos")]
72 {
73 Self::fallback_system_memory()
75 }
76
77 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78 {
79 Self::fallback_system_memory()
80 }
81 }
82 }
83
84 fn fallback_system_memory() -> usize {
85 1024 * 1024 * 1024
87 }
88
89 #[must_use]
91 pub fn with_budget(budget: usize) -> Self {
92 Self {
93 budget,
94 ..Default::default()
95 }
96 }
97}
98
99impl Default for BufferManagerConfig {
100 fn default() -> Self {
101 let system_memory = Self::detect_system_memory();
102 Self {
103 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
104 soft_limit_fraction: 0.70,
105 evict_limit_fraction: 0.85,
106 hard_limit_fraction: 0.95,
107 background_eviction: false, spill_path: None,
109 }
110 }
111}
112
113pub struct BufferManager {
118 config: BufferManagerConfig,
120 allocated: AtomicUsize,
122 region_allocated: [AtomicUsize; 4],
124 consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
126 soft_limit: usize,
128 evict_limit: usize,
130 hard_limit: usize,
132 shutdown: AtomicBool,
134}
135
136impl BufferManager {
137 #[must_use]
139 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
140 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
141 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
142 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
143
144 Arc::new(Self {
145 config,
146 allocated: AtomicUsize::new(0),
147 region_allocated: [
148 AtomicUsize::new(0),
149 AtomicUsize::new(0),
150 AtomicUsize::new(0),
151 AtomicUsize::new(0),
152 ],
153 consumers: RwLock::new(Vec::new()),
154 soft_limit,
155 evict_limit,
156 hard_limit,
157 shutdown: AtomicBool::new(false),
158 })
159 }
160
161 #[must_use]
163 pub fn with_defaults() -> Arc<Self> {
164 Self::new(BufferManagerConfig::default())
165 }
166
167 #[must_use]
169 pub fn with_budget(budget: usize) -> Arc<Self> {
170 Self::new(BufferManagerConfig::with_budget(budget))
171 }
172
173 pub fn try_allocate(
178 self: &Arc<Self>,
179 size: usize,
180 region: MemoryRegion,
181 ) -> Option<MemoryGrant> {
182 let current = self.allocated.load(Ordering::Relaxed);
184
185 if current + size > self.hard_limit {
186 self.run_eviction_cycle(true);
188
189 let current = self.allocated.load(Ordering::Relaxed);
191 if current + size > self.hard_limit {
192 return None;
193 }
194 }
195
196 self.allocated.fetch_add(size, Ordering::Relaxed);
198 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
199
200 self.check_pressure();
202
203 Some(MemoryGrant::new(
204 Arc::clone(self) as Arc<dyn GrantReleaser>,
205 size,
206 region,
207 ))
208 }
209
210 #[must_use]
212 pub fn pressure_level(&self) -> PressureLevel {
213 let current = self.allocated.load(Ordering::Relaxed);
214 self.compute_pressure_level(current)
215 }
216
217 #[must_use]
219 pub fn stats(&self) -> BufferStats {
220 let total_allocated = self.allocated.load(Ordering::Relaxed);
221 BufferStats {
222 budget: self.config.budget,
223 total_allocated,
224 region_allocated: [
225 self.region_allocated[0].load(Ordering::Relaxed),
226 self.region_allocated[1].load(Ordering::Relaxed),
227 self.region_allocated[2].load(Ordering::Relaxed),
228 self.region_allocated[3].load(Ordering::Relaxed),
229 ],
230 pressure_level: self.compute_pressure_level(total_allocated),
231 consumer_count: self.consumers.read().len(),
232 }
233 }
234
235 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
237 self.consumers.write().push(consumer);
238 }
239
240 pub fn unregister_consumer(&self, name: &str) {
242 self.consumers.write().retain(|c| c.name() != name);
243 }
244
245 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
249 let current = self.allocated.load(Ordering::Relaxed);
250 if current <= target_bytes {
251 return 0;
252 }
253
254 let to_free = current - target_bytes;
255 self.run_eviction_internal(to_free)
256 }
257
258 pub fn spill_all(&self) -> usize {
262 let consumers = self.consumers.read();
263 let mut total_freed = 0;
264 for consumer in consumers.iter() {
265 if consumer.can_spill()
266 && let Ok(freed) = consumer.spill(usize::MAX)
267 {
268 total_freed += freed;
269 }
270 }
271 total_freed
272 }
273
274 #[must_use]
276 pub fn config(&self) -> &BufferManagerConfig {
277 &self.config
278 }
279
280 #[must_use]
282 pub fn budget(&self) -> usize {
283 self.config.budget
284 }
285
286 #[must_use]
288 pub fn allocated(&self) -> usize {
289 self.allocated.load(Ordering::Relaxed)
290 }
291
292 #[must_use]
294 pub fn available(&self) -> usize {
295 self.config
296 .budget
297 .saturating_sub(self.allocated.load(Ordering::Relaxed))
298 }
299
300 pub fn shutdown(&self) {
302 self.shutdown.store(true, Ordering::Relaxed);
303 }
304
305 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
308 if current >= self.hard_limit {
309 PressureLevel::Critical
310 } else if current >= self.evict_limit {
311 PressureLevel::High
312 } else if current >= self.soft_limit {
313 PressureLevel::Moderate
314 } else {
315 PressureLevel::Normal
316 }
317 }
318
319 fn check_pressure(&self) {
320 let level = self.pressure_level();
321 if level.requires_eviction() {
322 let aggressive = level >= PressureLevel::High;
325 self.run_eviction_cycle(aggressive);
326 }
327 }
328
329 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
330 let target = if aggressive {
331 self.soft_limit
332 } else {
333 self.evict_limit
334 };
335
336 let current = self.allocated.load(Ordering::Relaxed);
337 if current <= target {
338 return 0;
339 }
340
341 let to_free = current - target;
342 self.run_eviction_internal(to_free)
343 }
344
345 fn run_eviction_internal(&self, to_free: usize) -> usize {
346 let consumers = self.consumers.read();
347
348 let mut sorted: Vec<_> = consumers.iter().collect();
350 sorted.sort_by_key(|c| c.eviction_priority());
351
352 let mut total_freed = 0;
353 for consumer in &sorted {
354 if total_freed >= to_free {
355 break;
356 }
357
358 let remaining = to_free - total_freed;
359 let consumer_usage = consumer.memory_usage();
360
361 let target_evict = remaining.min(consumer_usage / 2);
363 if target_evict > 0 {
364 let freed = consumer.evict(target_evict);
365 total_freed += freed;
366 }
367 }
368
369 if total_freed < to_free {
372 for consumer in &sorted {
373 if total_freed >= to_free {
374 break;
375 }
376 if !consumer.can_spill() {
377 continue;
378 }
379 let remaining = to_free - total_freed;
380 match consumer.spill(remaining) {
381 Ok(freed) => total_freed += freed,
382 Err(_) => continue,
383 }
384 }
385 }
386
387 total_freed
388 }
389}
390
391impl GrantReleaser for BufferManager {
392 fn release(&self, size: usize, region: MemoryRegion) {
393 self.allocated.fetch_sub(size, Ordering::Relaxed);
394 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
395 }
396
397 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
398 let current = self.allocated.load(Ordering::Relaxed);
399
400 if current + size > self.hard_limit {
401 self.run_eviction_cycle(true);
403
404 let current = self.allocated.load(Ordering::Relaxed);
405 if current + size > self.hard_limit {
406 return false;
407 }
408 }
409
410 self.allocated.fetch_add(size, Ordering::Relaxed);
411 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
412 true
413 }
414}
415
416impl Drop for BufferManager {
417 fn drop(&mut self) {
418 self.shutdown.store(true, Ordering::Relaxed);
419 }
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425 use crate::memory::buffer::consumer::priorities;
426 use std::sync::atomic::AtomicUsize;
427
428 struct TestConsumer {
429 name: String,
430 usage: AtomicUsize,
431 priority: u8,
432 region: MemoryRegion,
433 evicted: AtomicUsize,
434 }
435
436 impl TestConsumer {
437 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
438 Arc::new(Self {
439 name: name.to_string(),
440 usage: AtomicUsize::new(usage),
441 priority,
442 region,
443 evicted: AtomicUsize::new(0),
444 })
445 }
446 }
447
448 impl MemoryConsumer for TestConsumer {
449 fn name(&self) -> &str {
450 &self.name
451 }
452
453 fn memory_usage(&self) -> usize {
454 self.usage.load(Ordering::Relaxed)
455 }
456
457 fn eviction_priority(&self) -> u8 {
458 self.priority
459 }
460
461 fn region(&self) -> MemoryRegion {
462 self.region
463 }
464
465 fn evict(&self, target_bytes: usize) -> usize {
466 let current = self.usage.load(Ordering::Relaxed);
467 let to_evict = target_bytes.min(current);
468 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
469 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
470 to_evict
471 }
472 }
473
474 #[test]
475 fn test_basic_allocation() {
476 let config = BufferManagerConfig {
477 budget: 1024 * 1024, ..Default::default()
479 };
480 let manager = BufferManager::new(config);
481
482 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
483 assert!(grant.is_some());
484 assert_eq!(manager.stats().total_allocated, 1024);
485 }
486
487 #[test]
488 fn test_grant_raii_release() {
489 let config = BufferManagerConfig {
490 budget: 1024,
491 ..Default::default()
492 };
493 let manager = BufferManager::new(config);
494
495 {
496 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
497 assert_eq!(manager.stats().total_allocated, 512);
498 }
499
500 assert_eq!(manager.stats().total_allocated, 0);
502 }
503
504 #[test]
505 fn test_pressure_levels() {
506 let config = BufferManagerConfig {
507 budget: 1000,
508 soft_limit_fraction: 0.70,
509 evict_limit_fraction: 0.85,
510 hard_limit_fraction: 0.95,
511 background_eviction: false,
512 spill_path: None,
513 };
514 let manager = BufferManager::new(config);
515
516 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
517
518 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
520 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
521
522 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
524 assert_eq!(manager.pressure_level(), PressureLevel::High);
525
526 }
528
529 #[test]
530 fn test_region_tracking() {
531 let config = BufferManagerConfig {
532 budget: 10000,
533 ..Default::default()
534 };
535 let manager = BufferManager::new(config);
536
537 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
538 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
539 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
540
541 let stats = manager.stats();
542 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
543 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
544 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
545 assert_eq!(stats.total_allocated, 600);
546 }
547
548 #[test]
549 fn test_consumer_registration() {
550 let manager = BufferManager::with_budget(10000);
551
552 let consumer = TestConsumer::new(
553 "test",
554 1000,
555 priorities::INDEX_BUFFERS,
556 MemoryRegion::IndexBuffers,
557 );
558
559 manager.register_consumer(consumer);
560 assert_eq!(manager.stats().consumer_count, 1);
561
562 manager.unregister_consumer("test");
563 assert_eq!(manager.stats().consumer_count, 0);
564 }
565
566 #[test]
567 fn test_eviction_ordering() {
568 let manager = BufferManager::with_budget(10000);
569
570 let low_priority = TestConsumer::new(
572 "low",
573 500,
574 priorities::SPILL_STAGING,
575 MemoryRegion::SpillStaging,
576 );
577
578 let high_priority = TestConsumer::new(
580 "high",
581 500,
582 priorities::ACTIVE_TRANSACTION,
583 MemoryRegion::ExecutionBuffers,
584 );
585
586 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
587 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
588
589 manager.allocated.store(1000, Ordering::Relaxed);
592
593 let freed = manager.evict_to_target(700);
595
596 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
598 assert!(freed > 0);
599 }
600
601 #[test]
602 fn test_hard_limit_blocking() {
603 let config = BufferManagerConfig {
604 budget: 1000,
605 soft_limit_fraction: 0.70,
606 evict_limit_fraction: 0.85,
607 hard_limit_fraction: 0.95,
608 background_eviction: false,
609 spill_path: None,
610 };
611 let manager = BufferManager::new(config);
612
613 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
615
616 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
618 assert!(g2.is_none());
619 }
620
621 #[test]
622 fn test_available_memory() {
623 let manager = BufferManager::with_budget(1000);
624
625 assert_eq!(manager.available(), 1000);
626
627 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
628 assert_eq!(manager.available(), 700);
629 }
630
631 struct SpillableConsumer {
634 name: String,
635 usage: AtomicUsize,
636 priority: u8,
637 region: MemoryRegion,
638 evicted: AtomicUsize,
639 spilled: AtomicUsize,
640 spillable: bool,
641 evict_returns_zero: bool,
642 }
643
644 impl SpillableConsumer {
645 fn new(
646 name: &str,
647 usage: usize,
648 priority: u8,
649 region: MemoryRegion,
650 spillable: bool,
651 ) -> Arc<Self> {
652 Arc::new(Self {
653 name: name.to_string(),
654 usage: AtomicUsize::new(usage),
655 priority,
656 region,
657 evicted: AtomicUsize::new(0),
658 spilled: AtomicUsize::new(0),
659 spillable,
660 evict_returns_zero: false,
661 })
662 }
663
664 fn new_evict_fails(
665 name: &str,
666 usage: usize,
667 priority: u8,
668 region: MemoryRegion,
669 spillable: bool,
670 ) -> Arc<Self> {
671 Arc::new(Self {
672 name: name.to_string(),
673 usage: AtomicUsize::new(usage),
674 priority,
675 region,
676 evicted: AtomicUsize::new(0),
677 spilled: AtomicUsize::new(0),
678 spillable,
679 evict_returns_zero: true,
680 })
681 }
682 }
683
684 impl MemoryConsumer for SpillableConsumer {
685 fn name(&self) -> &str {
686 &self.name
687 }
688
689 fn memory_usage(&self) -> usize {
690 self.usage.load(Ordering::Relaxed)
691 }
692
693 fn eviction_priority(&self) -> u8 {
694 self.priority
695 }
696
697 fn region(&self) -> MemoryRegion {
698 self.region
699 }
700
701 fn evict(&self, target_bytes: usize) -> usize {
702 if self.evict_returns_zero {
703 return 0;
704 }
705 let current = self.usage.load(Ordering::Relaxed);
706 let to_evict = target_bytes.min(current);
707 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
708 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
709 to_evict
710 }
711
712 fn can_spill(&self) -> bool {
713 self.spillable
714 }
715
716 fn spill(
717 &self,
718 target_bytes: usize,
719 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
720 if !self.spillable {
721 return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
722 }
723 let current = self.usage.load(Ordering::Relaxed);
724 let to_spill = target_bytes.min(current);
725 self.usage.fetch_sub(to_spill, Ordering::Relaxed);
726 self.spilled.fetch_add(to_spill, Ordering::Relaxed);
727 Ok(to_spill)
728 }
729 }
730
731 #[test]
732 fn test_spill_all_calls_spillable_consumers() {
733 let manager = BufferManager::with_budget(10000);
734 let spillable = SpillableConsumer::new(
735 "spillable",
736 500,
737 priorities::QUERY_CACHE,
738 MemoryRegion::ExecutionBuffers,
739 true,
740 );
741 let non_spillable = SpillableConsumer::new(
742 "non_spillable",
743 500,
744 priorities::QUERY_CACHE,
745 MemoryRegion::ExecutionBuffers,
746 false,
747 );
748 manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
749 manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
750
751 let freed = manager.spill_all();
752 assert_eq!(freed, 500);
753 assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
754 assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
755 }
756
757 #[test]
758 fn test_spill_all_skips_non_spillable() {
759 let manager = BufferManager::with_budget(10000);
760 let consumer = SpillableConsumer::new(
761 "no_spill",
762 1000,
763 priorities::INDEX_BUFFERS,
764 MemoryRegion::IndexBuffers,
765 false,
766 );
767 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
768
769 assert_eq!(manager.spill_all(), 0);
770 assert_eq!(consumer.memory_usage(), 1000);
771 }
772
773 #[test]
774 fn test_eviction_falls_back_to_spill() {
775 let manager = BufferManager::with_budget(10000);
776 let consumer = SpillableConsumer::new_evict_fails(
777 "spill_fallback",
778 1000,
779 priorities::QUERY_CACHE,
780 MemoryRegion::ExecutionBuffers,
781 true,
782 );
783 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
784 manager.allocated.store(2000, Ordering::Relaxed);
785
786 let freed = manager.evict_to_target(1500);
787 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
788 assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
789 assert!(freed > 0);
790 }
791
792 #[test]
793 fn test_eviction_no_spill_when_sufficient() {
794 let manager = BufferManager::with_budget(10000);
795 let consumer = SpillableConsumer::new(
796 "eviction_enough",
797 1000,
798 priorities::QUERY_CACHE,
799 MemoryRegion::ExecutionBuffers,
800 true,
801 );
802 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
803 manager.allocated.store(1200, Ordering::Relaxed);
804
805 let freed = manager.evict_to_target(1000);
806 assert_eq!(freed, 200);
807 assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
808 }
809
810 #[test]
811 fn test_eviction_spill_skips_non_spillable() {
812 let manager = BufferManager::with_budget(10000);
813 let consumer = SpillableConsumer::new_evict_fails(
814 "no_spill",
815 1000,
816 priorities::QUERY_CACHE,
817 MemoryRegion::ExecutionBuffers,
818 false,
819 );
820 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
821 manager.allocated.store(2000, Ordering::Relaxed);
822
823 let freed = manager.evict_to_target(1500);
824 assert_eq!(freed, 0);
825 assert_eq!(consumer.memory_usage(), 1000);
826 }
827
828 #[test]
829 fn alix_with_defaults_creates_manager() {
830 let manager = BufferManager::with_defaults();
831 assert!(manager.budget() > 0);
833 assert_eq!(manager.allocated(), 0);
834 assert_eq!(manager.available(), manager.budget());
835 }
836
837 #[test]
838 fn gus_config_accessor_returns_budget() {
839 let manager = BufferManager::with_budget(4096);
840 let config = manager.config();
841 assert_eq!(config.budget, 4096);
842 assert!(!config.background_eviction);
843 assert!(config.spill_path.is_none());
844 }
845
846 #[test]
847 fn vincent_shutdown_sets_flag() {
848 let manager = BufferManager::with_budget(1000);
849 manager.shutdown();
850 assert_eq!(manager.allocated(), 0);
853 }
854
855 #[test]
856 fn jules_critical_pressure_level() {
857 let config = BufferManagerConfig {
858 budget: 1000,
859 soft_limit_fraction: 0.70,
860 evict_limit_fraction: 0.85,
861 hard_limit_fraction: 0.95,
862 background_eviction: false,
863 spill_path: None,
864 };
865 let manager = BufferManager::new(config);
866
867 manager.allocated.store(960, Ordering::Relaxed);
869 assert_eq!(manager.pressure_level(), PressureLevel::Critical);
870 }
871
872 #[test]
873 fn mia_evict_to_target_already_below() {
874 let manager = BufferManager::with_budget(10000);
875 let freed = manager.evict_to_target(5000);
877 assert_eq!(freed, 0);
878 }
879
880 #[test]
881 fn butch_try_allocate_raw_success() {
882 let config = BufferManagerConfig {
883 budget: 1000,
884 soft_limit_fraction: 0.70,
885 evict_limit_fraction: 0.85,
886 hard_limit_fraction: 0.95,
887 background_eviction: false,
888 spill_path: None,
889 };
890 let manager = BufferManager::new(config);
891
892 let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
894 assert!(success);
895 assert_eq!(manager.allocated(), 100);
896 assert_eq!(
897 manager.stats().region_usage(MemoryRegion::GraphStorage),
898 100
899 );
900 }
901
902 #[test]
903 fn django_try_allocate_raw_fails_at_hard_limit() {
904 let config = BufferManagerConfig {
905 budget: 1000,
906 soft_limit_fraction: 0.70,
907 evict_limit_fraction: 0.85,
908 hard_limit_fraction: 0.95,
909 background_eviction: false,
910 spill_path: None,
911 };
912 let manager = BufferManager::new(config);
913
914 manager.allocated.store(940, Ordering::Relaxed);
916
917 let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
919 assert!(!success);
920 }
921
922 #[test]
923 fn shosanna_drop_sets_shutdown() {
924 let manager = BufferManager::with_budget(512);
926 drop(manager);
927 }
929
930 #[test]
931 fn hans_eviction_with_zero_usage_consumer() {
932 let manager = BufferManager::with_budget(10000);
933 let consumer = TestConsumer::new(
935 "empty",
936 0,
937 priorities::SPILL_STAGING,
938 MemoryRegion::SpillStaging,
939 );
940 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
941 manager.allocated.store(500, Ordering::Relaxed);
942
943 let freed = manager.evict_to_target(200);
944 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
946 assert_eq!(freed, 0);
947 }
948
949 #[test]
950 fn beatrix_grant_releaser_release_decrements() {
951 let config = BufferManagerConfig {
952 budget: 1000,
953 soft_limit_fraction: 0.70,
954 evict_limit_fraction: 0.85,
955 hard_limit_fraction: 0.95,
956 background_eviction: false,
957 spill_path: None,
958 };
959 let manager = BufferManager::new(config);
960
961 assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
963 assert_eq!(manager.allocated(), 200);
964
965 manager.release(200, MemoryRegion::IndexBuffers);
966 assert_eq!(manager.allocated(), 0);
967 assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
968 }
969
970 struct FailingSpillConsumer {
972 name: String,
973 usage: AtomicUsize,
974 priority: u8,
975 region: MemoryRegion,
976 }
977
978 impl FailingSpillConsumer {
979 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
980 Arc::new(Self {
981 name: name.to_string(),
982 usage: AtomicUsize::new(usage),
983 priority,
984 region,
985 })
986 }
987 }
988
989 impl MemoryConsumer for FailingSpillConsumer {
990 fn name(&self) -> &str {
991 &self.name
992 }
993
994 fn memory_usage(&self) -> usize {
995 self.usage.load(Ordering::Relaxed)
996 }
997
998 fn eviction_priority(&self) -> u8 {
999 self.priority
1000 }
1001
1002 fn region(&self) -> MemoryRegion {
1003 self.region
1004 }
1005
1006 fn evict(&self, _target_bytes: usize) -> usize {
1007 0 }
1009
1010 fn can_spill(&self) -> bool {
1011 true
1012 }
1013
1014 fn spill(
1015 &self,
1016 _target_bytes: usize,
1017 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1018 Err(crate::memory::buffer::consumer::SpillError::IoError(
1019 "disk full".to_string(),
1020 ))
1021 }
1022 }
1023
1024 #[test]
1025 fn vincent_spill_error_continues_to_next_consumer() {
1026 let manager = BufferManager::with_budget(10000);
1027
1028 let failing = FailingSpillConsumer::new(
1030 "failing_spill",
1031 500,
1032 priorities::SPILL_STAGING,
1033 MemoryRegion::SpillStaging,
1034 );
1035
1036 let working = SpillableConsumer::new_evict_fails(
1038 "working_spill",
1039 500,
1040 priorities::QUERY_CACHE,
1041 MemoryRegion::ExecutionBuffers,
1042 true,
1043 );
1044
1045 manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1046 manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1047 manager.allocated.store(2000, Ordering::Relaxed);
1048
1049 let freed = manager.evict_to_target(1500);
1050 assert!(working.spilled.load(Ordering::Relaxed) > 0);
1052 assert!(freed > 0);
1053 }
1054
1055 #[test]
1056 fn django_detect_system_memory_returns_positive() {
1057 let mem = BufferManagerConfig::detect_system_memory();
1058 assert!(mem > 0);
1059 }
1060
1061 #[test]
1062 fn shosanna_spill_path_config() {
1063 let config = BufferManagerConfig {
1064 budget: 1024,
1065 spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1066 ..Default::default()
1067 };
1068 assert_eq!(
1069 config.spill_path.as_ref().unwrap().to_str().unwrap(),
1070 "/tmp/grafeo-spill"
1071 );
1072 let manager = BufferManager::new(config);
1073 assert!(manager.config().spill_path.is_some());
1074 }
1075}