// grafeo_common/memory/buffer/manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
/// Default memory budget as a fraction of detected system memory.
const DEFAULT_MEMORY_FRACTION: f64 = 0.75;

/// Configuration for the buffer manager.
///
/// The three `*_fraction` fields are fractions of `budget` and are turned
/// into absolute byte thresholds by `BufferManager::new`.
#[derive(Debug, Clone)]
pub struct BufferManagerConfig {
    /// Total memory budget in bytes.
    pub budget: usize,
    /// Soft limit threshold as a fraction of the budget (default: 0.70).
    pub soft_limit_fraction: f64,
    /// Eviction threshold as a fraction of the budget (default: 0.85).
    pub evict_limit_fraction: f64,
    /// Hard limit threshold as a fraction of the budget (default: 0.95).
    pub hard_limit_fraction: f64,
    /// Enable background eviction thread (disabled by default).
    pub background_eviction: bool,
    /// Directory for spilling data to disk, if any.
    pub spill_path: Option<PathBuf>,
}
31
32impl BufferManagerConfig {
33    /// Detects system memory size.
34    ///
35    /// Returns a conservative estimate if detection fails.
36    #[must_use]
37    pub fn detect_system_memory() -> usize {
38        // Under Miri, file I/O is blocked by isolation: use fallback directly
39        #[cfg(miri)]
40        {
41            return Self::fallback_system_memory();
42        }
43
44        // Try to detect system memory
45        // On failure, return a conservative 1GB default
46        #[cfg(not(miri))]
47        {
48            #[cfg(target_os = "windows")]
49            {
50                // Windows: Use GetPhysicallyInstalledSystemMemory or GlobalMemoryStatusEx
51                // For now, use a fallback
52                Self::fallback_system_memory()
53            }
54
55            #[cfg(target_os = "linux")]
56            {
57                // Linux: Read from /proc/meminfo
58                if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59                    for line in contents.lines() {
60                        if line.starts_with("MemTotal:")
61                            && let Some(kb_str) = line.split_whitespace().nth(1)
62                            && let Ok(kb) = kb_str.parse::<usize>()
63                        {
64                            return kb * 1024;
65                        }
66                    }
67                }
68                Self::fallback_system_memory()
69            }
70
71            #[cfg(target_os = "macos")]
72            {
73                // macOS: Use sysctl
74                Self::fallback_system_memory()
75            }
76
77            #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78            {
79                Self::fallback_system_memory()
80            }
81        }
82    }
83
84    fn fallback_system_memory() -> usize {
85        // Default to 1GB if detection fails
86        1024 * 1024 * 1024
87    }
88
89    /// Creates a config with the given budget.
90    #[must_use]
91    pub fn with_budget(budget: usize) -> Self {
92        Self {
93            budget,
94            ..Default::default()
95        }
96    }
97}
98
99impl Default for BufferManagerConfig {
100    fn default() -> Self {
101        let system_memory = Self::detect_system_memory();
102        Self {
103            // reason: memory fraction (0.0..1.0) of a positive usize is always a valid positive usize
104            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
105            budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
106            soft_limit_fraction: 0.70,
107            evict_limit_fraction: 0.85,
108            hard_limit_fraction: 0.95,
109            background_eviction: false, // Disabled by default for simplicity
110            spill_path: None,
111        }
112    }
113}
114
/// The central unified buffer manager.
///
/// Manages memory allocation across all subsystems with pressure-aware
/// eviction and optional spilling support. Created behind an `Arc` (see
/// `BufferManager::new`) so grants can hold a releaser handle back to it.
pub struct BufferManager {
    /// Configuration.
    config: BufferManagerConfig,
    /// Total allocated bytes across all regions.
    allocated: AtomicUsize,
    /// Per-region allocated bytes, indexed by `MemoryRegion::index()`.
    region_allocated: [AtomicUsize; 4],
    /// Registered memory consumers, polled during eviction cycles.
    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
    /// Computed soft limit in bytes (budget * soft_limit_fraction).
    soft_limit: usize,
    /// Computed eviction limit in bytes (budget * evict_limit_fraction).
    evict_limit: usize,
    /// Computed hard limit in bytes (budget * hard_limit_fraction).
    hard_limit: usize,
    /// Shutdown flag, set by `shutdown()` and on drop.
    shutdown: AtomicBool,
}
137
138impl BufferManager {
139    /// Creates a new buffer manager with the given configuration.
140    #[must_use]
141    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
142        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
143        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
144        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
145        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
146        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
147        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
148        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
149        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
150        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
151
152        Arc::new(Self {
153            config,
154            allocated: AtomicUsize::new(0),
155            region_allocated: [
156                AtomicUsize::new(0),
157                AtomicUsize::new(0),
158                AtomicUsize::new(0),
159                AtomicUsize::new(0),
160            ],
161            consumers: RwLock::new(Vec::new()),
162            soft_limit,
163            evict_limit,
164            hard_limit,
165            shutdown: AtomicBool::new(false),
166        })
167    }
168
169    /// Creates a buffer manager with default configuration.
170    #[must_use]
171    pub fn with_defaults() -> Arc<Self> {
172        Self::new(BufferManagerConfig::default())
173    }
174
175    /// Creates a buffer manager with a specific budget.
176    #[must_use]
177    pub fn with_budget(budget: usize) -> Arc<Self> {
178        Self::new(BufferManagerConfig::with_budget(budget))
179    }
180
181    /// Attempts to allocate memory for the given region.
182    ///
183    /// Returns `None` if allocation would exceed the hard limit after
184    /// eviction attempts.
185    pub fn try_allocate(
186        self: &Arc<Self>,
187        size: usize,
188        region: MemoryRegion,
189    ) -> Option<MemoryGrant> {
190        // Check if we can allocate
191        let current = self.allocated.load(Ordering::Relaxed);
192
193        if current + size > self.hard_limit {
194            // Try eviction first
195            self.run_eviction_cycle(true);
196
197            // Check again
198            let current = self.allocated.load(Ordering::Relaxed);
199            if current + size > self.hard_limit {
200                return None;
201            }
202        }
203
204        // Perform allocation
205        self.allocated.fetch_add(size, Ordering::Relaxed);
206        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
207
208        // Check pressure and potentially trigger background eviction
209        self.check_pressure();
210
211        Some(MemoryGrant::new(
212            Arc::clone(self) as Arc<dyn GrantReleaser>,
213            size,
214            region,
215        ))
216    }
217
218    /// Returns the current pressure level.
219    #[must_use]
220    pub fn pressure_level(&self) -> PressureLevel {
221        let current = self.allocated.load(Ordering::Relaxed);
222        self.compute_pressure_level(current)
223    }
224
225    /// Returns current buffer statistics.
226    #[must_use]
227    pub fn stats(&self) -> BufferStats {
228        let total_allocated = self.allocated.load(Ordering::Relaxed);
229        BufferStats {
230            budget: self.config.budget,
231            total_allocated,
232            region_allocated: [
233                self.region_allocated[0].load(Ordering::Relaxed),
234                self.region_allocated[1].load(Ordering::Relaxed),
235                self.region_allocated[2].load(Ordering::Relaxed),
236                self.region_allocated[3].load(Ordering::Relaxed),
237            ],
238            pressure_level: self.compute_pressure_level(total_allocated),
239            consumer_count: self.consumers.read().len(),
240        }
241    }
242
243    /// Registers a memory consumer for eviction callbacks.
244    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
245        self.consumers.write().push(consumer);
246    }
247
248    /// Unregisters a memory consumer by name.
249    pub fn unregister_consumer(&self, name: &str) {
250        self.consumers.write().retain(|c| c.name() != name);
251    }
252
253    /// Forces eviction to reach the target usage.
254    ///
255    /// Returns the number of bytes actually freed.
256    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
257        let current = self.allocated.load(Ordering::Relaxed);
258        if current <= target_bytes {
259            return 0;
260        }
261
262        let to_free = current - target_bytes;
263        self.run_eviction_internal(to_free)
264    }
265
266    /// Spills all consumers that support it, regardless of memory pressure.
267    ///
268    /// Used when `TierOverride::ForceDisk` is configured. Returns total bytes freed.
269    pub fn spill_all(&self) -> usize {
270        let consumers = self.consumers.read();
271        let mut total_freed = 0;
272        for consumer in consumers.iter() {
273            if consumer.can_spill()
274                && let Ok(freed) = consumer.spill(usize::MAX)
275            {
276                total_freed += freed;
277            }
278        }
279        total_freed
280    }
281
282    /// Returns the configuration.
283    #[must_use]
284    pub fn config(&self) -> &BufferManagerConfig {
285        &self.config
286    }
287
288    /// Returns the memory budget.
289    #[must_use]
290    pub fn budget(&self) -> usize {
291        self.config.budget
292    }
293
294    /// Returns currently allocated bytes.
295    #[must_use]
296    pub fn allocated(&self) -> usize {
297        self.allocated.load(Ordering::Relaxed)
298    }
299
300    /// Returns available bytes.
301    #[must_use]
302    pub fn available(&self) -> usize {
303        self.config
304            .budget
305            .saturating_sub(self.allocated.load(Ordering::Relaxed))
306    }
307
308    /// Shuts down the buffer manager.
309    pub fn shutdown(&self) {
310        self.shutdown.store(true, Ordering::Relaxed);
311    }
312
313    // === Internal methods ===
314
315    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
316        if current >= self.hard_limit {
317            PressureLevel::Critical
318        } else if current >= self.evict_limit {
319            PressureLevel::High
320        } else if current >= self.soft_limit {
321            PressureLevel::Moderate
322        } else {
323            PressureLevel::Normal
324        }
325    }
326
327    fn check_pressure(&self) {
328        let level = self.pressure_level();
329        if level.requires_eviction() {
330            // In a more complete implementation, this would signal
331            // a background thread. For now, do synchronous eviction.
332            let aggressive = level >= PressureLevel::High;
333            self.run_eviction_cycle(aggressive);
334        }
335    }
336
337    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
338        let target = if aggressive {
339            self.soft_limit
340        } else {
341            self.evict_limit
342        };
343
344        let current = self.allocated.load(Ordering::Relaxed);
345        if current <= target {
346            return 0;
347        }
348
349        let to_free = current - target;
350        self.run_eviction_internal(to_free)
351    }
352
353    fn run_eviction_internal(&self, to_free: usize) -> usize {
354        let consumers = self.consumers.read();
355
356        // Sort consumers by priority (lowest first = evict first)
357        let mut sorted: Vec<_> = consumers.iter().collect();
358        sorted.sort_by_key(|c| c.eviction_priority());
359
360        let mut total_freed = 0;
361        for consumer in &sorted {
362            if total_freed >= to_free {
363                break;
364            }
365
366            let remaining = to_free - total_freed;
367            let consumer_usage = consumer.memory_usage();
368
369            // Ask consumer to evict up to half its usage or remaining needed
370            let target_evict = remaining.min(consumer_usage / 2);
371            if target_evict > 0 {
372                let freed = consumer.evict(target_evict);
373                total_freed += freed;
374            }
375        }
376
377        // If eviction was not enough, try spilling to disk for consumers
378        // that support it (e.g., vector indexes with mmap storage).
379        if total_freed < to_free {
380            for consumer in &sorted {
381                if total_freed >= to_free {
382                    break;
383                }
384                if !consumer.can_spill() {
385                    continue;
386                }
387                let remaining = to_free - total_freed;
388                match consumer.spill(remaining) {
389                    Ok(freed) => total_freed += freed,
390                    Err(_) => continue,
391                }
392            }
393        }
394
395        total_freed
396    }
397}
398
399impl GrantReleaser for BufferManager {
400    fn release(&self, size: usize, region: MemoryRegion) {
401        self.allocated.fetch_sub(size, Ordering::Relaxed);
402        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
403    }
404
405    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
406        let current = self.allocated.load(Ordering::Relaxed);
407
408        if current + size > self.hard_limit {
409            // Try eviction
410            self.run_eviction_cycle(true);
411
412            let current = self.allocated.load(Ordering::Relaxed);
413            if current + size > self.hard_limit {
414                return false;
415            }
416        }
417
418        self.allocated.fetch_add(size, Ordering::Relaxed);
419        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
420        true
421    }
422}
423
impl Drop for BufferManager {
    fn drop(&mut self) {
        // Set the shutdown flag on teardown; idempotent with an explicit
        // `shutdown()` call made earlier.
        self.shutdown.store(true, Ordering::Relaxed);
    }
}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433    use crate::memory::buffer::consumer::priorities;
434    use std::sync::atomic::AtomicUsize;
435
    /// Minimal in-memory consumer used to observe eviction behavior.
    struct TestConsumer {
        /// Name reported to the manager (used by `unregister_consumer`).
        name: String,
        /// Simulated memory usage in bytes.
        usage: AtomicUsize,
        /// Eviction priority; lower values are evicted first.
        priority: u8,
        /// Region this consumer reports.
        region: MemoryRegion,
        /// Running total of bytes evicted from this consumer.
        evicted: AtomicUsize,
    }
443
444    impl TestConsumer {
445        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
446            Arc::new(Self {
447                name: name.to_string(),
448                usage: AtomicUsize::new(usage),
449                priority,
450                region,
451                evicted: AtomicUsize::new(0),
452            })
453        }
454    }
455
456    impl MemoryConsumer for TestConsumer {
457        fn name(&self) -> &str {
458            &self.name
459        }
460
461        fn memory_usage(&self) -> usize {
462            self.usage.load(Ordering::Relaxed)
463        }
464
465        fn eviction_priority(&self) -> u8 {
466            self.priority
467        }
468
469        fn region(&self) -> MemoryRegion {
470            self.region
471        }
472
473        fn evict(&self, target_bytes: usize) -> usize {
474            let current = self.usage.load(Ordering::Relaxed);
475            let to_evict = target_bytes.min(current);
476            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
477            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
478            to_evict
479        }
480    }
481
482    #[test]
483    fn test_basic_allocation() {
484        let config = BufferManagerConfig {
485            budget: 1024 * 1024, // 1MB
486            ..Default::default()
487        };
488        let manager = BufferManager::new(config);
489
490        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
491        assert!(grant.is_some());
492        assert_eq!(manager.stats().total_allocated, 1024);
493    }
494
495    #[test]
496    fn test_grant_raii_release() {
497        let config = BufferManagerConfig {
498            budget: 1024,
499            ..Default::default()
500        };
501        let manager = BufferManager::new(config);
502
503        {
504            let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
505            assert_eq!(manager.stats().total_allocated, 512);
506        }
507
508        // Grant dropped, memory should be released
509        assert_eq!(manager.stats().total_allocated, 0);
510    }
511
512    #[test]
513    fn test_pressure_levels() {
514        let config = BufferManagerConfig {
515            budget: 1000,
516            soft_limit_fraction: 0.70,
517            evict_limit_fraction: 0.85,
518            hard_limit_fraction: 0.95,
519            background_eviction: false,
520            spill_path: None,
521        };
522        let manager = BufferManager::new(config);
523
524        assert_eq!(manager.pressure_level(), PressureLevel::Normal);
525
526        // Allocate to 70% (soft limit)
527        let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
528        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
529
530        // Allocate to 85% (evict limit)
531        let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
532        assert_eq!(manager.pressure_level(), PressureLevel::High);
533
534        // Note: Can't easily test Critical without blocking
535    }
536
537    #[test]
538    fn test_region_tracking() {
539        let config = BufferManagerConfig {
540            budget: 10000,
541            ..Default::default()
542        };
543        let manager = BufferManager::new(config);
544
545        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
546        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
547        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
548
549        let stats = manager.stats();
550        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
551        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
552        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
553        assert_eq!(stats.total_allocated, 600);
554    }
555
556    #[test]
557    fn test_consumer_registration() {
558        let manager = BufferManager::with_budget(10000);
559
560        let consumer = TestConsumer::new(
561            "test",
562            1000,
563            priorities::INDEX_BUFFERS,
564            MemoryRegion::IndexBuffers,
565        );
566
567        manager.register_consumer(consumer);
568        assert_eq!(manager.stats().consumer_count, 1);
569
570        manager.unregister_consumer("test");
571        assert_eq!(manager.stats().consumer_count, 0);
572    }
573
574    #[test]
575    fn test_eviction_ordering() {
576        let manager = BufferManager::with_budget(10000);
577
578        // Low priority consumer (evict first)
579        let low_priority = TestConsumer::new(
580            "low",
581            500,
582            priorities::SPILL_STAGING,
583            MemoryRegion::SpillStaging,
584        );
585
586        // High priority consumer (evict last)
587        let high_priority = TestConsumer::new(
588            "high",
589            500,
590            priorities::ACTIVE_TRANSACTION,
591            MemoryRegion::ExecutionBuffers,
592        );
593
594        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
595        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
596
597        // Manually set allocated to simulate memory usage
598        // (consumers track their own usage separately from manager's allocation tracking)
599        manager.allocated.store(1000, Ordering::Relaxed);
600
601        // Request eviction to target 700 (need to free 300 bytes)
602        let freed = manager.evict_to_target(700);
603
604        // Low priority should be evicted first (up to half = 250)
605        assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
606        assert!(freed > 0);
607    }
608
609    #[test]
610    fn test_hard_limit_blocking() {
611        let config = BufferManagerConfig {
612            budget: 1000,
613            soft_limit_fraction: 0.70,
614            evict_limit_fraction: 0.85,
615            hard_limit_fraction: 0.95,
616            background_eviction: false,
617            spill_path: None,
618        };
619        let manager = BufferManager::new(config);
620
621        // Allocate up to hard limit (950 bytes)
622        let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
623
624        // This should fail (would exceed hard limit)
625        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
626        assert!(g2.is_none());
627    }
628
629    #[test]
630    fn test_available_memory() {
631        let manager = BufferManager::with_budget(1000);
632
633        assert_eq!(manager.available(), 1000);
634
635        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
636        assert_eq!(manager.available(), 700);
637    }
638
639    // --- Spill-aware test consumer ---
640
    /// Test consumer that can optionally spill to simulated disk and can be
    /// configured so that plain eviction frees nothing.
    struct SpillableConsumer {
        /// Name reported to the manager.
        name: String,
        /// Simulated memory usage in bytes.
        usage: AtomicUsize,
        /// Eviction priority; lower values are evicted first.
        priority: u8,
        /// Region this consumer reports.
        region: MemoryRegion,
        /// Total bytes freed via `evict`.
        evicted: AtomicUsize,
        /// Total bytes freed via `spill`.
        spilled: AtomicUsize,
        /// Whether `can_spill`/`spill` succeed.
        spillable: bool,
        /// When true, `evict` always reports zero bytes freed.
        evict_returns_zero: bool,
    }
651
652    impl SpillableConsumer {
653        fn new(
654            name: &str,
655            usage: usize,
656            priority: u8,
657            region: MemoryRegion,
658            spillable: bool,
659        ) -> Arc<Self> {
660            Arc::new(Self {
661                name: name.to_string(),
662                usage: AtomicUsize::new(usage),
663                priority,
664                region,
665                evicted: AtomicUsize::new(0),
666                spilled: AtomicUsize::new(0),
667                spillable,
668                evict_returns_zero: false,
669            })
670        }
671
672        fn new_evict_fails(
673            name: &str,
674            usage: usize,
675            priority: u8,
676            region: MemoryRegion,
677            spillable: bool,
678        ) -> Arc<Self> {
679            Arc::new(Self {
680                name: name.to_string(),
681                usage: AtomicUsize::new(usage),
682                priority,
683                region,
684                evicted: AtomicUsize::new(0),
685                spilled: AtomicUsize::new(0),
686                spillable,
687                evict_returns_zero: true,
688            })
689        }
690    }
691
692    impl MemoryConsumer for SpillableConsumer {
693        fn name(&self) -> &str {
694            &self.name
695        }
696
697        fn memory_usage(&self) -> usize {
698            self.usage.load(Ordering::Relaxed)
699        }
700
701        fn eviction_priority(&self) -> u8 {
702            self.priority
703        }
704
705        fn region(&self) -> MemoryRegion {
706            self.region
707        }
708
709        fn evict(&self, target_bytes: usize) -> usize {
710            if self.evict_returns_zero {
711                return 0;
712            }
713            let current = self.usage.load(Ordering::Relaxed);
714            let to_evict = target_bytes.min(current);
715            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
716            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
717            to_evict
718        }
719
720        fn can_spill(&self) -> bool {
721            self.spillable
722        }
723
724        fn spill(
725            &self,
726            target_bytes: usize,
727        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
728            if !self.spillable {
729                return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
730            }
731            let current = self.usage.load(Ordering::Relaxed);
732            let to_spill = target_bytes.min(current);
733            self.usage.fetch_sub(to_spill, Ordering::Relaxed);
734            self.spilled.fetch_add(to_spill, Ordering::Relaxed);
735            Ok(to_spill)
736        }
737    }
738
739    #[test]
740    fn test_spill_all_calls_spillable_consumers() {
741        let manager = BufferManager::with_budget(10000);
742        let spillable = SpillableConsumer::new(
743            "spillable",
744            500,
745            priorities::QUERY_CACHE,
746            MemoryRegion::ExecutionBuffers,
747            true,
748        );
749        let non_spillable = SpillableConsumer::new(
750            "non_spillable",
751            500,
752            priorities::QUERY_CACHE,
753            MemoryRegion::ExecutionBuffers,
754            false,
755        );
756        manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
757        manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
758
759        let freed = manager.spill_all();
760        assert_eq!(freed, 500);
761        assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
762        assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
763    }
764
765    #[test]
766    fn test_spill_all_skips_non_spillable() {
767        let manager = BufferManager::with_budget(10000);
768        let consumer = SpillableConsumer::new(
769            "no_spill",
770            1000,
771            priorities::INDEX_BUFFERS,
772            MemoryRegion::IndexBuffers,
773            false,
774        );
775        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
776
777        assert_eq!(manager.spill_all(), 0);
778        assert_eq!(consumer.memory_usage(), 1000);
779    }
780
781    #[test]
782    fn test_eviction_falls_back_to_spill() {
783        let manager = BufferManager::with_budget(10000);
784        let consumer = SpillableConsumer::new_evict_fails(
785            "spill_fallback",
786            1000,
787            priorities::QUERY_CACHE,
788            MemoryRegion::ExecutionBuffers,
789            true,
790        );
791        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
792        manager.allocated.store(2000, Ordering::Relaxed);
793
794        let freed = manager.evict_to_target(1500);
795        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
796        assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
797        assert!(freed > 0);
798    }
799
800    #[test]
801    fn test_eviction_no_spill_when_sufficient() {
802        let manager = BufferManager::with_budget(10000);
803        let consumer = SpillableConsumer::new(
804            "eviction_enough",
805            1000,
806            priorities::QUERY_CACHE,
807            MemoryRegion::ExecutionBuffers,
808            true,
809        );
810        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
811        manager.allocated.store(1200, Ordering::Relaxed);
812
813        let freed = manager.evict_to_target(1000);
814        assert_eq!(freed, 200);
815        assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
816    }
817
818    #[test]
819    fn test_eviction_spill_skips_non_spillable() {
820        let manager = BufferManager::with_budget(10000);
821        let consumer = SpillableConsumer::new_evict_fails(
822            "no_spill",
823            1000,
824            priorities::QUERY_CACHE,
825            MemoryRegion::ExecutionBuffers,
826            false,
827        );
828        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
829        manager.allocated.store(2000, Ordering::Relaxed);
830
831        let freed = manager.evict_to_target(1500);
832        assert_eq!(freed, 0);
833        assert_eq!(consumer.memory_usage(), 1000);
834    }
835
836    #[test]
837    fn alix_with_defaults_creates_manager() {
838        let manager = BufferManager::with_defaults();
839        // with_defaults uses system memory detection, budget should be > 0
840        assert!(manager.budget() > 0);
841        assert_eq!(manager.allocated(), 0);
842        assert_eq!(manager.available(), manager.budget());
843    }
844
845    #[test]
846    fn gus_config_accessor_returns_budget() {
847        let manager = BufferManager::with_budget(4096);
848        let config = manager.config();
849        assert_eq!(config.budget, 4096);
850        assert!(!config.background_eviction);
851        assert!(config.spill_path.is_none());
852    }
853
854    #[test]
855    fn vincent_shutdown_sets_flag() {
856        let manager = BufferManager::with_budget(1000);
857        manager.shutdown();
858        // shutdown stores true; drop also stores true, so this just verifies
859        // the method runs without error and the manager remains usable
860        assert_eq!(manager.allocated(), 0);
861    }
862
863    #[test]
864    fn jules_critical_pressure_level() {
865        let config = BufferManagerConfig {
866            budget: 1000,
867            soft_limit_fraction: 0.70,
868            evict_limit_fraction: 0.85,
869            hard_limit_fraction: 0.95,
870            background_eviction: false,
871            spill_path: None,
872        };
873        let manager = BufferManager::new(config);
874
875        // Manually set allocated above hard limit to test Critical level
876        manager.allocated.store(960, Ordering::Relaxed);
877        assert_eq!(manager.pressure_level(), PressureLevel::Critical);
878    }
879
880    #[test]
881    fn mia_evict_to_target_already_below() {
882        let manager = BufferManager::with_budget(10000);
883        // allocated is 0, target is 5000: already below target
884        let freed = manager.evict_to_target(5000);
885        assert_eq!(freed, 0);
886    }
887
888    #[test]
889    fn butch_try_allocate_raw_success() {
890        let config = BufferManagerConfig {
891            budget: 1000,
892            soft_limit_fraction: 0.70,
893            evict_limit_fraction: 0.85,
894            hard_limit_fraction: 0.95,
895            background_eviction: false,
896            spill_path: None,
897        };
898        let manager = BufferManager::new(config);
899
900        // GrantReleaser::try_allocate_raw succeeds when under hard limit
901        let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
902        assert!(success);
903        assert_eq!(manager.allocated(), 100);
904        assert_eq!(
905            manager.stats().region_usage(MemoryRegion::GraphStorage),
906            100
907        );
908    }
909
910    #[test]
911    fn django_try_allocate_raw_fails_at_hard_limit() {
912        let config = BufferManagerConfig {
913            budget: 1000,
914            soft_limit_fraction: 0.70,
915            evict_limit_fraction: 0.85,
916            hard_limit_fraction: 0.95,
917            background_eviction: false,
918            spill_path: None,
919        };
920        let manager = BufferManager::new(config);
921
922        // Fill up to hard limit
923        manager.allocated.store(940, Ordering::Relaxed);
924
925        // This exceeds hard limit (940 + 100 = 1040 > 950), no consumers to evict
926        let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
927        assert!(!success);
928    }
929
930    #[test]
931    fn shosanna_drop_sets_shutdown() {
932        // Create and immediately drop to exercise the Drop impl
933        let manager = BufferManager::with_budget(512);
934        drop(manager);
935        // If we get here without panic, the Drop impl ran successfully.
936    }
937
938    #[test]
939    fn hans_eviction_with_zero_usage_consumer() {
940        let manager = BufferManager::with_budget(10000);
941        // Consumer with zero usage: target_evict will be 0, so evict is skipped
942        let consumer = TestConsumer::new(
943            "empty",
944            0,
945            priorities::SPILL_STAGING,
946            MemoryRegion::SpillStaging,
947        );
948        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
949        manager.allocated.store(500, Ordering::Relaxed);
950
951        let freed = manager.evict_to_target(200);
952        // Consumer has 0 usage, so target_evict = min(300, 0/2) = 0, evict skipped
953        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
954        assert_eq!(freed, 0);
955    }
956
957    #[test]
958    fn beatrix_grant_releaser_release_decrements() {
959        let config = BufferManagerConfig {
960            budget: 1000,
961            soft_limit_fraction: 0.70,
962            evict_limit_fraction: 0.85,
963            hard_limit_fraction: 0.95,
964            background_eviction: false,
965            spill_path: None,
966        };
967        let manager = BufferManager::new(config);
968
969        // Allocate via try_allocate_raw, then release via GrantReleaser trait
970        assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
971        assert_eq!(manager.allocated(), 200);
972
973        manager.release(200, MemoryRegion::IndexBuffers);
974        assert_eq!(manager.allocated(), 0);
975        assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
976    }
977
978    /// Consumer whose spill() returns an error to exercise the Err(_) => continue path.
979    struct FailingSpillConsumer {
980        name: String,
981        usage: AtomicUsize,
982        priority: u8,
983        region: MemoryRegion,
984    }
985
986    impl FailingSpillConsumer {
987        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
988            Arc::new(Self {
989                name: name.to_string(),
990                usage: AtomicUsize::new(usage),
991                priority,
992                region,
993            })
994        }
995    }
996
997    impl MemoryConsumer for FailingSpillConsumer {
998        fn name(&self) -> &str {
999            &self.name
1000        }
1001
1002        fn memory_usage(&self) -> usize {
1003            self.usage.load(Ordering::Relaxed)
1004        }
1005
1006        fn eviction_priority(&self) -> u8 {
1007            self.priority
1008        }
1009
1010        fn region(&self) -> MemoryRegion {
1011            self.region
1012        }
1013
1014        fn evict(&self, _target_bytes: usize) -> usize {
1015            0 // eviction always fails
1016        }
1017
1018        fn can_spill(&self) -> bool {
1019            true
1020        }
1021
1022        fn spill(
1023            &self,
1024            _target_bytes: usize,
1025        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1026            Err(crate::memory::buffer::consumer::SpillError::IoError(
1027                "disk full".to_string(),
1028            ))
1029        }
1030    }
1031
1032    #[test]
1033    fn vincent_spill_error_continues_to_next_consumer() {
1034        let manager = BufferManager::with_budget(10000);
1035
1036        // First consumer: spill fails
1037        let failing = FailingSpillConsumer::new(
1038            "failing_spill",
1039            500,
1040            priorities::SPILL_STAGING,
1041            MemoryRegion::SpillStaging,
1042        );
1043
1044        // Second consumer: spill succeeds
1045        let working = SpillableConsumer::new_evict_fails(
1046            "working_spill",
1047            500,
1048            priorities::QUERY_CACHE,
1049            MemoryRegion::ExecutionBuffers,
1050            true,
1051        );
1052
1053        manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1054        manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1055        manager.allocated.store(2000, Ordering::Relaxed);
1056
1057        let freed = manager.evict_to_target(1500);
1058        // failing consumer's spill errors out, working consumer's spill succeeds
1059        assert!(working.spilled.load(Ordering::Relaxed) > 0);
1060        assert!(freed > 0);
1061    }
1062
1063    #[test]
1064    fn django_detect_system_memory_returns_positive() {
1065        let mem = BufferManagerConfig::detect_system_memory();
1066        assert!(mem > 0);
1067    }
1068
1069    #[test]
1070    fn shosanna_spill_path_config() {
1071        let config = BufferManagerConfig {
1072            budget: 1024,
1073            spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1074            ..Default::default()
1075        };
1076        assert_eq!(
1077            config.spill_path.as_ref().unwrap().to_str().unwrap(),
1078            "/tmp/grafeo-spill"
1079        );
1080        let manager = BufferManager::new(config);
1081        assert!(manager.config().spill_path.is_some());
1082    }
1083}