1use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
12
/// Share of the detected system memory that `BufferManagerConfig::default()`
/// assigns to the budget.
const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
15
/// Configuration for a `BufferManager`: a byte budget plus the fractions of
/// that budget at which the pressure thresholds are placed.
#[derive(Debug, Clone)]
pub struct BufferManagerConfig {
    /// Total memory budget in bytes.
    pub budget: usize,
    /// Fraction of `budget` used to derive the soft limit
    /// (the `Moderate` pressure threshold).
    pub soft_limit_fraction: f64,
    /// Fraction of `budget` used to derive the eviction limit
    /// (the `High` pressure threshold).
    pub evict_limit_fraction: f64,
    /// Fraction of `budget` used to derive the hard limit
    /// (the `Critical` pressure threshold; allocations above it are refused).
    pub hard_limit_fraction: f64,
    /// NOTE(review): not read by the code visible in this file — presumably
    /// toggles a background eviction task elsewhere; confirm.
    pub background_eviction: bool,
    /// NOTE(review): not read by the code visible in this file — presumably
    /// the directory consumers spill to; confirm.
    pub spill_path: Option<PathBuf>,
}
32
33impl BufferManagerConfig {
34 #[must_use]
38 pub fn detect_system_memory() -> usize {
39 #[cfg(miri)]
41 {
42 return Self::fallback_system_memory();
43 }
44
45 #[cfg(not(miri))]
48 {
49 #[cfg(target_os = "windows")]
50 {
51 Self::fallback_system_memory()
54 }
55
56 #[cfg(target_os = "linux")]
57 {
58 if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
60 for line in contents.lines() {
61 if line.starts_with("MemTotal:")
62 && let Some(kb_str) = line.split_whitespace().nth(1)
63 && let Ok(kb) = kb_str.parse::<usize>()
64 {
65 return kb * 1024;
66 }
67 }
68 }
69 Self::fallback_system_memory()
70 }
71
72 #[cfg(target_os = "macos")]
73 {
74 Self::fallback_system_memory()
76 }
77
78 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
79 {
80 Self::fallback_system_memory()
81 }
82 }
83 }
84
85 fn fallback_system_memory() -> usize {
86 1024 * 1024 * 1024
88 }
89
90 #[must_use]
92 pub fn with_budget(budget: usize) -> Self {
93 Self {
94 budget,
95 ..Default::default()
96 }
97 }
98}
99
100impl Default for BufferManagerConfig {
101 fn default() -> Self {
102 let system_memory = Self::detect_system_memory();
103 Self {
104 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
106 budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
107 soft_limit_fraction: 0.70,
108 evict_limit_fraction: 0.85,
109 hard_limit_fraction: 0.95,
110 background_eviction: false, spill_path: None,
112 }
113 }
114}
115
/// Tracks memory reservations against a fixed budget and asks registered
/// consumers to evict or spill when usage crosses the configured thresholds.
pub struct BufferManager {
    /// Configuration this manager was created with.
    config: BufferManagerConfig,
    /// Total number of bytes currently reserved.
    allocated: AtomicUsize,
    /// Reserved bytes per region, indexed by `MemoryRegion::index()`.
    region_allocated: [AtomicUsize; 4],
    /// Consumers that can be asked to evict, spill or reload.
    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
    /// Names of consumers that eviction and spill passes must skip.
    force_ram_consumers: RwLock<HashSet<String>>,
    /// `budget * soft_limit_fraction`, computed once in `new`.
    soft_limit: usize,
    /// `budget * evict_limit_fraction`, computed once in `new`.
    evict_limit: usize,
    /// `budget * hard_limit_fraction`, computed once in `new`.
    hard_limit: usize,
    /// Set by `shutdown()` and on drop; not read by the code visible here.
    shutdown: AtomicBool,
}
145
146impl BufferManager {
147 #[must_use]
149 pub fn new(config: BufferManagerConfig) -> Arc<Self> {
150 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
152 let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
153 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
155 let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
156 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
158 let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
159
160 Arc::new(Self {
161 config,
162 allocated: AtomicUsize::new(0),
163 region_allocated: [
164 AtomicUsize::new(0),
165 AtomicUsize::new(0),
166 AtomicUsize::new(0),
167 AtomicUsize::new(0),
168 ],
169 consumers: RwLock::new(Vec::new()),
170 force_ram_consumers: RwLock::new(HashSet::new()),
171 soft_limit,
172 evict_limit,
173 hard_limit,
174 shutdown: AtomicBool::new(false),
175 })
176 }
177
178 #[must_use]
180 pub fn with_defaults() -> Arc<Self> {
181 Self::new(BufferManagerConfig::default())
182 }
183
184 #[must_use]
186 pub fn with_budget(budget: usize) -> Arc<Self> {
187 Self::new(BufferManagerConfig::with_budget(budget))
188 }
189
190 pub fn try_allocate(
195 self: &Arc<Self>,
196 size: usize,
197 region: MemoryRegion,
198 ) -> Option<MemoryGrant> {
199 let current = self.allocated.load(Ordering::Relaxed);
201
202 if current + size > self.hard_limit {
203 self.run_eviction_cycle(true);
205
206 let current = self.allocated.load(Ordering::Relaxed);
208 if current + size > self.hard_limit {
209 return None;
210 }
211 }
212
213 self.allocated.fetch_add(size, Ordering::Relaxed);
215 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
216
217 self.check_pressure();
219
220 Some(MemoryGrant::new(
221 Arc::clone(self) as Arc<dyn GrantReleaser>,
222 size,
223 region,
224 ))
225 }
226
227 #[must_use]
229 pub fn pressure_level(&self) -> PressureLevel {
230 let current = self.allocated.load(Ordering::Relaxed);
231 self.compute_pressure_level(current)
232 }
233
234 #[must_use]
236 pub fn stats(&self) -> BufferStats {
237 let total_allocated = self.allocated.load(Ordering::Relaxed);
238 BufferStats {
239 budget: self.config.budget,
240 total_allocated,
241 region_allocated: [
242 self.region_allocated[0].load(Ordering::Relaxed),
243 self.region_allocated[1].load(Ordering::Relaxed),
244 self.region_allocated[2].load(Ordering::Relaxed),
245 self.region_allocated[3].load(Ordering::Relaxed),
246 ],
247 pressure_level: self.compute_pressure_level(total_allocated),
248 consumer_count: self.consumers.read().len(),
249 }
250 }
251
252 pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
254 self.consumers.write().push(consumer);
255 }
256
257 pub fn unregister_consumer(&self, name: &str) {
259 self.consumers.write().retain(|c| c.name() != name);
260 self.force_ram_consumers.write().remove(name);
263 }
264
265 pub fn mark_force_ram(&self, name: &str) {
274 self.force_ram_consumers.write().insert(name.to_string());
275 }
276
277 pub fn clear_force_ram(&self, name: &str) {
281 self.force_ram_consumers.write().remove(name);
282 }
283
284 #[must_use]
286 pub fn is_force_ram(&self, name: &str) -> bool {
287 self.force_ram_consumers.read().contains(name)
288 }
289
290 pub fn evict_to_target(&self, target_bytes: usize) -> usize {
294 let current = self.allocated.load(Ordering::Relaxed);
295 if current <= target_bytes {
296 return 0;
297 }
298
299 let to_free = current - target_bytes;
300 self.run_eviction_internal(to_free)
301 }
302
303 pub fn spill_all(&self) -> usize {
308 let consumers = self.consumers.read();
309 let force_ram = self.force_ram_consumers.read();
310 let mut total_freed = 0;
311 for consumer in consumers.iter() {
312 if force_ram.contains(consumer.name()) {
313 continue;
314 }
315 if consumer.can_spill()
316 && let Ok(freed) = consumer.spill(usize::MAX)
317 {
318 total_freed += freed;
319 }
320 }
321 total_freed
322 }
323
324 pub fn spill_consumer_by_name(&self, name: &str) -> usize {
337 if self.is_force_ram(name) {
338 #[cfg(feature = "tracing")]
339 tracing::debug!(
340 target: "grafeo::buffer",
341 consumer = name,
342 "spill skipped: consumer pinned ForceRam"
343 );
344 return 0;
345 }
346 let consumers = self.consumers.read();
347 let mut total_freed = 0;
348 for consumer in consumers.iter() {
349 if consumer.name() == name
350 && consumer.can_spill()
351 && let Ok(freed) = consumer.spill(usize::MAX)
352 {
353 total_freed += freed;
354 }
355 }
356 #[cfg(feature = "tracing")]
357 tracing::info!(
358 target: "grafeo::buffer",
359 consumer = name,
360 freed_bytes = total_freed,
361 "tier transition: spill"
362 );
363 total_freed
364 }
365
366 pub fn reload_eligible(&self, target_fraction: f64) -> usize {
389 let target_fraction = target_fraction.clamp(0.0, 1.0);
390 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
392 let target_bytes = (self.config.budget as f64 * target_fraction) as usize;
393
394 let candidates: Vec<Arc<dyn MemoryConsumer>> = {
395 let consumers = self.consumers.read();
396 let mut out: Vec<_> = consumers
397 .iter()
398 .filter(|c| c.current_tier() == super::tiered::StorageTier::OnDisk)
399 .map(Arc::clone)
400 .collect();
401 out.sort_by_key(|c| std::cmp::Reverse(c.eviction_priority()));
403 out
404 };
405
406 let mut reloaded = 0;
407 for consumer in candidates {
408 let current = self.allocated.load(Ordering::Relaxed);
409 if current >= target_bytes {
410 break;
411 }
412 match consumer.reload() {
413 Ok(()) => {
414 #[cfg(feature = "tracing")]
415 tracing::info!(
416 target: "grafeo::buffer",
417 consumer = consumer.name(),
418 "tier transition: reload"
419 );
420 reloaded += 1;
421 }
422 Err(_e) => {
423 #[cfg(feature = "tracing")]
424 tracing::warn!(
425 target: "grafeo::buffer",
426 consumer = consumer.name(),
427 error = %_e,
428 "tier reload failed"
429 );
430 continue;
431 }
432 }
433 }
434 #[cfg(feature = "tracing")]
435 tracing::debug!(
436 target: "grafeo::buffer",
437 reloaded_count = reloaded,
438 target_fraction = target_fraction,
439 "reload_eligible cycle complete"
440 );
441 reloaded
442 }
443
444 #[must_use]
451 pub fn snapshot_consumer_tiers(&self) -> Vec<(String, super::tiered::StorageTier)> {
452 let consumers = self.consumers.read();
453 consumers
454 .iter()
455 .filter_map(|c| {
456 let name = c.name();
457 if !name.starts_with("section:") {
458 return None;
459 }
460 Some((name.to_string(), c.current_tier()))
461 })
462 .collect()
463 }
464
465 #[must_use]
467 pub fn config(&self) -> &BufferManagerConfig {
468 &self.config
469 }
470
471 #[must_use]
473 pub fn budget(&self) -> usize {
474 self.config.budget
475 }
476
477 #[must_use]
479 pub fn allocated(&self) -> usize {
480 self.allocated.load(Ordering::Relaxed)
481 }
482
483 #[must_use]
485 pub fn available(&self) -> usize {
486 self.config
487 .budget
488 .saturating_sub(self.allocated.load(Ordering::Relaxed))
489 }
490
491 pub fn shutdown(&self) {
493 self.shutdown.store(true, Ordering::Relaxed);
494 }
495
496 fn compute_pressure_level(&self, current: usize) -> PressureLevel {
499 if current >= self.hard_limit {
500 PressureLevel::Critical
501 } else if current >= self.evict_limit {
502 PressureLevel::High
503 } else if current >= self.soft_limit {
504 PressureLevel::Moderate
505 } else {
506 PressureLevel::Normal
507 }
508 }
509
510 fn check_pressure(&self) {
511 let level = self.pressure_level();
512 if level.requires_eviction() {
513 let aggressive = level >= PressureLevel::High;
516 self.run_eviction_cycle(aggressive);
517 }
518 }
519
520 fn run_eviction_cycle(&self, aggressive: bool) -> usize {
521 let target = if aggressive {
522 self.soft_limit
523 } else {
524 self.evict_limit
525 };
526
527 let current = self.allocated.load(Ordering::Relaxed);
528 if current <= target {
529 return 0;
530 }
531
532 let to_free = current - target;
533 self.run_eviction_internal(to_free)
534 }
535
536 fn run_eviction_internal(&self, to_free: usize) -> usize {
537 let consumers = self.consumers.read();
538 let force_ram = self.force_ram_consumers.read();
539
540 let mut sorted: Vec<_> = consumers.iter().collect();
542 sorted.sort_by_key(|c| c.eviction_priority());
543
544 let mut total_freed = 0;
545 for consumer in &sorted {
546 if total_freed >= to_free {
547 break;
548 }
549
550 if force_ram.contains(consumer.name()) {
554 continue;
555 }
556
557 let remaining = to_free - total_freed;
558 let consumer_usage = consumer.memory_usage();
559
560 let target_evict = remaining.min(consumer_usage / 2);
562 if target_evict > 0 {
563 let freed = consumer.evict(target_evict);
564 total_freed += freed;
565 }
566 }
567
568 if total_freed < to_free {
573 for consumer in &sorted {
574 if total_freed >= to_free {
575 break;
576 }
577 if force_ram.contains(consumer.name()) {
578 continue;
579 }
580 if !consumer.can_spill() {
581 continue;
582 }
583 let remaining = to_free - total_freed;
584 match consumer.spill(remaining) {
585 Ok(freed) => total_freed += freed,
586 Err(_) => continue,
587 }
588 }
589 }
590
591 total_freed
592 }
593}
594
595impl GrantReleaser for BufferManager {
596 fn release(&self, size: usize, region: MemoryRegion) {
597 self.allocated.fetch_sub(size, Ordering::Relaxed);
598 self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
599 }
600
601 fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
602 let current = self.allocated.load(Ordering::Relaxed);
603
604 if current + size > self.hard_limit {
605 self.run_eviction_cycle(true);
607
608 let current = self.allocated.load(Ordering::Relaxed);
609 if current + size > self.hard_limit {
610 return false;
611 }
612 }
613
614 self.allocated.fetch_add(size, Ordering::Relaxed);
615 self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
616 true
617 }
618}
619
620impl Drop for BufferManager {
621 fn drop(&mut self) {
622 self.shutdown.store(true, Ordering::Relaxed);
623 }
624}
625
626#[cfg(test)]
627mod tests {
628 use super::*;
629 use crate::memory::buffer::consumer::priorities;
630 use std::sync::atomic::AtomicUsize;
631
632 struct TestConsumer {
633 name: String,
634 usage: AtomicUsize,
635 priority: u8,
636 region: MemoryRegion,
637 evicted: AtomicUsize,
638 }
639
640 impl TestConsumer {
641 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
642 Arc::new(Self {
643 name: name.to_string(),
644 usage: AtomicUsize::new(usage),
645 priority,
646 region,
647 evicted: AtomicUsize::new(0),
648 })
649 }
650 }
651
652 impl MemoryConsumer for TestConsumer {
653 fn name(&self) -> &str {
654 &self.name
655 }
656
657 fn memory_usage(&self) -> usize {
658 self.usage.load(Ordering::Relaxed)
659 }
660
661 fn eviction_priority(&self) -> u8 {
662 self.priority
663 }
664
665 fn region(&self) -> MemoryRegion {
666 self.region
667 }
668
669 fn evict(&self, target_bytes: usize) -> usize {
670 let current = self.usage.load(Ordering::Relaxed);
671 let to_evict = target_bytes.min(current);
672 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
673 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
674 to_evict
675 }
676
677 fn current_tier(&self) -> super::super::tiered::StorageTier {
678 if self.memory_usage() == 0 {
679 super::super::tiered::StorageTier::Uninitialized
680 } else {
681 super::super::tiered::StorageTier::InMemory
682 }
683 }
684 }
685
686 #[test]
687 fn test_basic_allocation() {
688 let config = BufferManagerConfig {
689 budget: 1024 * 1024, ..Default::default()
691 };
692 let manager = BufferManager::new(config);
693
694 let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
695 assert!(grant.is_some());
696 assert_eq!(manager.stats().total_allocated, 1024);
697 }
698
699 #[test]
700 fn test_grant_raii_release() {
701 let config = BufferManagerConfig {
702 budget: 1024,
703 ..Default::default()
704 };
705 let manager = BufferManager::new(config);
706
707 {
708 let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
709 assert_eq!(manager.stats().total_allocated, 512);
710 }
711
712 assert_eq!(manager.stats().total_allocated, 0);
714 }
715
716 #[test]
717 fn test_pressure_levels() {
718 let config = BufferManagerConfig {
719 budget: 1000,
720 soft_limit_fraction: 0.70,
721 evict_limit_fraction: 0.85,
722 hard_limit_fraction: 0.95,
723 background_eviction: false,
724 spill_path: None,
725 };
726 let manager = BufferManager::new(config);
727
728 assert_eq!(manager.pressure_level(), PressureLevel::Normal);
729
730 let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
732 assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
733
734 let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
736 assert_eq!(manager.pressure_level(), PressureLevel::High);
737
738 }
740
741 #[test]
742 fn test_region_tracking() {
743 let config = BufferManagerConfig {
744 budget: 10000,
745 ..Default::default()
746 };
747 let manager = BufferManager::new(config);
748
749 let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
750 let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
751 let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
752
753 let stats = manager.stats();
754 assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
755 assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
756 assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
757 assert_eq!(stats.total_allocated, 600);
758 }
759
760 #[test]
761 fn test_consumer_registration() {
762 let manager = BufferManager::with_budget(10000);
763
764 let consumer = TestConsumer::new(
765 "test",
766 1000,
767 priorities::INDEX_BUFFERS,
768 MemoryRegion::IndexBuffers,
769 );
770
771 manager.register_consumer(consumer);
772 assert_eq!(manager.stats().consumer_count, 1);
773
774 manager.unregister_consumer("test");
775 assert_eq!(manager.stats().consumer_count, 0);
776 }
777
778 #[test]
779 fn test_eviction_ordering() {
780 let manager = BufferManager::with_budget(10000);
781
782 let low_priority = TestConsumer::new(
784 "low",
785 500,
786 priorities::SPILL_STAGING,
787 MemoryRegion::SpillStaging,
788 );
789
790 let high_priority = TestConsumer::new(
792 "high",
793 500,
794 priorities::ACTIVE_TRANSACTION,
795 MemoryRegion::ExecutionBuffers,
796 );
797
798 manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
799 manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
800
801 manager.allocated.store(1000, Ordering::Relaxed);
804
805 let freed = manager.evict_to_target(700);
807
808 assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
810 assert!(freed > 0);
811 }
812
813 #[test]
814 fn test_hard_limit_blocking() {
815 let config = BufferManagerConfig {
816 budget: 1000,
817 soft_limit_fraction: 0.70,
818 evict_limit_fraction: 0.85,
819 hard_limit_fraction: 0.95,
820 background_eviction: false,
821 spill_path: None,
822 };
823 let manager = BufferManager::new(config);
824
825 let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
827
828 let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
830 assert!(g2.is_none());
831 }
832
833 #[test]
834 fn test_available_memory() {
835 let manager = BufferManager::with_budget(1000);
836
837 assert_eq!(manager.available(), 1000);
838
839 let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
840 assert_eq!(manager.available(), 700);
841 }
842
843 struct SpillableConsumer {
846 name: String,
847 usage: AtomicUsize,
848 priority: u8,
849 region: MemoryRegion,
850 evicted: AtomicUsize,
851 spilled: AtomicUsize,
852 spillable: bool,
853 evict_returns_zero: bool,
854 }
855
856 impl SpillableConsumer {
857 fn new(
858 name: &str,
859 usage: usize,
860 priority: u8,
861 region: MemoryRegion,
862 spillable: bool,
863 ) -> Arc<Self> {
864 Arc::new(Self {
865 name: name.to_string(),
866 usage: AtomicUsize::new(usage),
867 priority,
868 region,
869 evicted: AtomicUsize::new(0),
870 spilled: AtomicUsize::new(0),
871 spillable,
872 evict_returns_zero: false,
873 })
874 }
875
876 fn new_evict_fails(
877 name: &str,
878 usage: usize,
879 priority: u8,
880 region: MemoryRegion,
881 spillable: bool,
882 ) -> Arc<Self> {
883 Arc::new(Self {
884 name: name.to_string(),
885 usage: AtomicUsize::new(usage),
886 priority,
887 region,
888 evicted: AtomicUsize::new(0),
889 spilled: AtomicUsize::new(0),
890 spillable,
891 evict_returns_zero: true,
892 })
893 }
894 }
895
896 impl MemoryConsumer for SpillableConsumer {
897 fn name(&self) -> &str {
898 &self.name
899 }
900
901 fn memory_usage(&self) -> usize {
902 self.usage.load(Ordering::Relaxed)
903 }
904
905 fn eviction_priority(&self) -> u8 {
906 self.priority
907 }
908
909 fn region(&self) -> MemoryRegion {
910 self.region
911 }
912
913 fn evict(&self, target_bytes: usize) -> usize {
914 if self.evict_returns_zero {
915 return 0;
916 }
917 let current = self.usage.load(Ordering::Relaxed);
918 let to_evict = target_bytes.min(current);
919 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
920 self.evicted.fetch_add(to_evict, Ordering::Relaxed);
921 to_evict
922 }
923
924 fn can_spill(&self) -> bool {
925 self.spillable
926 }
927
928 fn spill(
929 &self,
930 target_bytes: usize,
931 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
932 if !self.spillable {
933 return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
934 }
935 let current = self.usage.load(Ordering::Relaxed);
936 let to_spill = target_bytes.min(current);
937 self.usage.fetch_sub(to_spill, Ordering::Relaxed);
938 self.spilled.fetch_add(to_spill, Ordering::Relaxed);
939 Ok(to_spill)
940 }
941
942 fn current_tier(&self) -> super::super::tiered::StorageTier {
943 if self.spilled.load(Ordering::Relaxed) > 0 {
944 super::super::tiered::StorageTier::OnDisk
945 } else if self.memory_usage() == 0 {
946 super::super::tiered::StorageTier::Uninitialized
947 } else {
948 super::super::tiered::StorageTier::InMemory
949 }
950 }
951 }
952
953 #[test]
954 fn test_spill_all_calls_spillable_consumers() {
955 let manager = BufferManager::with_budget(10000);
956 let spillable = SpillableConsumer::new(
957 "spillable",
958 500,
959 priorities::QUERY_CACHE,
960 MemoryRegion::ExecutionBuffers,
961 true,
962 );
963 let non_spillable = SpillableConsumer::new(
964 "non_spillable",
965 500,
966 priorities::QUERY_CACHE,
967 MemoryRegion::ExecutionBuffers,
968 false,
969 );
970 manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
971 manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
972
973 let freed = manager.spill_all();
974 assert_eq!(freed, 500);
975 assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
976 assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
977 }
978
979 #[test]
980 fn test_spill_all_skips_non_spillable() {
981 let manager = BufferManager::with_budget(10000);
982 let consumer = SpillableConsumer::new(
983 "no_spill",
984 1000,
985 priorities::INDEX_BUFFERS,
986 MemoryRegion::IndexBuffers,
987 false,
988 );
989 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
990
991 assert_eq!(manager.spill_all(), 0);
992 assert_eq!(consumer.memory_usage(), 1000);
993 }
994
995 #[test]
996 fn test_eviction_falls_back_to_spill() {
997 let manager = BufferManager::with_budget(10000);
998 let consumer = SpillableConsumer::new_evict_fails(
999 "spill_fallback",
1000 1000,
1001 priorities::QUERY_CACHE,
1002 MemoryRegion::ExecutionBuffers,
1003 true,
1004 );
1005 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1006 manager.allocated.store(2000, Ordering::Relaxed);
1007
1008 let freed = manager.evict_to_target(1500);
1009 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
1010 assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
1011 assert!(freed > 0);
1012 }
1013
1014 #[test]
1015 fn test_eviction_no_spill_when_sufficient() {
1016 let manager = BufferManager::with_budget(10000);
1017 let consumer = SpillableConsumer::new(
1018 "eviction_enough",
1019 1000,
1020 priorities::QUERY_CACHE,
1021 MemoryRegion::ExecutionBuffers,
1022 true,
1023 );
1024 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1025 manager.allocated.store(1200, Ordering::Relaxed);
1026
1027 let freed = manager.evict_to_target(1000);
1028 assert_eq!(freed, 200);
1029 assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
1030 }
1031
1032 #[test]
1033 fn test_eviction_spill_skips_non_spillable() {
1034 let manager = BufferManager::with_budget(10000);
1035 let consumer = SpillableConsumer::new_evict_fails(
1036 "no_spill",
1037 1000,
1038 priorities::QUERY_CACHE,
1039 MemoryRegion::ExecutionBuffers,
1040 false,
1041 );
1042 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1043 manager.allocated.store(2000, Ordering::Relaxed);
1044
1045 let freed = manager.evict_to_target(1500);
1046 assert_eq!(freed, 0);
1047 assert_eq!(consumer.memory_usage(), 1000);
1048 }
1049
1050 #[test]
1051 fn alix_with_defaults_creates_manager() {
1052 let manager = BufferManager::with_defaults();
1053 assert!(manager.budget() > 0);
1055 assert_eq!(manager.allocated(), 0);
1056 assert_eq!(manager.available(), manager.budget());
1057 }
1058
1059 #[test]
1060 fn gus_config_accessor_returns_budget() {
1061 let manager = BufferManager::with_budget(4096);
1062 let config = manager.config();
1063 assert_eq!(config.budget, 4096);
1064 assert!(!config.background_eviction);
1065 assert!(config.spill_path.is_none());
1066 }
1067
1068 #[test]
1069 fn vincent_shutdown_sets_flag() {
1070 let manager = BufferManager::with_budget(1000);
1071 manager.shutdown();
1072 assert_eq!(manager.allocated(), 0);
1075 }
1076
1077 #[test]
1078 fn jules_critical_pressure_level() {
1079 let config = BufferManagerConfig {
1080 budget: 1000,
1081 soft_limit_fraction: 0.70,
1082 evict_limit_fraction: 0.85,
1083 hard_limit_fraction: 0.95,
1084 background_eviction: false,
1085 spill_path: None,
1086 };
1087 let manager = BufferManager::new(config);
1088
1089 manager.allocated.store(960, Ordering::Relaxed);
1091 assert_eq!(manager.pressure_level(), PressureLevel::Critical);
1092 }
1093
1094 #[test]
1095 fn mia_evict_to_target_already_below() {
1096 let manager = BufferManager::with_budget(10000);
1097 let freed = manager.evict_to_target(5000);
1099 assert_eq!(freed, 0);
1100 }
1101
1102 #[test]
1103 fn butch_try_allocate_raw_success() {
1104 let config = BufferManagerConfig {
1105 budget: 1000,
1106 soft_limit_fraction: 0.70,
1107 evict_limit_fraction: 0.85,
1108 hard_limit_fraction: 0.95,
1109 background_eviction: false,
1110 spill_path: None,
1111 };
1112 let manager = BufferManager::new(config);
1113
1114 let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
1116 assert!(success);
1117 assert_eq!(manager.allocated(), 100);
1118 assert_eq!(
1119 manager.stats().region_usage(MemoryRegion::GraphStorage),
1120 100
1121 );
1122 }
1123
1124 #[test]
1125 fn django_try_allocate_raw_fails_at_hard_limit() {
1126 let config = BufferManagerConfig {
1127 budget: 1000,
1128 soft_limit_fraction: 0.70,
1129 evict_limit_fraction: 0.85,
1130 hard_limit_fraction: 0.95,
1131 background_eviction: false,
1132 spill_path: None,
1133 };
1134 let manager = BufferManager::new(config);
1135
1136 manager.allocated.store(940, Ordering::Relaxed);
1138
1139 let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
1141 assert!(!success);
1142 }
1143
1144 #[test]
1145 fn shosanna_drop_sets_shutdown() {
1146 let manager = BufferManager::with_budget(512);
1148 drop(manager);
1149 }
1151
1152 #[test]
1153 fn hans_eviction_with_zero_usage_consumer() {
1154 let manager = BufferManager::with_budget(10000);
1155 let consumer = TestConsumer::new(
1157 "empty",
1158 0,
1159 priorities::SPILL_STAGING,
1160 MemoryRegion::SpillStaging,
1161 );
1162 manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1163 manager.allocated.store(500, Ordering::Relaxed);
1164
1165 let freed = manager.evict_to_target(200);
1166 assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
1168 assert_eq!(freed, 0);
1169 }
1170
1171 #[test]
1172 fn beatrix_grant_releaser_release_decrements() {
1173 let config = BufferManagerConfig {
1174 budget: 1000,
1175 soft_limit_fraction: 0.70,
1176 evict_limit_fraction: 0.85,
1177 hard_limit_fraction: 0.95,
1178 background_eviction: false,
1179 spill_path: None,
1180 };
1181 let manager = BufferManager::new(config);
1182
1183 assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
1185 assert_eq!(manager.allocated(), 200);
1186
1187 manager.release(200, MemoryRegion::IndexBuffers);
1188 assert_eq!(manager.allocated(), 0);
1189 assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
1190 }
1191
1192 struct FailingSpillConsumer {
1194 name: String,
1195 usage: AtomicUsize,
1196 priority: u8,
1197 region: MemoryRegion,
1198 }
1199
1200 impl FailingSpillConsumer {
1201 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
1202 Arc::new(Self {
1203 name: name.to_string(),
1204 usage: AtomicUsize::new(usage),
1205 priority,
1206 region,
1207 })
1208 }
1209 }
1210
1211 impl MemoryConsumer for FailingSpillConsumer {
1212 fn name(&self) -> &str {
1213 &self.name
1214 }
1215
1216 fn memory_usage(&self) -> usize {
1217 self.usage.load(Ordering::Relaxed)
1218 }
1219
1220 fn eviction_priority(&self) -> u8 {
1221 self.priority
1222 }
1223
1224 fn region(&self) -> MemoryRegion {
1225 self.region
1226 }
1227
1228 fn evict(&self, _target_bytes: usize) -> usize {
1229 0 }
1231
1232 fn can_spill(&self) -> bool {
1233 true
1234 }
1235
1236 fn spill(
1237 &self,
1238 _target_bytes: usize,
1239 ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1240 Err(crate::memory::buffer::consumer::SpillError::IoError(
1241 "disk full".to_string(),
1242 ))
1243 }
1244
1245 fn current_tier(&self) -> super::super::tiered::StorageTier {
1246 if self.memory_usage() == 0 {
1247 super::super::tiered::StorageTier::Uninitialized
1248 } else {
1249 super::super::tiered::StorageTier::InMemory
1250 }
1251 }
1252 }
1253
1254 #[test]
1255 fn vincent_spill_error_continues_to_next_consumer() {
1256 let manager = BufferManager::with_budget(10000);
1257
1258 let failing = FailingSpillConsumer::new(
1260 "failing_spill",
1261 500,
1262 priorities::SPILL_STAGING,
1263 MemoryRegion::SpillStaging,
1264 );
1265
1266 let working = SpillableConsumer::new_evict_fails(
1268 "working_spill",
1269 500,
1270 priorities::QUERY_CACHE,
1271 MemoryRegion::ExecutionBuffers,
1272 true,
1273 );
1274
1275 manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1276 manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1277 manager.allocated.store(2000, Ordering::Relaxed);
1278
1279 let freed = manager.evict_to_target(1500);
1280 assert!(working.spilled.load(Ordering::Relaxed) > 0);
1282 assert!(freed > 0);
1283 }
1284
1285 #[test]
1286 fn django_detect_system_memory_returns_positive() {
1287 let mem = BufferManagerConfig::detect_system_memory();
1288 assert!(mem > 0);
1289 }
1290
1291 #[test]
1292 fn shosanna_spill_path_config() {
1293 let config = BufferManagerConfig {
1294 budget: 1024,
1295 spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1296 ..Default::default()
1297 };
1298 assert_eq!(
1299 config.spill_path.as_ref().unwrap().to_str().unwrap(),
1300 "/tmp/grafeo-spill"
1301 );
1302 let manager = BufferManager::new(config);
1303 assert!(manager.config().spill_path.is_some());
1304 }
1305}