Skip to main content

grafeo_common/memory/buffer/
manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12/// Default memory budget as a fraction of system memory.
13const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15/// Configuration for the buffer manager.
16#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18    /// Total memory budget in bytes.
19    pub budget: usize,
20    /// Soft limit threshold (default: 70%).
21    pub soft_limit_fraction: f64,
22    /// Eviction threshold (default: 85%).
23    pub evict_limit_fraction: f64,
24    /// Hard limit threshold (default: 95%).
25    pub hard_limit_fraction: f64,
26    /// Enable background eviction thread.
27    pub background_eviction: bool,
28    /// Directory for spilling data to disk.
29    pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33    /// Detects system memory size.
34    ///
35    /// Returns a conservative estimate if detection fails.
36    #[must_use]
37    pub fn detect_system_memory() -> usize {
38        // Under Miri, file I/O is blocked by isolation: use fallback directly
39        #[cfg(miri)]
40        {
41            return Self::fallback_system_memory();
42        }
43
44        // Try to detect system memory
45        // On failure, return a conservative 1GB default
46        #[cfg(not(miri))]
47        {
48            #[cfg(target_os = "windows")]
49            {
50                // Windows: Use GetPhysicallyInstalledSystemMemory or GlobalMemoryStatusEx
51                // For now, use a fallback
52                Self::fallback_system_memory()
53            }
54
55            #[cfg(target_os = "linux")]
56            {
57                // Linux: Read from /proc/meminfo
58                if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59                    for line in contents.lines() {
60                        if line.starts_with("MemTotal:")
61                            && let Some(kb_str) = line.split_whitespace().nth(1)
62                            && let Ok(kb) = kb_str.parse::<usize>()
63                        {
64                            return kb * 1024;
65                        }
66                    }
67                }
68                Self::fallback_system_memory()
69            }
70
71            #[cfg(target_os = "macos")]
72            {
73                // macOS: Use sysctl
74                Self::fallback_system_memory()
75            }
76
77            #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78            {
79                Self::fallback_system_memory()
80            }
81        }
82    }
83
84    fn fallback_system_memory() -> usize {
85        // Default to 1GB if detection fails
86        1024 * 1024 * 1024
87    }
88
89    /// Creates a config with the given budget.
90    #[must_use]
91    pub fn with_budget(budget: usize) -> Self {
92        Self {
93            budget,
94            ..Default::default()
95        }
96    }
97}
98
99impl Default for BufferManagerConfig {
100    fn default() -> Self {
101        let system_memory = Self::detect_system_memory();
102        Self {
103            budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
104            soft_limit_fraction: 0.70,
105            evict_limit_fraction: 0.85,
106            hard_limit_fraction: 0.95,
107            background_eviction: false, // Disabled by default for simplicity
108            spill_path: None,
109        }
110    }
111}
112
113/// The central unified buffer manager.
114///
115/// Manages memory allocation across all subsystems with pressure-aware
116/// eviction and optional spilling support.
117pub struct BufferManager {
118    /// Configuration.
119    config: BufferManagerConfig,
120    /// Total allocated bytes.
121    allocated: AtomicUsize,
122    /// Per-region allocated bytes.
123    region_allocated: [AtomicUsize; 4],
124    /// Registered memory consumers.
125    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
126    /// Computed soft limit in bytes.
127    soft_limit: usize,
128    /// Computed eviction limit in bytes.
129    evict_limit: usize,
130    /// Computed hard limit in bytes.
131    hard_limit: usize,
132    /// Shutdown flag.
133    shutdown: AtomicBool,
134}
135
136impl BufferManager {
137    /// Creates a new buffer manager with the given configuration.
138    #[must_use]
139    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
140        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
141        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
142        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
143
144        Arc::new(Self {
145            config,
146            allocated: AtomicUsize::new(0),
147            region_allocated: [
148                AtomicUsize::new(0),
149                AtomicUsize::new(0),
150                AtomicUsize::new(0),
151                AtomicUsize::new(0),
152            ],
153            consumers: RwLock::new(Vec::new()),
154            soft_limit,
155            evict_limit,
156            hard_limit,
157            shutdown: AtomicBool::new(false),
158        })
159    }
160
161    /// Creates a buffer manager with default configuration.
162    #[must_use]
163    pub fn with_defaults() -> Arc<Self> {
164        Self::new(BufferManagerConfig::default())
165    }
166
167    /// Creates a buffer manager with a specific budget.
168    #[must_use]
169    pub fn with_budget(budget: usize) -> Arc<Self> {
170        Self::new(BufferManagerConfig::with_budget(budget))
171    }
172
173    /// Attempts to allocate memory for the given region.
174    ///
175    /// Returns `None` if allocation would exceed the hard limit after
176    /// eviction attempts.
177    pub fn try_allocate(
178        self: &Arc<Self>,
179        size: usize,
180        region: MemoryRegion,
181    ) -> Option<MemoryGrant> {
182        // Check if we can allocate
183        let current = self.allocated.load(Ordering::Relaxed);
184
185        if current + size > self.hard_limit {
186            // Try eviction first
187            self.run_eviction_cycle(true);
188
189            // Check again
190            let current = self.allocated.load(Ordering::Relaxed);
191            if current + size > self.hard_limit {
192                return None;
193            }
194        }
195
196        // Perform allocation
197        self.allocated.fetch_add(size, Ordering::Relaxed);
198        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
199
200        // Check pressure and potentially trigger background eviction
201        self.check_pressure();
202
203        Some(MemoryGrant::new(
204            Arc::clone(self) as Arc<dyn GrantReleaser>,
205            size,
206            region,
207        ))
208    }
209
210    /// Returns the current pressure level.
211    #[must_use]
212    pub fn pressure_level(&self) -> PressureLevel {
213        let current = self.allocated.load(Ordering::Relaxed);
214        self.compute_pressure_level(current)
215    }
216
217    /// Returns current buffer statistics.
218    #[must_use]
219    pub fn stats(&self) -> BufferStats {
220        let total_allocated = self.allocated.load(Ordering::Relaxed);
221        BufferStats {
222            budget: self.config.budget,
223            total_allocated,
224            region_allocated: [
225                self.region_allocated[0].load(Ordering::Relaxed),
226                self.region_allocated[1].load(Ordering::Relaxed),
227                self.region_allocated[2].load(Ordering::Relaxed),
228                self.region_allocated[3].load(Ordering::Relaxed),
229            ],
230            pressure_level: self.compute_pressure_level(total_allocated),
231            consumer_count: self.consumers.read().len(),
232        }
233    }
234
235    /// Registers a memory consumer for eviction callbacks.
236    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
237        self.consumers.write().push(consumer);
238    }
239
240    /// Unregisters a memory consumer by name.
241    pub fn unregister_consumer(&self, name: &str) {
242        self.consumers.write().retain(|c| c.name() != name);
243    }
244
245    /// Forces eviction to reach the target usage.
246    ///
247    /// Returns the number of bytes actually freed.
248    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
249        let current = self.allocated.load(Ordering::Relaxed);
250        if current <= target_bytes {
251            return 0;
252        }
253
254        let to_free = current - target_bytes;
255        self.run_eviction_internal(to_free)
256    }
257
258    /// Spills all consumers that support it, regardless of memory pressure.
259    ///
260    /// Used when `TierOverride::ForceDisk` is configured. Returns total bytes freed.
261    pub fn spill_all(&self) -> usize {
262        let consumers = self.consumers.read();
263        let mut total_freed = 0;
264        for consumer in consumers.iter() {
265            if consumer.can_spill()
266                && let Ok(freed) = consumer.spill(usize::MAX)
267            {
268                total_freed += freed;
269            }
270        }
271        total_freed
272    }
273
274    /// Returns the configuration.
275    #[must_use]
276    pub fn config(&self) -> &BufferManagerConfig {
277        &self.config
278    }
279
280    /// Returns the memory budget.
281    #[must_use]
282    pub fn budget(&self) -> usize {
283        self.config.budget
284    }
285
286    /// Returns currently allocated bytes.
287    #[must_use]
288    pub fn allocated(&self) -> usize {
289        self.allocated.load(Ordering::Relaxed)
290    }
291
292    /// Returns available bytes.
293    #[must_use]
294    pub fn available(&self) -> usize {
295        self.config
296            .budget
297            .saturating_sub(self.allocated.load(Ordering::Relaxed))
298    }
299
300    /// Shuts down the buffer manager.
301    pub fn shutdown(&self) {
302        self.shutdown.store(true, Ordering::Relaxed);
303    }
304
305    // === Internal methods ===
306
307    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
308        if current >= self.hard_limit {
309            PressureLevel::Critical
310        } else if current >= self.evict_limit {
311            PressureLevel::High
312        } else if current >= self.soft_limit {
313            PressureLevel::Moderate
314        } else {
315            PressureLevel::Normal
316        }
317    }
318
319    fn check_pressure(&self) {
320        let level = self.pressure_level();
321        if level.requires_eviction() {
322            // In a more complete implementation, this would signal
323            // a background thread. For now, do synchronous eviction.
324            let aggressive = level >= PressureLevel::High;
325            self.run_eviction_cycle(aggressive);
326        }
327    }
328
329    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
330        let target = if aggressive {
331            self.soft_limit
332        } else {
333            self.evict_limit
334        };
335
336        let current = self.allocated.load(Ordering::Relaxed);
337        if current <= target {
338            return 0;
339        }
340
341        let to_free = current - target;
342        self.run_eviction_internal(to_free)
343    }
344
345    fn run_eviction_internal(&self, to_free: usize) -> usize {
346        let consumers = self.consumers.read();
347
348        // Sort consumers by priority (lowest first = evict first)
349        let mut sorted: Vec<_> = consumers.iter().collect();
350        sorted.sort_by_key(|c| c.eviction_priority());
351
352        let mut total_freed = 0;
353        for consumer in &sorted {
354            if total_freed >= to_free {
355                break;
356            }
357
358            let remaining = to_free - total_freed;
359            let consumer_usage = consumer.memory_usage();
360
361            // Ask consumer to evict up to half its usage or remaining needed
362            let target_evict = remaining.min(consumer_usage / 2);
363            if target_evict > 0 {
364                let freed = consumer.evict(target_evict);
365                total_freed += freed;
366            }
367        }
368
369        // If eviction was not enough, try spilling to disk for consumers
370        // that support it (e.g., vector indexes with mmap storage).
371        if total_freed < to_free {
372            for consumer in &sorted {
373                if total_freed >= to_free {
374                    break;
375                }
376                if !consumer.can_spill() {
377                    continue;
378                }
379                let remaining = to_free - total_freed;
380                match consumer.spill(remaining) {
381                    Ok(freed) => total_freed += freed,
382                    Err(_) => continue,
383                }
384            }
385        }
386
387        total_freed
388    }
389}
390
391impl GrantReleaser for BufferManager {
392    fn release(&self, size: usize, region: MemoryRegion) {
393        self.allocated.fetch_sub(size, Ordering::Relaxed);
394        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
395    }
396
397    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
398        let current = self.allocated.load(Ordering::Relaxed);
399
400        if current + size > self.hard_limit {
401            // Try eviction
402            self.run_eviction_cycle(true);
403
404            let current = self.allocated.load(Ordering::Relaxed);
405            if current + size > self.hard_limit {
406                return false;
407            }
408        }
409
410        self.allocated.fetch_add(size, Ordering::Relaxed);
411        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
412        true
413    }
414}
415
416impl Drop for BufferManager {
417    fn drop(&mut self) {
418        self.shutdown.store(true, Ordering::Relaxed);
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425    use crate::memory::buffer::consumer::priorities;
426    use std::sync::atomic::AtomicUsize;
427
428    struct TestConsumer {
429        name: String,
430        usage: AtomicUsize,
431        priority: u8,
432        region: MemoryRegion,
433        evicted: AtomicUsize,
434    }
435
436    impl TestConsumer {
437        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
438            Arc::new(Self {
439                name: name.to_string(),
440                usage: AtomicUsize::new(usage),
441                priority,
442                region,
443                evicted: AtomicUsize::new(0),
444            })
445        }
446    }
447
448    impl MemoryConsumer for TestConsumer {
449        fn name(&self) -> &str {
450            &self.name
451        }
452
453        fn memory_usage(&self) -> usize {
454            self.usage.load(Ordering::Relaxed)
455        }
456
457        fn eviction_priority(&self) -> u8 {
458            self.priority
459        }
460
461        fn region(&self) -> MemoryRegion {
462            self.region
463        }
464
465        fn evict(&self, target_bytes: usize) -> usize {
466            let current = self.usage.load(Ordering::Relaxed);
467            let to_evict = target_bytes.min(current);
468            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
469            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
470            to_evict
471        }
472    }
473
474    #[test]
475    fn test_basic_allocation() {
476        let config = BufferManagerConfig {
477            budget: 1024 * 1024, // 1MB
478            ..Default::default()
479        };
480        let manager = BufferManager::new(config);
481
482        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
483        assert!(grant.is_some());
484        assert_eq!(manager.stats().total_allocated, 1024);
485    }
486
487    #[test]
488    fn test_grant_raii_release() {
489        let config = BufferManagerConfig {
490            budget: 1024,
491            ..Default::default()
492        };
493        let manager = BufferManager::new(config);
494
495        {
496            let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
497            assert_eq!(manager.stats().total_allocated, 512);
498        }
499
500        // Grant dropped, memory should be released
501        assert_eq!(manager.stats().total_allocated, 0);
502    }
503
504    #[test]
505    fn test_pressure_levels() {
506        let config = BufferManagerConfig {
507            budget: 1000,
508            soft_limit_fraction: 0.70,
509            evict_limit_fraction: 0.85,
510            hard_limit_fraction: 0.95,
511            background_eviction: false,
512            spill_path: None,
513        };
514        let manager = BufferManager::new(config);
515
516        assert_eq!(manager.pressure_level(), PressureLevel::Normal);
517
518        // Allocate to 70% (soft limit)
519        let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
520        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
521
522        // Allocate to 85% (evict limit)
523        let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
524        assert_eq!(manager.pressure_level(), PressureLevel::High);
525
526        // Note: Can't easily test Critical without blocking
527    }
528
529    #[test]
530    fn test_region_tracking() {
531        let config = BufferManagerConfig {
532            budget: 10000,
533            ..Default::default()
534        };
535        let manager = BufferManager::new(config);
536
537        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
538        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
539        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
540
541        let stats = manager.stats();
542        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
543        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
544        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
545        assert_eq!(stats.total_allocated, 600);
546    }
547
548    #[test]
549    fn test_consumer_registration() {
550        let manager = BufferManager::with_budget(10000);
551
552        let consumer = TestConsumer::new(
553            "test",
554            1000,
555            priorities::INDEX_BUFFERS,
556            MemoryRegion::IndexBuffers,
557        );
558
559        manager.register_consumer(consumer);
560        assert_eq!(manager.stats().consumer_count, 1);
561
562        manager.unregister_consumer("test");
563        assert_eq!(manager.stats().consumer_count, 0);
564    }
565
566    #[test]
567    fn test_eviction_ordering() {
568        let manager = BufferManager::with_budget(10000);
569
570        // Low priority consumer (evict first)
571        let low_priority = TestConsumer::new(
572            "low",
573            500,
574            priorities::SPILL_STAGING,
575            MemoryRegion::SpillStaging,
576        );
577
578        // High priority consumer (evict last)
579        let high_priority = TestConsumer::new(
580            "high",
581            500,
582            priorities::ACTIVE_TRANSACTION,
583            MemoryRegion::ExecutionBuffers,
584        );
585
586        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
587        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
588
589        // Manually set allocated to simulate memory usage
590        // (consumers track their own usage separately from manager's allocation tracking)
591        manager.allocated.store(1000, Ordering::Relaxed);
592
593        // Request eviction to target 700 (need to free 300 bytes)
594        let freed = manager.evict_to_target(700);
595
596        // Low priority should be evicted first (up to half = 250)
597        assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
598        assert!(freed > 0);
599    }
600
601    #[test]
602    fn test_hard_limit_blocking() {
603        let config = BufferManagerConfig {
604            budget: 1000,
605            soft_limit_fraction: 0.70,
606            evict_limit_fraction: 0.85,
607            hard_limit_fraction: 0.95,
608            background_eviction: false,
609            spill_path: None,
610        };
611        let manager = BufferManager::new(config);
612
613        // Allocate up to hard limit (950 bytes)
614        let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
615
616        // This should fail (would exceed hard limit)
617        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
618        assert!(g2.is_none());
619    }
620
621    #[test]
622    fn test_available_memory() {
623        let manager = BufferManager::with_budget(1000);
624
625        assert_eq!(manager.available(), 1000);
626
627        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
628        assert_eq!(manager.available(), 700);
629    }
630
631    // --- Spill-aware test consumer ---
632
633    struct SpillableConsumer {
634        name: String,
635        usage: AtomicUsize,
636        priority: u8,
637        region: MemoryRegion,
638        evicted: AtomicUsize,
639        spilled: AtomicUsize,
640        spillable: bool,
641        evict_returns_zero: bool,
642    }
643
644    impl SpillableConsumer {
645        fn new(
646            name: &str,
647            usage: usize,
648            priority: u8,
649            region: MemoryRegion,
650            spillable: bool,
651        ) -> Arc<Self> {
652            Arc::new(Self {
653                name: name.to_string(),
654                usage: AtomicUsize::new(usage),
655                priority,
656                region,
657                evicted: AtomicUsize::new(0),
658                spilled: AtomicUsize::new(0),
659                spillable,
660                evict_returns_zero: false,
661            })
662        }
663
664        fn new_evict_fails(
665            name: &str,
666            usage: usize,
667            priority: u8,
668            region: MemoryRegion,
669            spillable: bool,
670        ) -> Arc<Self> {
671            Arc::new(Self {
672                name: name.to_string(),
673                usage: AtomicUsize::new(usage),
674                priority,
675                region,
676                evicted: AtomicUsize::new(0),
677                spilled: AtomicUsize::new(0),
678                spillable,
679                evict_returns_zero: true,
680            })
681        }
682    }
683
684    impl MemoryConsumer for SpillableConsumer {
685        fn name(&self) -> &str {
686            &self.name
687        }
688
689        fn memory_usage(&self) -> usize {
690            self.usage.load(Ordering::Relaxed)
691        }
692
693        fn eviction_priority(&self) -> u8 {
694            self.priority
695        }
696
697        fn region(&self) -> MemoryRegion {
698            self.region
699        }
700
701        fn evict(&self, target_bytes: usize) -> usize {
702            if self.evict_returns_zero {
703                return 0;
704            }
705            let current = self.usage.load(Ordering::Relaxed);
706            let to_evict = target_bytes.min(current);
707            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
708            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
709            to_evict
710        }
711
712        fn can_spill(&self) -> bool {
713            self.spillable
714        }
715
716        fn spill(
717            &self,
718            target_bytes: usize,
719        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
720            if !self.spillable {
721                return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
722            }
723            let current = self.usage.load(Ordering::Relaxed);
724            let to_spill = target_bytes.min(current);
725            self.usage.fetch_sub(to_spill, Ordering::Relaxed);
726            self.spilled.fetch_add(to_spill, Ordering::Relaxed);
727            Ok(to_spill)
728        }
729    }
730
731    #[test]
732    fn test_spill_all_calls_spillable_consumers() {
733        let manager = BufferManager::with_budget(10000);
734        let spillable = SpillableConsumer::new(
735            "spillable",
736            500,
737            priorities::QUERY_CACHE,
738            MemoryRegion::ExecutionBuffers,
739            true,
740        );
741        let non_spillable = SpillableConsumer::new(
742            "non_spillable",
743            500,
744            priorities::QUERY_CACHE,
745            MemoryRegion::ExecutionBuffers,
746            false,
747        );
748        manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
749        manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
750
751        let freed = manager.spill_all();
752        assert_eq!(freed, 500);
753        assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
754        assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
755    }
756
757    #[test]
758    fn test_spill_all_skips_non_spillable() {
759        let manager = BufferManager::with_budget(10000);
760        let consumer = SpillableConsumer::new(
761            "no_spill",
762            1000,
763            priorities::INDEX_BUFFERS,
764            MemoryRegion::IndexBuffers,
765            false,
766        );
767        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
768
769        assert_eq!(manager.spill_all(), 0);
770        assert_eq!(consumer.memory_usage(), 1000);
771    }
772
773    #[test]
774    fn test_eviction_falls_back_to_spill() {
775        let manager = BufferManager::with_budget(10000);
776        let consumer = SpillableConsumer::new_evict_fails(
777            "spill_fallback",
778            1000,
779            priorities::QUERY_CACHE,
780            MemoryRegion::ExecutionBuffers,
781            true,
782        );
783        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
784        manager.allocated.store(2000, Ordering::Relaxed);
785
786        let freed = manager.evict_to_target(1500);
787        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
788        assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
789        assert!(freed > 0);
790    }
791
792    #[test]
793    fn test_eviction_no_spill_when_sufficient() {
794        let manager = BufferManager::with_budget(10000);
795        let consumer = SpillableConsumer::new(
796            "eviction_enough",
797            1000,
798            priorities::QUERY_CACHE,
799            MemoryRegion::ExecutionBuffers,
800            true,
801        );
802        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
803        manager.allocated.store(1200, Ordering::Relaxed);
804
805        let freed = manager.evict_to_target(1000);
806        assert_eq!(freed, 200);
807        assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
808    }
809
810    #[test]
811    fn test_eviction_spill_skips_non_spillable() {
812        let manager = BufferManager::with_budget(10000);
813        let consumer = SpillableConsumer::new_evict_fails(
814            "no_spill",
815            1000,
816            priorities::QUERY_CACHE,
817            MemoryRegion::ExecutionBuffers,
818            false,
819        );
820        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
821        manager.allocated.store(2000, Ordering::Relaxed);
822
823        let freed = manager.evict_to_target(1500);
824        assert_eq!(freed, 0);
825        assert_eq!(consumer.memory_usage(), 1000);
826    }
827
828    #[test]
829    fn alix_with_defaults_creates_manager() {
830        let manager = BufferManager::with_defaults();
831        // with_defaults uses system memory detection, budget should be > 0
832        assert!(manager.budget() > 0);
833        assert_eq!(manager.allocated(), 0);
834        assert_eq!(manager.available(), manager.budget());
835    }
836
837    #[test]
838    fn gus_config_accessor_returns_budget() {
839        let manager = BufferManager::with_budget(4096);
840        let config = manager.config();
841        assert_eq!(config.budget, 4096);
842        assert!(!config.background_eviction);
843        assert!(config.spill_path.is_none());
844    }
845
846    #[test]
847    fn vincent_shutdown_sets_flag() {
848        let manager = BufferManager::with_budget(1000);
849        manager.shutdown();
850        // shutdown stores true; drop also stores true, so this just verifies
851        // the method runs without error and the manager remains usable
852        assert_eq!(manager.allocated(), 0);
853    }
854
855    #[test]
856    fn jules_critical_pressure_level() {
857        let config = BufferManagerConfig {
858            budget: 1000,
859            soft_limit_fraction: 0.70,
860            evict_limit_fraction: 0.85,
861            hard_limit_fraction: 0.95,
862            background_eviction: false,
863            spill_path: None,
864        };
865        let manager = BufferManager::new(config);
866
867        // Manually set allocated above hard limit to test Critical level
868        manager.allocated.store(960, Ordering::Relaxed);
869        assert_eq!(manager.pressure_level(), PressureLevel::Critical);
870    }
871
872    #[test]
873    fn mia_evict_to_target_already_below() {
874        let manager = BufferManager::with_budget(10000);
875        // allocated is 0, target is 5000: already below target
876        let freed = manager.evict_to_target(5000);
877        assert_eq!(freed, 0);
878    }
879
880    #[test]
881    fn butch_try_allocate_raw_success() {
882        let config = BufferManagerConfig {
883            budget: 1000,
884            soft_limit_fraction: 0.70,
885            evict_limit_fraction: 0.85,
886            hard_limit_fraction: 0.95,
887            background_eviction: false,
888            spill_path: None,
889        };
890        let manager = BufferManager::new(config);
891
892        // GrantReleaser::try_allocate_raw succeeds when under hard limit
893        let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
894        assert!(success);
895        assert_eq!(manager.allocated(), 100);
896        assert_eq!(
897            manager.stats().region_usage(MemoryRegion::GraphStorage),
898            100
899        );
900    }
901
902    #[test]
903    fn django_try_allocate_raw_fails_at_hard_limit() {
904        let config = BufferManagerConfig {
905            budget: 1000,
906            soft_limit_fraction: 0.70,
907            evict_limit_fraction: 0.85,
908            hard_limit_fraction: 0.95,
909            background_eviction: false,
910            spill_path: None,
911        };
912        let manager = BufferManager::new(config);
913
914        // Fill up to hard limit
915        manager.allocated.store(940, Ordering::Relaxed);
916
917        // This exceeds hard limit (940 + 100 = 1040 > 950), no consumers to evict
918        let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
919        assert!(!success);
920    }
921
922    #[test]
923    fn shosanna_drop_sets_shutdown() {
924        // Create and immediately drop to exercise the Drop impl
925        let manager = BufferManager::with_budget(512);
926        drop(manager);
927        // If we get here without panic, the Drop impl ran successfully.
928    }
929
930    #[test]
931    fn hans_eviction_with_zero_usage_consumer() {
932        let manager = BufferManager::with_budget(10000);
933        // Consumer with zero usage: target_evict will be 0, so evict is skipped
934        let consumer = TestConsumer::new(
935            "empty",
936            0,
937            priorities::SPILL_STAGING,
938            MemoryRegion::SpillStaging,
939        );
940        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
941        manager.allocated.store(500, Ordering::Relaxed);
942
943        let freed = manager.evict_to_target(200);
944        // Consumer has 0 usage, so target_evict = min(300, 0/2) = 0, evict skipped
945        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
946        assert_eq!(freed, 0);
947    }
948
949    #[test]
950    fn beatrix_grant_releaser_release_decrements() {
951        let config = BufferManagerConfig {
952            budget: 1000,
953            soft_limit_fraction: 0.70,
954            evict_limit_fraction: 0.85,
955            hard_limit_fraction: 0.95,
956            background_eviction: false,
957            spill_path: None,
958        };
959        let manager = BufferManager::new(config);
960
961        // Allocate via try_allocate_raw, then release via GrantReleaser trait
962        assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
963        assert_eq!(manager.allocated(), 200);
964
965        manager.release(200, MemoryRegion::IndexBuffers);
966        assert_eq!(manager.allocated(), 0);
967        assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
968    }
969
970    /// Consumer whose spill() returns an error to exercise the Err(_) => continue path.
971    struct FailingSpillConsumer {
972        name: String,
973        usage: AtomicUsize,
974        priority: u8,
975        region: MemoryRegion,
976    }
977
978    impl FailingSpillConsumer {
979        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
980            Arc::new(Self {
981                name: name.to_string(),
982                usage: AtomicUsize::new(usage),
983                priority,
984                region,
985            })
986        }
987    }
988
989    impl MemoryConsumer for FailingSpillConsumer {
990        fn name(&self) -> &str {
991            &self.name
992        }
993
994        fn memory_usage(&self) -> usize {
995            self.usage.load(Ordering::Relaxed)
996        }
997
998        fn eviction_priority(&self) -> u8 {
999            self.priority
1000        }
1001
1002        fn region(&self) -> MemoryRegion {
1003            self.region
1004        }
1005
1006        fn evict(&self, _target_bytes: usize) -> usize {
1007            0 // eviction always fails
1008        }
1009
1010        fn can_spill(&self) -> bool {
1011            true
1012        }
1013
1014        fn spill(
1015            &self,
1016            _target_bytes: usize,
1017        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1018            Err(crate::memory::buffer::consumer::SpillError::IoError(
1019                "disk full".to_string(),
1020            ))
1021        }
1022    }
1023
1024    #[test]
1025    fn vincent_spill_error_continues_to_next_consumer() {
1026        let manager = BufferManager::with_budget(10000);
1027
1028        // First consumer: spill fails
1029        let failing = FailingSpillConsumer::new(
1030            "failing_spill",
1031            500,
1032            priorities::SPILL_STAGING,
1033            MemoryRegion::SpillStaging,
1034        );
1035
1036        // Second consumer: spill succeeds
1037        let working = SpillableConsumer::new_evict_fails(
1038            "working_spill",
1039            500,
1040            priorities::QUERY_CACHE,
1041            MemoryRegion::ExecutionBuffers,
1042            true,
1043        );
1044
1045        manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1046        manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1047        manager.allocated.store(2000, Ordering::Relaxed);
1048
1049        let freed = manager.evict_to_target(1500);
1050        // failing consumer's spill errors out, working consumer's spill succeeds
1051        assert!(working.spilled.load(Ordering::Relaxed) > 0);
1052        assert!(freed > 0);
1053    }
1054
1055    #[test]
1056    fn django_detect_system_memory_returns_positive() {
1057        let mem = BufferManagerConfig::detect_system_memory();
1058        assert!(mem > 0);
1059    }
1060
1061    #[test]
1062    fn shosanna_spill_path_config() {
1063        let config = BufferManagerConfig {
1064            budget: 1024,
1065            spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1066            ..Default::default()
1067        };
1068        assert_eq!(
1069            config.spill_path.as_ref().unwrap().to_str().unwrap(),
1070            "/tmp/grafeo-spill"
1071        );
1072        let manager = BufferManager::new(config);
1073        assert!(manager.config().spill_path.is_some());
1074    }
1075}