Skip to main content

grafeo_common/memory/buffer/
manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
12
13/// Default memory budget as a fraction of system memory.
14const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
15
16/// Configuration for the buffer manager.
17#[derive(Debug, Clone)]
18pub struct BufferManagerConfig {
19    /// Total memory budget in bytes.
20    pub budget: usize,
21    /// Soft limit threshold (default: 70%).
22    pub soft_limit_fraction: f64,
23    /// Eviction threshold (default: 85%).
24    pub evict_limit_fraction: f64,
25    /// Hard limit threshold (default: 95%).
26    pub hard_limit_fraction: f64,
27    /// Enable background eviction thread.
28    pub background_eviction: bool,
29    /// Directory for spilling data to disk.
30    pub spill_path: Option<PathBuf>,
31}
32
33impl BufferManagerConfig {
34    /// Detects system memory size.
35    ///
36    /// Returns a conservative estimate if detection fails.
37    #[must_use]
38    pub fn detect_system_memory() -> usize {
39        // Under Miri, file I/O is blocked by isolation: use fallback directly
40        #[cfg(miri)]
41        {
42            return Self::fallback_system_memory();
43        }
44
45        // Try to detect system memory
46        // On failure, return a conservative 1GB default
47        #[cfg(not(miri))]
48        {
49            #[cfg(target_os = "windows")]
50            {
51                // Windows: Use GetPhysicallyInstalledSystemMemory or GlobalMemoryStatusEx
52                // For now, use a fallback
53                Self::fallback_system_memory()
54            }
55
56            #[cfg(target_os = "linux")]
57            {
58                // Linux: Read from /proc/meminfo
59                if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
60                    for line in contents.lines() {
61                        if line.starts_with("MemTotal:")
62                            && let Some(kb_str) = line.split_whitespace().nth(1)
63                            && let Ok(kb) = kb_str.parse::<usize>()
64                        {
65                            return kb * 1024;
66                        }
67                    }
68                }
69                Self::fallback_system_memory()
70            }
71
72            #[cfg(target_os = "macos")]
73            {
74                // macOS: Use sysctl
75                Self::fallback_system_memory()
76            }
77
78            #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
79            {
80                Self::fallback_system_memory()
81            }
82        }
83    }
84
85    fn fallback_system_memory() -> usize {
86        // Default to 1GB if detection fails
87        1024 * 1024 * 1024
88    }
89
90    /// Creates a config with the given budget.
91    #[must_use]
92    pub fn with_budget(budget: usize) -> Self {
93        Self {
94            budget,
95            ..Default::default()
96        }
97    }
98}
99
100impl Default for BufferManagerConfig {
101    fn default() -> Self {
102        let system_memory = Self::detect_system_memory();
103        Self {
104            // reason: memory fraction (0.0..1.0) of a positive usize is always a valid positive usize
105            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
106            budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
107            soft_limit_fraction: 0.70,
108            evict_limit_fraction: 0.85,
109            hard_limit_fraction: 0.95,
110            background_eviction: false, // Disabled by default for simplicity
111            spill_path: None,
112        }
113    }
114}
115
116/// The central unified buffer manager.
117///
118/// Manages memory allocation across all subsystems with pressure-aware
119/// eviction and optional spilling support.
120pub struct BufferManager {
121    /// Configuration.
122    config: BufferManagerConfig,
123    /// Total allocated bytes.
124    allocated: AtomicUsize,
125    /// Per-region allocated bytes.
126    region_allocated: [AtomicUsize; 4],
127    /// Registered memory consumers.
128    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
129    /// Set of consumer names pinned to RAM via `TierOverride::ForceRam`.
130    ///
131    /// Phase 8g enforcement: any spill loop ([`Self::run_eviction_internal`],
132    /// [`Self::spill_all`], [`Self::spill_consumer_by_name`]) skips consumers
133    /// whose name is in this set. Populated by the engine at startup based on
134    /// `Config.section_configs`.
135    force_ram_consumers: RwLock<HashSet<String>>,
136    /// Computed soft limit in bytes.
137    soft_limit: usize,
138    /// Computed eviction limit in bytes.
139    evict_limit: usize,
140    /// Computed hard limit in bytes.
141    hard_limit: usize,
142    /// Shutdown flag.
143    shutdown: AtomicBool,
144}
145
146impl BufferManager {
147    /// Creates a new buffer manager with the given configuration.
148    #[must_use]
149    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
150        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
151        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
152        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
153        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
154        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
155        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
156        // reason: limit fractions (0.0..1.0) of a positive usize are always valid positive usizes
157        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
158        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
159
160        Arc::new(Self {
161            config,
162            allocated: AtomicUsize::new(0),
163            region_allocated: [
164                AtomicUsize::new(0),
165                AtomicUsize::new(0),
166                AtomicUsize::new(0),
167                AtomicUsize::new(0),
168            ],
169            consumers: RwLock::new(Vec::new()),
170            force_ram_consumers: RwLock::new(HashSet::new()),
171            soft_limit,
172            evict_limit,
173            hard_limit,
174            shutdown: AtomicBool::new(false),
175        })
176    }
177
178    /// Creates a buffer manager with default configuration.
179    #[must_use]
180    pub fn with_defaults() -> Arc<Self> {
181        Self::new(BufferManagerConfig::default())
182    }
183
184    /// Creates a buffer manager with a specific budget.
185    #[must_use]
186    pub fn with_budget(budget: usize) -> Arc<Self> {
187        Self::new(BufferManagerConfig::with_budget(budget))
188    }
189
190    /// Attempts to allocate memory for the given region.
191    ///
192    /// Returns `None` if allocation would exceed the hard limit after
193    /// eviction attempts.
194    pub fn try_allocate(
195        self: &Arc<Self>,
196        size: usize,
197        region: MemoryRegion,
198    ) -> Option<MemoryGrant> {
199        // Check if we can allocate
200        let current = self.allocated.load(Ordering::Relaxed);
201
202        if current + size > self.hard_limit {
203            // Try eviction first
204            self.run_eviction_cycle(true);
205
206            // Check again
207            let current = self.allocated.load(Ordering::Relaxed);
208            if current + size > self.hard_limit {
209                return None;
210            }
211        }
212
213        // Perform allocation
214        self.allocated.fetch_add(size, Ordering::Relaxed);
215        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
216
217        // Check pressure and potentially trigger background eviction
218        self.check_pressure();
219
220        Some(MemoryGrant::new(
221            Arc::clone(self) as Arc<dyn GrantReleaser>,
222            size,
223            region,
224        ))
225    }
226
227    /// Returns the current pressure level.
228    #[must_use]
229    pub fn pressure_level(&self) -> PressureLevel {
230        let current = self.allocated.load(Ordering::Relaxed);
231        self.compute_pressure_level(current)
232    }
233
234    /// Returns current buffer statistics.
235    #[must_use]
236    pub fn stats(&self) -> BufferStats {
237        let total_allocated = self.allocated.load(Ordering::Relaxed);
238        BufferStats {
239            budget: self.config.budget,
240            total_allocated,
241            region_allocated: [
242                self.region_allocated[0].load(Ordering::Relaxed),
243                self.region_allocated[1].load(Ordering::Relaxed),
244                self.region_allocated[2].load(Ordering::Relaxed),
245                self.region_allocated[3].load(Ordering::Relaxed),
246            ],
247            pressure_level: self.compute_pressure_level(total_allocated),
248            consumer_count: self.consumers.read().len(),
249        }
250    }
251
252    /// Registers a memory consumer for eviction callbacks.
253    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
254        self.consumers.write().push(consumer);
255    }
256
257    /// Unregisters a memory consumer by name.
258    pub fn unregister_consumer(&self, name: &str) {
259        self.consumers.write().retain(|c| c.name() != name);
260        // Also drop any ForceRam pin so re-registering with a different
261        // tier doesn't carry over stale state.
262        self.force_ram_consumers.write().remove(name);
263    }
264
265    /// Pins a consumer to RAM via [`crate::storage::TierOverride::ForceRam`].
266    ///
267    /// After this call, no spill loop in the buffer manager will spill the
268    /// consumer with the given name. Allocation pressure that would otherwise
269    /// have spilled this consumer instead falls through to other consumers,
270    /// or fails if no other spillable consumer can free enough.
271    ///
272    /// Idempotent: calling twice with the same name is a no-op.
273    pub fn mark_force_ram(&self, name: &str) {
274        self.force_ram_consumers.write().insert(name.to_string());
275    }
276
277    /// Removes the [`crate::storage::TierOverride::ForceRam`] pin from a
278    /// consumer (Phase 8g). After this call, the consumer participates in
279    /// spill again like any other.
280    pub fn clear_force_ram(&self, name: &str) {
281        self.force_ram_consumers.write().remove(name);
282    }
283
284    /// Returns `true` if a consumer is currently pinned via ForceRam.
285    #[must_use]
286    pub fn is_force_ram(&self, name: &str) -> bool {
287        self.force_ram_consumers.read().contains(name)
288    }
289
290    /// Forces eviction to reach the target usage.
291    ///
292    /// Returns the number of bytes actually freed.
293    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
294        let current = self.allocated.load(Ordering::Relaxed);
295        if current <= target_bytes {
296            return 0;
297        }
298
299        let to_free = current - target_bytes;
300        self.run_eviction_internal(to_free)
301    }
302
303    /// Spills all consumers that support it, regardless of memory pressure.
304    ///
305    /// Used when `TierOverride::ForceDisk` is configured. Returns total bytes freed.
306    /// Consumers pinned via [`Self::mark_force_ram`] are skipped.
307    pub fn spill_all(&self) -> usize {
308        let consumers = self.consumers.read();
309        let force_ram = self.force_ram_consumers.read();
310        let mut total_freed = 0;
311        for consumer in consumers.iter() {
312            if force_ram.contains(consumer.name()) {
313                continue;
314            }
315            if consumer.can_spill()
316                && let Ok(freed) = consumer.spill(usize::MAX)
317            {
318                total_freed += freed;
319            }
320        }
321        total_freed
322    }
323
324    /// Spills all consumers whose [`MemoryConsumer::name`] equals `name`.
325    ///
326    /// Used for targeted [`crate::storage::TierOverride::ForceDisk`] enforcement
327    /// at database open: each section type configured as `ForceDisk` triggers a
328    /// spill on its matching consumer only, leaving other consumers untouched.
329    ///
330    /// Best-effort: a failure on one consumer does not stop the others. Returns
331    /// total bytes freed across all matching consumers.
332    ///
333    /// If `name` is pinned via [`Self::mark_force_ram`], this call is a no-op
334    /// and returns `0`. The pin is honored even on explicit-by-name spill
335    /// requests: `ForceRam` is a hard contract.
336    pub fn spill_consumer_by_name(&self, name: &str) -> usize {
337        if self.is_force_ram(name) {
338            #[cfg(feature = "tracing")]
339            tracing::debug!(
340                target: "grafeo::buffer",
341                consumer = name,
342                "spill skipped: consumer pinned ForceRam"
343            );
344            return 0;
345        }
346        let consumers = self.consumers.read();
347        let mut total_freed = 0;
348        for consumer in consumers.iter() {
349            if consumer.name() == name
350                && consumer.can_spill()
351                && let Ok(freed) = consumer.spill(usize::MAX)
352            {
353                total_freed += freed;
354            }
355        }
356        #[cfg(feature = "tracing")]
357        tracing::info!(
358            target: "grafeo::buffer",
359            consumer = name,
360            freed_bytes = total_freed,
361            "tier transition: spill"
362        );
363        total_freed
364    }
365
366    /// Reloads OnDisk consumers back into RAM, in priority order (highest
367    /// priority first), as long as projected usage stays below
368    /// `target_fraction` of the budget.
369    ///
370    /// Phase 9a: closes the loop on the spill / reload lifecycle. Today
371    /// consumers spill on memory pressure and stay OnDisk forever; this
372    /// method gives users (or a future background thread) an explicit
373    /// trigger to bring spilled state back into RAM after pressure drops.
374    ///
375    /// The walk visits consumers whose
376    /// [`MemoryConsumer::current_tier`] is
377    /// [`super::tiered::StorageTier::OnDisk`] and calls
378    /// [`MemoryConsumer::reload`] on each. After each reload, if current
379    /// allocation exceeds `target_fraction * budget`, the loop stops and
380    /// leaves remaining consumers on disk. `reload()` errors are
381    /// logged-and-skipped: the operation is best-effort.
382    ///
383    /// Returns the number of consumers successfully reloaded.
384    ///
385    /// `target_fraction` is clamped to `[0.0, 1.0]`. A value of 0.7 means
386    /// "stop bringing things back when we'd hit 70% of the budget" —
387    /// matching the soft-limit threshold default.
388    pub fn reload_eligible(&self, target_fraction: f64) -> usize {
389        let target_fraction = target_fraction.clamp(0.0, 1.0);
390        // reason: target byte count from a bounded fraction is non-negative and bounded by budget
391        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
392        let target_bytes = (self.config.budget as f64 * target_fraction) as usize;
393
394        let candidates: Vec<Arc<dyn MemoryConsumer>> = {
395            let consumers = self.consumers.read();
396            let mut out: Vec<_> = consumers
397                .iter()
398                .filter(|c| c.current_tier() == super::tiered::StorageTier::OnDisk)
399                .map(Arc::clone)
400                .collect();
401            // Highest priority first: graph storage > active txn > index > query cache.
402            out.sort_by_key(|c| std::cmp::Reverse(c.eviction_priority()));
403            out
404        };
405
406        let mut reloaded = 0;
407        for consumer in candidates {
408            let current = self.allocated.load(Ordering::Relaxed);
409            if current >= target_bytes {
410                break;
411            }
412            match consumer.reload() {
413                Ok(()) => {
414                    #[cfg(feature = "tracing")]
415                    tracing::info!(
416                        target: "grafeo::buffer",
417                        consumer = consumer.name(),
418                        "tier transition: reload"
419                    );
420                    reloaded += 1;
421                }
422                Err(_e) => {
423                    #[cfg(feature = "tracing")]
424                    tracing::warn!(
425                        target: "grafeo::buffer",
426                        consumer = consumer.name(),
427                        error = %_e,
428                        "tier reload failed"
429                    );
430                    continue;
431                }
432            }
433        }
434        #[cfg(feature = "tracing")]
435        tracing::debug!(
436            target: "grafeo::buffer",
437            reloaded_count = reloaded,
438            target_fraction = target_fraction,
439            "reload_eligible cycle complete"
440        );
441        reloaded
442    }
443
444    /// Returns the current tier reported by each registered consumer that
445    /// wraps a section.
446    ///
447    /// Tier is sourced from [`MemoryConsumer::current_tier`]. Consumers whose
448    /// names don't follow the `"section:<TypeName>"` convention are skipped
449    /// (e.g. CDC, overlay).
450    #[must_use]
451    pub fn snapshot_consumer_tiers(&self) -> Vec<(String, super::tiered::StorageTier)> {
452        let consumers = self.consumers.read();
453        consumers
454            .iter()
455            .filter_map(|c| {
456                let name = c.name();
457                if !name.starts_with("section:") {
458                    return None;
459                }
460                Some((name.to_string(), c.current_tier()))
461            })
462            .collect()
463    }
464
465    /// Returns the configuration.
466    #[must_use]
467    pub fn config(&self) -> &BufferManagerConfig {
468        &self.config
469    }
470
471    /// Returns the memory budget.
472    #[must_use]
473    pub fn budget(&self) -> usize {
474        self.config.budget
475    }
476
477    /// Returns currently allocated bytes.
478    #[must_use]
479    pub fn allocated(&self) -> usize {
480        self.allocated.load(Ordering::Relaxed)
481    }
482
483    /// Returns available bytes.
484    #[must_use]
485    pub fn available(&self) -> usize {
486        self.config
487            .budget
488            .saturating_sub(self.allocated.load(Ordering::Relaxed))
489    }
490
491    /// Shuts down the buffer manager.
492    pub fn shutdown(&self) {
493        self.shutdown.store(true, Ordering::Relaxed);
494    }
495
496    // === Internal methods ===
497
498    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
499        if current >= self.hard_limit {
500            PressureLevel::Critical
501        } else if current >= self.evict_limit {
502            PressureLevel::High
503        } else if current >= self.soft_limit {
504            PressureLevel::Moderate
505        } else {
506            PressureLevel::Normal
507        }
508    }
509
510    fn check_pressure(&self) {
511        let level = self.pressure_level();
512        if level.requires_eviction() {
513            // In a more complete implementation, this would signal
514            // a background thread. For now, do synchronous eviction.
515            let aggressive = level >= PressureLevel::High;
516            self.run_eviction_cycle(aggressive);
517        }
518    }
519
520    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
521        let target = if aggressive {
522            self.soft_limit
523        } else {
524            self.evict_limit
525        };
526
527        let current = self.allocated.load(Ordering::Relaxed);
528        if current <= target {
529            return 0;
530        }
531
532        let to_free = current - target;
533        self.run_eviction_internal(to_free)
534    }
535
536    fn run_eviction_internal(&self, to_free: usize) -> usize {
537        let consumers = self.consumers.read();
538        let force_ram = self.force_ram_consumers.read();
539
540        // Sort consumers by priority (lowest first = evict first)
541        let mut sorted: Vec<_> = consumers.iter().collect();
542        sorted.sort_by_key(|c| c.eviction_priority());
543
544        let mut total_freed = 0;
545        for consumer in &sorted {
546            if total_freed >= to_free {
547                break;
548            }
549
550            // Phase 8g: respect ForceRam pin even on the evict path.
551            // evict() is typically a no-op for these consumers anyway,
552            // but skipping is the principled choice.
553            if force_ram.contains(consumer.name()) {
554                continue;
555            }
556
557            let remaining = to_free - total_freed;
558            let consumer_usage = consumer.memory_usage();
559
560            // Ask consumer to evict up to half its usage or remaining needed
561            let target_evict = remaining.min(consumer_usage / 2);
562            if target_evict > 0 {
563                let freed = consumer.evict(target_evict);
564                total_freed += freed;
565            }
566        }
567
568        // If eviction was not enough, try spilling to disk for consumers
569        // that support it (e.g., vector indexes with mmap storage).
570        // Phase 8g: ForceRam consumers are skipped here too — that's the
571        // hard contract ("never move me to disk").
572        if total_freed < to_free {
573            for consumer in &sorted {
574                if total_freed >= to_free {
575                    break;
576                }
577                if force_ram.contains(consumer.name()) {
578                    continue;
579                }
580                if !consumer.can_spill() {
581                    continue;
582                }
583                let remaining = to_free - total_freed;
584                match consumer.spill(remaining) {
585                    Ok(freed) => total_freed += freed,
586                    Err(_) => continue,
587                }
588            }
589        }
590
591        total_freed
592    }
593}
594
595impl GrantReleaser for BufferManager {
596    fn release(&self, size: usize, region: MemoryRegion) {
597        self.allocated.fetch_sub(size, Ordering::Relaxed);
598        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
599    }
600
601    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
602        let current = self.allocated.load(Ordering::Relaxed);
603
604        if current + size > self.hard_limit {
605            // Try eviction
606            self.run_eviction_cycle(true);
607
608            let current = self.allocated.load(Ordering::Relaxed);
609            if current + size > self.hard_limit {
610                return false;
611            }
612        }
613
614        self.allocated.fetch_add(size, Ordering::Relaxed);
615        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
616        true
617    }
618}
619
620impl Drop for BufferManager {
621    fn drop(&mut self) {
622        self.shutdown.store(true, Ordering::Relaxed);
623    }
624}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629    use crate::memory::buffer::consumer::priorities;
630    use std::sync::atomic::AtomicUsize;
631
632    struct TestConsumer {
633        name: String,
634        usage: AtomicUsize,
635        priority: u8,
636        region: MemoryRegion,
637        evicted: AtomicUsize,
638    }
639
640    impl TestConsumer {
641        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
642            Arc::new(Self {
643                name: name.to_string(),
644                usage: AtomicUsize::new(usage),
645                priority,
646                region,
647                evicted: AtomicUsize::new(0),
648            })
649        }
650    }
651
652    impl MemoryConsumer for TestConsumer {
653        fn name(&self) -> &str {
654            &self.name
655        }
656
657        fn memory_usage(&self) -> usize {
658            self.usage.load(Ordering::Relaxed)
659        }
660
661        fn eviction_priority(&self) -> u8 {
662            self.priority
663        }
664
665        fn region(&self) -> MemoryRegion {
666            self.region
667        }
668
669        fn evict(&self, target_bytes: usize) -> usize {
670            let current = self.usage.load(Ordering::Relaxed);
671            let to_evict = target_bytes.min(current);
672            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
673            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
674            to_evict
675        }
676
677        fn current_tier(&self) -> super::super::tiered::StorageTier {
678            if self.memory_usage() == 0 {
679                super::super::tiered::StorageTier::Uninitialized
680            } else {
681                super::super::tiered::StorageTier::InMemory
682            }
683        }
684    }
685
686    #[test]
687    fn test_basic_allocation() {
688        let config = BufferManagerConfig {
689            budget: 1024 * 1024, // 1MB
690            ..Default::default()
691        };
692        let manager = BufferManager::new(config);
693
694        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
695        assert!(grant.is_some());
696        assert_eq!(manager.stats().total_allocated, 1024);
697    }
698
699    #[test]
700    fn test_grant_raii_release() {
701        let config = BufferManagerConfig {
702            budget: 1024,
703            ..Default::default()
704        };
705        let manager = BufferManager::new(config);
706
707        {
708            let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
709            assert_eq!(manager.stats().total_allocated, 512);
710        }
711
712        // Grant dropped, memory should be released
713        assert_eq!(manager.stats().total_allocated, 0);
714    }
715
716    #[test]
717    fn test_pressure_levels() {
718        let config = BufferManagerConfig {
719            budget: 1000,
720            soft_limit_fraction: 0.70,
721            evict_limit_fraction: 0.85,
722            hard_limit_fraction: 0.95,
723            background_eviction: false,
724            spill_path: None,
725        };
726        let manager = BufferManager::new(config);
727
728        assert_eq!(manager.pressure_level(), PressureLevel::Normal);
729
730        // Allocate to 70% (soft limit)
731        let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
732        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
733
734        // Allocate to 85% (evict limit)
735        let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
736        assert_eq!(manager.pressure_level(), PressureLevel::High);
737
738        // Note: Can't easily test Critical without blocking
739    }
740
741    #[test]
742    fn test_region_tracking() {
743        let config = BufferManagerConfig {
744            budget: 10000,
745            ..Default::default()
746        };
747        let manager = BufferManager::new(config);
748
749        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
750        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
751        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
752
753        let stats = manager.stats();
754        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
755        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
756        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
757        assert_eq!(stats.total_allocated, 600);
758    }
759
760    #[test]
761    fn test_consumer_registration() {
762        let manager = BufferManager::with_budget(10000);
763
764        let consumer = TestConsumer::new(
765            "test",
766            1000,
767            priorities::INDEX_BUFFERS,
768            MemoryRegion::IndexBuffers,
769        );
770
771        manager.register_consumer(consumer);
772        assert_eq!(manager.stats().consumer_count, 1);
773
774        manager.unregister_consumer("test");
775        assert_eq!(manager.stats().consumer_count, 0);
776    }
777
778    #[test]
779    fn test_eviction_ordering() {
780        let manager = BufferManager::with_budget(10000);
781
782        // Low priority consumer (evict first)
783        let low_priority = TestConsumer::new(
784            "low",
785            500,
786            priorities::SPILL_STAGING,
787            MemoryRegion::SpillStaging,
788        );
789
790        // High priority consumer (evict last)
791        let high_priority = TestConsumer::new(
792            "high",
793            500,
794            priorities::ACTIVE_TRANSACTION,
795            MemoryRegion::ExecutionBuffers,
796        );
797
798        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
799        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
800
801        // Manually set allocated to simulate memory usage
802        // (consumers track their own usage separately from manager's allocation tracking)
803        manager.allocated.store(1000, Ordering::Relaxed);
804
805        // Request eviction to target 700 (need to free 300 bytes)
806        let freed = manager.evict_to_target(700);
807
808        // Low priority should be evicted first (up to half = 250)
809        assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
810        assert!(freed > 0);
811    }
812
813    #[test]
814    fn test_hard_limit_blocking() {
815        let config = BufferManagerConfig {
816            budget: 1000,
817            soft_limit_fraction: 0.70,
818            evict_limit_fraction: 0.85,
819            hard_limit_fraction: 0.95,
820            background_eviction: false,
821            spill_path: None,
822        };
823        let manager = BufferManager::new(config);
824
825        // Allocate up to hard limit (950 bytes)
826        let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
827
828        // This should fail (would exceed hard limit)
829        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
830        assert!(g2.is_none());
831    }
832
833    #[test]
834    fn test_available_memory() {
835        let manager = BufferManager::with_budget(1000);
836
837        assert_eq!(manager.available(), 1000);
838
839        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
840        assert_eq!(manager.available(), 700);
841    }
842
843    // --- Spill-aware test consumer ---
844
845    struct SpillableConsumer {
846        name: String,
847        usage: AtomicUsize,
848        priority: u8,
849        region: MemoryRegion,
850        evicted: AtomicUsize,
851        spilled: AtomicUsize,
852        spillable: bool,
853        evict_returns_zero: bool,
854    }
855
856    impl SpillableConsumer {
857        fn new(
858            name: &str,
859            usage: usize,
860            priority: u8,
861            region: MemoryRegion,
862            spillable: bool,
863        ) -> Arc<Self> {
864            Arc::new(Self {
865                name: name.to_string(),
866                usage: AtomicUsize::new(usage),
867                priority,
868                region,
869                evicted: AtomicUsize::new(0),
870                spilled: AtomicUsize::new(0),
871                spillable,
872                evict_returns_zero: false,
873            })
874        }
875
876        fn new_evict_fails(
877            name: &str,
878            usage: usize,
879            priority: u8,
880            region: MemoryRegion,
881            spillable: bool,
882        ) -> Arc<Self> {
883            Arc::new(Self {
884                name: name.to_string(),
885                usage: AtomicUsize::new(usage),
886                priority,
887                region,
888                evicted: AtomicUsize::new(0),
889                spilled: AtomicUsize::new(0),
890                spillable,
891                evict_returns_zero: true,
892            })
893        }
894    }
895
896    impl MemoryConsumer for SpillableConsumer {
897        fn name(&self) -> &str {
898            &self.name
899        }
900
901        fn memory_usage(&self) -> usize {
902            self.usage.load(Ordering::Relaxed)
903        }
904
905        fn eviction_priority(&self) -> u8 {
906            self.priority
907        }
908
909        fn region(&self) -> MemoryRegion {
910            self.region
911        }
912
913        fn evict(&self, target_bytes: usize) -> usize {
914            if self.evict_returns_zero {
915                return 0;
916            }
917            let current = self.usage.load(Ordering::Relaxed);
918            let to_evict = target_bytes.min(current);
919            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
920            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
921            to_evict
922        }
923
924        fn can_spill(&self) -> bool {
925            self.spillable
926        }
927
928        fn spill(
929            &self,
930            target_bytes: usize,
931        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
932            if !self.spillable {
933                return Err(crate::memory::buffer::consumer::SpillError::NotSupported);
934            }
935            let current = self.usage.load(Ordering::Relaxed);
936            let to_spill = target_bytes.min(current);
937            self.usage.fetch_sub(to_spill, Ordering::Relaxed);
938            self.spilled.fetch_add(to_spill, Ordering::Relaxed);
939            Ok(to_spill)
940        }
941
942        fn current_tier(&self) -> super::super::tiered::StorageTier {
943            if self.spilled.load(Ordering::Relaxed) > 0 {
944                super::super::tiered::StorageTier::OnDisk
945            } else if self.memory_usage() == 0 {
946                super::super::tiered::StorageTier::Uninitialized
947            } else {
948                super::super::tiered::StorageTier::InMemory
949            }
950        }
951    }
952
953    #[test]
954    fn test_spill_all_calls_spillable_consumers() {
955        let manager = BufferManager::with_budget(10000);
956        let spillable = SpillableConsumer::new(
957            "spillable",
958            500,
959            priorities::QUERY_CACHE,
960            MemoryRegion::ExecutionBuffers,
961            true,
962        );
963        let non_spillable = SpillableConsumer::new(
964            "non_spillable",
965            500,
966            priorities::QUERY_CACHE,
967            MemoryRegion::ExecutionBuffers,
968            false,
969        );
970        manager.register_consumer(Arc::clone(&spillable) as Arc<dyn MemoryConsumer>);
971        manager.register_consumer(Arc::clone(&non_spillable) as Arc<dyn MemoryConsumer>);
972
973        let freed = manager.spill_all();
974        assert_eq!(freed, 500);
975        assert_eq!(spillable.spilled.load(Ordering::Relaxed), 500);
976        assert_eq!(non_spillable.spilled.load(Ordering::Relaxed), 0);
977    }
978
979    #[test]
980    fn test_spill_all_skips_non_spillable() {
981        let manager = BufferManager::with_budget(10000);
982        let consumer = SpillableConsumer::new(
983            "no_spill",
984            1000,
985            priorities::INDEX_BUFFERS,
986            MemoryRegion::IndexBuffers,
987            false,
988        );
989        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
990
991        assert_eq!(manager.spill_all(), 0);
992        assert_eq!(consumer.memory_usage(), 1000);
993    }
994
995    #[test]
996    fn test_eviction_falls_back_to_spill() {
997        let manager = BufferManager::with_budget(10000);
998        let consumer = SpillableConsumer::new_evict_fails(
999            "spill_fallback",
1000            1000,
1001            priorities::QUERY_CACHE,
1002            MemoryRegion::ExecutionBuffers,
1003            true,
1004        );
1005        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1006        manager.allocated.store(2000, Ordering::Relaxed);
1007
1008        let freed = manager.evict_to_target(1500);
1009        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
1010        assert!(consumer.spilled.load(Ordering::Relaxed) > 0);
1011        assert!(freed > 0);
1012    }
1013
1014    #[test]
1015    fn test_eviction_no_spill_when_sufficient() {
1016        let manager = BufferManager::with_budget(10000);
1017        let consumer = SpillableConsumer::new(
1018            "eviction_enough",
1019            1000,
1020            priorities::QUERY_CACHE,
1021            MemoryRegion::ExecutionBuffers,
1022            true,
1023        );
1024        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1025        manager.allocated.store(1200, Ordering::Relaxed);
1026
1027        let freed = manager.evict_to_target(1000);
1028        assert_eq!(freed, 200);
1029        assert_eq!(consumer.spilled.load(Ordering::Relaxed), 0);
1030    }
1031
1032    #[test]
1033    fn test_eviction_spill_skips_non_spillable() {
1034        let manager = BufferManager::with_budget(10000);
1035        let consumer = SpillableConsumer::new_evict_fails(
1036            "no_spill",
1037            1000,
1038            priorities::QUERY_CACHE,
1039            MemoryRegion::ExecutionBuffers,
1040            false,
1041        );
1042        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1043        manager.allocated.store(2000, Ordering::Relaxed);
1044
1045        let freed = manager.evict_to_target(1500);
1046        assert_eq!(freed, 0);
1047        assert_eq!(consumer.memory_usage(), 1000);
1048    }
1049
1050    #[test]
1051    fn alix_with_defaults_creates_manager() {
1052        let manager = BufferManager::with_defaults();
1053        // with_defaults uses system memory detection, budget should be > 0
1054        assert!(manager.budget() > 0);
1055        assert_eq!(manager.allocated(), 0);
1056        assert_eq!(manager.available(), manager.budget());
1057    }
1058
1059    #[test]
1060    fn gus_config_accessor_returns_budget() {
1061        let manager = BufferManager::with_budget(4096);
1062        let config = manager.config();
1063        assert_eq!(config.budget, 4096);
1064        assert!(!config.background_eviction);
1065        assert!(config.spill_path.is_none());
1066    }
1067
1068    #[test]
1069    fn vincent_shutdown_sets_flag() {
1070        let manager = BufferManager::with_budget(1000);
1071        manager.shutdown();
1072        // shutdown stores true; drop also stores true, so this just verifies
1073        // the method runs without error and the manager remains usable
1074        assert_eq!(manager.allocated(), 0);
1075    }
1076
1077    #[test]
1078    fn jules_critical_pressure_level() {
1079        let config = BufferManagerConfig {
1080            budget: 1000,
1081            soft_limit_fraction: 0.70,
1082            evict_limit_fraction: 0.85,
1083            hard_limit_fraction: 0.95,
1084            background_eviction: false,
1085            spill_path: None,
1086        };
1087        let manager = BufferManager::new(config);
1088
1089        // Manually set allocated above hard limit to test Critical level
1090        manager.allocated.store(960, Ordering::Relaxed);
1091        assert_eq!(manager.pressure_level(), PressureLevel::Critical);
1092    }
1093
1094    #[test]
1095    fn mia_evict_to_target_already_below() {
1096        let manager = BufferManager::with_budget(10000);
1097        // allocated is 0, target is 5000: already below target
1098        let freed = manager.evict_to_target(5000);
1099        assert_eq!(freed, 0);
1100    }
1101
1102    #[test]
1103    fn butch_try_allocate_raw_success() {
1104        let config = BufferManagerConfig {
1105            budget: 1000,
1106            soft_limit_fraction: 0.70,
1107            evict_limit_fraction: 0.85,
1108            hard_limit_fraction: 0.95,
1109            background_eviction: false,
1110            spill_path: None,
1111        };
1112        let manager = BufferManager::new(config);
1113
1114        // GrantReleaser::try_allocate_raw succeeds when under hard limit
1115        let success = manager.try_allocate_raw(100, MemoryRegion::GraphStorage);
1116        assert!(success);
1117        assert_eq!(manager.allocated(), 100);
1118        assert_eq!(
1119            manager.stats().region_usage(MemoryRegion::GraphStorage),
1120            100
1121        );
1122    }
1123
1124    #[test]
1125    fn django_try_allocate_raw_fails_at_hard_limit() {
1126        let config = BufferManagerConfig {
1127            budget: 1000,
1128            soft_limit_fraction: 0.70,
1129            evict_limit_fraction: 0.85,
1130            hard_limit_fraction: 0.95,
1131            background_eviction: false,
1132            spill_path: None,
1133        };
1134        let manager = BufferManager::new(config);
1135
1136        // Fill up to hard limit
1137        manager.allocated.store(940, Ordering::Relaxed);
1138
1139        // This exceeds hard limit (940 + 100 = 1040 > 950), no consumers to evict
1140        let success = manager.try_allocate_raw(100, MemoryRegion::ExecutionBuffers);
1141        assert!(!success);
1142    }
1143
1144    #[test]
1145    fn shosanna_drop_sets_shutdown() {
1146        // Create and immediately drop to exercise the Drop impl
1147        let manager = BufferManager::with_budget(512);
1148        drop(manager);
1149        // If we get here without panic, the Drop impl ran successfully.
1150    }
1151
1152    #[test]
1153    fn hans_eviction_with_zero_usage_consumer() {
1154        let manager = BufferManager::with_budget(10000);
1155        // Consumer with zero usage: target_evict will be 0, so evict is skipped
1156        let consumer = TestConsumer::new(
1157            "empty",
1158            0,
1159            priorities::SPILL_STAGING,
1160            MemoryRegion::SpillStaging,
1161        );
1162        manager.register_consumer(Arc::clone(&consumer) as Arc<dyn MemoryConsumer>);
1163        manager.allocated.store(500, Ordering::Relaxed);
1164
1165        let freed = manager.evict_to_target(200);
1166        // Consumer has 0 usage, so target_evict = min(300, 0/2) = 0, evict skipped
1167        assert_eq!(consumer.evicted.load(Ordering::Relaxed), 0);
1168        assert_eq!(freed, 0);
1169    }
1170
1171    #[test]
1172    fn beatrix_grant_releaser_release_decrements() {
1173        let config = BufferManagerConfig {
1174            budget: 1000,
1175            soft_limit_fraction: 0.70,
1176            evict_limit_fraction: 0.85,
1177            hard_limit_fraction: 0.95,
1178            background_eviction: false,
1179            spill_path: None,
1180        };
1181        let manager = BufferManager::new(config);
1182
1183        // Allocate via try_allocate_raw, then release via GrantReleaser trait
1184        assert!(manager.try_allocate_raw(200, MemoryRegion::IndexBuffers));
1185        assert_eq!(manager.allocated(), 200);
1186
1187        manager.release(200, MemoryRegion::IndexBuffers);
1188        assert_eq!(manager.allocated(), 0);
1189        assert_eq!(manager.stats().region_usage(MemoryRegion::IndexBuffers), 0);
1190    }
1191
1192    /// Consumer whose spill() returns an error to exercise the Err(_) => continue path.
1193    struct FailingSpillConsumer {
1194        name: String,
1195        usage: AtomicUsize,
1196        priority: u8,
1197        region: MemoryRegion,
1198    }
1199
1200    impl FailingSpillConsumer {
1201        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
1202            Arc::new(Self {
1203                name: name.to_string(),
1204                usage: AtomicUsize::new(usage),
1205                priority,
1206                region,
1207            })
1208        }
1209    }
1210
1211    impl MemoryConsumer for FailingSpillConsumer {
1212        fn name(&self) -> &str {
1213            &self.name
1214        }
1215
1216        fn memory_usage(&self) -> usize {
1217            self.usage.load(Ordering::Relaxed)
1218        }
1219
1220        fn eviction_priority(&self) -> u8 {
1221            self.priority
1222        }
1223
1224        fn region(&self) -> MemoryRegion {
1225            self.region
1226        }
1227
1228        fn evict(&self, _target_bytes: usize) -> usize {
1229            0 // eviction always fails
1230        }
1231
1232        fn can_spill(&self) -> bool {
1233            true
1234        }
1235
1236        fn spill(
1237            &self,
1238            _target_bytes: usize,
1239        ) -> Result<usize, crate::memory::buffer::consumer::SpillError> {
1240            Err(crate::memory::buffer::consumer::SpillError::IoError(
1241                "disk full".to_string(),
1242            ))
1243        }
1244
1245        fn current_tier(&self) -> super::super::tiered::StorageTier {
1246            if self.memory_usage() == 0 {
1247                super::super::tiered::StorageTier::Uninitialized
1248            } else {
1249                super::super::tiered::StorageTier::InMemory
1250            }
1251        }
1252    }
1253
1254    #[test]
1255    fn vincent_spill_error_continues_to_next_consumer() {
1256        let manager = BufferManager::with_budget(10000);
1257
1258        // First consumer: spill fails
1259        let failing = FailingSpillConsumer::new(
1260            "failing_spill",
1261            500,
1262            priorities::SPILL_STAGING,
1263            MemoryRegion::SpillStaging,
1264        );
1265
1266        // Second consumer: spill succeeds
1267        let working = SpillableConsumer::new_evict_fails(
1268            "working_spill",
1269            500,
1270            priorities::QUERY_CACHE,
1271            MemoryRegion::ExecutionBuffers,
1272            true,
1273        );
1274
1275        manager.register_consumer(Arc::clone(&failing) as Arc<dyn MemoryConsumer>);
1276        manager.register_consumer(Arc::clone(&working) as Arc<dyn MemoryConsumer>);
1277        manager.allocated.store(2000, Ordering::Relaxed);
1278
1279        let freed = manager.evict_to_target(1500);
1280        // failing consumer's spill errors out, working consumer's spill succeeds
1281        assert!(working.spilled.load(Ordering::Relaxed) > 0);
1282        assert!(freed > 0);
1283    }
1284
1285    #[test]
1286    fn django_detect_system_memory_returns_positive() {
1287        let mem = BufferManagerConfig::detect_system_memory();
1288        assert!(mem > 0);
1289    }
1290
1291    #[test]
1292    fn shosanna_spill_path_config() {
1293        let config = BufferManagerConfig {
1294            budget: 1024,
1295            spill_path: Some(PathBuf::from("/tmp/grafeo-spill")),
1296            ..Default::default()
1297        };
1298        assert_eq!(
1299            config.spill_path.as_ref().unwrap().to_str().unwrap(),
1300            "/tmp/grafeo-spill"
1301        );
1302        let manager = BufferManager::new(config);
1303        assert!(manager.config().spill_path.is_some());
1304    }
1305}