Skip to main content

grafeo_common/memory/buffer/
manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
/// Default memory budget as a fraction of system memory.
const DEFAULT_MEMORY_FRACTION: f64 = 0.75;

/// Configuration for the buffer manager.
///
/// The three `*_fraction` fields are fractions of [`budget`](Self::budget);
/// the defaults listed below are the values produced by
/// [`BufferManagerConfig::with_budget`] and [`Default::default`].
#[derive(Debug, Clone)]
pub struct BufferManagerConfig {
    /// Total memory budget in bytes.
    pub budget: usize,
    /// Soft limit threshold (default: 70%).
    pub soft_limit_fraction: f64,
    /// Eviction threshold (default: 85%).
    pub evict_limit_fraction: f64,
    /// Hard limit threshold (default: 95%).
    pub hard_limit_fraction: f64,
    /// Enable background eviction thread.
    pub background_eviction: bool,
    /// Directory for spilling data to disk.
    pub spill_path: Option<PathBuf>,
}

impl BufferManagerConfig {
    /// Detects system memory size, in bytes.
    ///
    /// On Linux the value comes from the `MemTotal:` line of `/proc/meminfo`.
    /// Detection is not implemented yet for Windows
    /// (`GlobalMemoryStatusEx` / `GetPhysicallyInstalledSystemMemory`) or
    /// macOS (`sysctl`), so those platforms, any other platform, and Miri
    /// (whose isolation blocks file I/O) get the conservative fallback.
    ///
    /// Returns a conservative 1GB estimate if detection fails.
    #[must_use]
    pub fn detect_system_memory() -> usize {
        #[cfg(all(target_os = "linux", not(miri)))]
        {
            if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
                if let Some(bytes) = Self::parse_meminfo_total(&contents) {
                    return bytes;
                }
            }
        }

        Self::fallback_system_memory()
    }

    /// Extracts the `MemTotal:` value from `/proc/meminfo`-style text and
    /// converts it from kibibytes to bytes.
    ///
    /// Lines whose value does not parse as an unsigned integer are skipped.
    /// The multiplication saturates instead of overflowing, which matters on
    /// targets where `usize` is 32 bits wide.
    // Only called on Linux outside Miri; unused on every other configuration.
    #[cfg_attr(not(all(target_os = "linux", not(miri))), allow(dead_code))]
    fn parse_meminfo_total(contents: &str) -> Option<usize> {
        contents
            .lines()
            .filter_map(|line| line.strip_prefix("MemTotal:"))
            .find_map(|rest| rest.split_whitespace().next()?.parse::<usize>().ok())
            .map(|kb| kb.saturating_mul(1024))
    }

    /// Conservative estimate used when the real size cannot be detected.
    fn fallback_system_memory() -> usize {
        // Default to 1GB if detection fails
        1024 * 1024 * 1024
    }

    /// Creates a config with the given budget and default thresholds.
    ///
    /// Unlike [`Default::default`], this never probes the system memory size.
    #[must_use]
    pub fn with_budget(budget: usize) -> Self {
        Self {
            budget,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false, // Disabled by default for simplicity
            spill_path: None,
        }
    }
}

impl Default for BufferManagerConfig {
    /// Budgets [`DEFAULT_MEMORY_FRACTION`] of the detected system memory and
    /// uses the default thresholds.
    fn default() -> Self {
        let system_memory = Self::detect_system_memory();
        Self::with_budget((system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize)
    }
}
113
114/// The central unified buffer manager.
115///
116/// Manages memory allocation across all subsystems with pressure-aware
117/// eviction and optional spilling support.
118pub struct BufferManager {
119    /// Configuration.
120    config: BufferManagerConfig,
121    /// Total allocated bytes.
122    allocated: AtomicUsize,
123    /// Per-region allocated bytes.
124    region_allocated: [AtomicUsize; 4],
125    /// Registered memory consumers.
126    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
127    /// Computed soft limit in bytes.
128    soft_limit: usize,
129    /// Computed eviction limit in bytes.
130    evict_limit: usize,
131    /// Computed hard limit in bytes.
132    hard_limit: usize,
133    /// Shutdown flag.
134    shutdown: AtomicBool,
135}
136
137impl BufferManager {
138    /// Creates a new buffer manager with the given configuration.
139    #[must_use]
140    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
141        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
142        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
143        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
144
145        Arc::new(Self {
146            config,
147            allocated: AtomicUsize::new(0),
148            region_allocated: [
149                AtomicUsize::new(0),
150                AtomicUsize::new(0),
151                AtomicUsize::new(0),
152                AtomicUsize::new(0),
153            ],
154            consumers: RwLock::new(Vec::new()),
155            soft_limit,
156            evict_limit,
157            hard_limit,
158            shutdown: AtomicBool::new(false),
159        })
160    }
161
162    /// Creates a buffer manager with default configuration.
163    #[must_use]
164    pub fn with_defaults() -> Arc<Self> {
165        Self::new(BufferManagerConfig::default())
166    }
167
168    /// Creates a buffer manager with a specific budget.
169    #[must_use]
170    pub fn with_budget(budget: usize) -> Arc<Self> {
171        Self::new(BufferManagerConfig::with_budget(budget))
172    }
173
174    /// Attempts to allocate memory for the given region.
175    ///
176    /// Returns `None` if allocation would exceed the hard limit after
177    /// eviction attempts.
178    pub fn try_allocate(
179        self: &Arc<Self>,
180        size: usize,
181        region: MemoryRegion,
182    ) -> Option<MemoryGrant> {
183        // Check if we can allocate
184        let current = self.allocated.load(Ordering::Relaxed);
185
186        if current + size > self.hard_limit {
187            // Try eviction first
188            self.run_eviction_cycle(true);
189
190            // Check again
191            let current = self.allocated.load(Ordering::Relaxed);
192            if current + size > self.hard_limit {
193                return None;
194            }
195        }
196
197        // Perform allocation
198        self.allocated.fetch_add(size, Ordering::Relaxed);
199        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
200
201        // Check pressure and potentially trigger background eviction
202        self.check_pressure();
203
204        Some(MemoryGrant::new(
205            Arc::clone(self) as Arc<dyn GrantReleaser>,
206            size,
207            region,
208        ))
209    }
210
211    /// Returns the current pressure level.
212    #[must_use]
213    pub fn pressure_level(&self) -> PressureLevel {
214        let current = self.allocated.load(Ordering::Relaxed);
215        self.compute_pressure_level(current)
216    }
217
218    /// Returns current buffer statistics.
219    #[must_use]
220    pub fn stats(&self) -> BufferStats {
221        let total_allocated = self.allocated.load(Ordering::Relaxed);
222        BufferStats {
223            budget: self.config.budget,
224            total_allocated,
225            region_allocated: [
226                self.region_allocated[0].load(Ordering::Relaxed),
227                self.region_allocated[1].load(Ordering::Relaxed),
228                self.region_allocated[2].load(Ordering::Relaxed),
229                self.region_allocated[3].load(Ordering::Relaxed),
230            ],
231            pressure_level: self.compute_pressure_level(total_allocated),
232            consumer_count: self.consumers.read().len(),
233        }
234    }
235
236    /// Registers a memory consumer for eviction callbacks.
237    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
238        self.consumers.write().push(consumer);
239    }
240
241    /// Unregisters a memory consumer by name.
242    pub fn unregister_consumer(&self, name: &str) {
243        self.consumers.write().retain(|c| c.name() != name);
244    }
245
246    /// Forces eviction to reach the target usage.
247    ///
248    /// Returns the number of bytes actually freed.
249    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
250        let current = self.allocated.load(Ordering::Relaxed);
251        if current <= target_bytes {
252            return 0;
253        }
254
255        let to_free = current - target_bytes;
256        self.run_eviction_internal(to_free)
257    }
258
259    /// Returns the configuration.
260    #[must_use]
261    pub fn config(&self) -> &BufferManagerConfig {
262        &self.config
263    }
264
265    /// Returns the memory budget.
266    #[must_use]
267    pub fn budget(&self) -> usize {
268        self.config.budget
269    }
270
271    /// Returns currently allocated bytes.
272    #[must_use]
273    pub fn allocated(&self) -> usize {
274        self.allocated.load(Ordering::Relaxed)
275    }
276
277    /// Returns available bytes.
278    #[must_use]
279    pub fn available(&self) -> usize {
280        self.config
281            .budget
282            .saturating_sub(self.allocated.load(Ordering::Relaxed))
283    }
284
285    /// Shuts down the buffer manager.
286    pub fn shutdown(&self) {
287        self.shutdown.store(true, Ordering::Relaxed);
288    }
289
290    // === Internal methods ===
291
292    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
293        if current >= self.hard_limit {
294            PressureLevel::Critical
295        } else if current >= self.evict_limit {
296            PressureLevel::High
297        } else if current >= self.soft_limit {
298            PressureLevel::Moderate
299        } else {
300            PressureLevel::Normal
301        }
302    }
303
304    fn check_pressure(&self) {
305        let level = self.pressure_level();
306        if level.requires_eviction() {
307            // In a more complete implementation, this would signal
308            // a background thread. For now, do synchronous eviction.
309            let aggressive = level >= PressureLevel::High;
310            self.run_eviction_cycle(aggressive);
311        }
312    }
313
314    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
315        let target = if aggressive {
316            self.soft_limit
317        } else {
318            self.evict_limit
319        };
320
321        let current = self.allocated.load(Ordering::Relaxed);
322        if current <= target {
323            return 0;
324        }
325
326        let to_free = current - target;
327        self.run_eviction_internal(to_free)
328    }
329
330    fn run_eviction_internal(&self, to_free: usize) -> usize {
331        let consumers = self.consumers.read();
332
333        // Sort consumers by priority (lowest first = evict first)
334        let mut sorted: Vec<_> = consumers.iter().collect();
335        sorted.sort_by_key(|c| c.eviction_priority());
336
337        let mut total_freed = 0;
338        for consumer in sorted {
339            if total_freed >= to_free {
340                break;
341            }
342
343            let remaining = to_free - total_freed;
344            let consumer_usage = consumer.memory_usage();
345
346            // Ask consumer to evict up to half its usage or remaining needed
347            let target_evict = remaining.min(consumer_usage / 2);
348            if target_evict > 0 {
349                let freed = consumer.evict(target_evict);
350                total_freed += freed;
351                // Note: consumers should call release through their grants,
352                // so we don't double-decrement here.
353            }
354        }
355
356        total_freed
357    }
358}
359
360impl GrantReleaser for BufferManager {
361    fn release(&self, size: usize, region: MemoryRegion) {
362        self.allocated.fetch_sub(size, Ordering::Relaxed);
363        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
364    }
365
366    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
367        let current = self.allocated.load(Ordering::Relaxed);
368
369        if current + size > self.hard_limit {
370            // Try eviction
371            self.run_eviction_cycle(true);
372
373            let current = self.allocated.load(Ordering::Relaxed);
374            if current + size > self.hard_limit {
375                return false;
376            }
377        }
378
379        self.allocated.fetch_add(size, Ordering::Relaxed);
380        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
381        true
382    }
383}
384
385impl Drop for BufferManager {
386    fn drop(&mut self) {
387        self.shutdown.store(true, Ordering::Relaxed);
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::buffer::consumer::priorities;
    use std::sync::atomic::AtomicUsize;

    /// In-memory consumer that tracks a simulated usage counter and records
    /// how many bytes it has been asked to give up.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
        evicted: AtomicUsize,
    }

    impl TestConsumer {
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
            Arc::new(Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
                evicted: AtomicUsize::new(0),
            })
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        // Frees at most what is currently held and reports the amount freed.
        fn evict(&self, target_bytes: usize) -> usize {
            let current = self.usage.load(Ordering::Relaxed);
            let to_evict = target_bytes.min(current);
            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
            to_evict
        }
    }

    #[test]
    fn test_basic_allocation() {
        let config = BufferManagerConfig {
            budget: 1024 * 1024, // 1MB
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
        assert!(grant.is_some());
        assert_eq!(manager.stats().total_allocated, 1024);
    }

    #[test]
    fn test_grant_raii_release() {
        let config = BufferManagerConfig {
            budget: 1024,
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        {
            let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
            assert_eq!(manager.stats().total_allocated, 512);
        }

        // Grant dropped, memory should be released
        assert_eq!(manager.stats().total_allocated, 0);
    }

    #[test]
    fn test_pressure_levels() {
        let config = BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        };
        let manager = BufferManager::new(config);

        assert_eq!(manager.pressure_level(), PressureLevel::Normal);

        // Allocate to 70% (soft limit)
        let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);

        // Allocate to 85% (evict limit)
        let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
        assert_eq!(manager.pressure_level(), PressureLevel::High);

        // Allocate to exactly 95% (hard limit): the request is still admitted
        // because only totals *above* the hard limit are refused.
        let _g3 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
        assert_eq!(manager.allocated(), 950);
        assert_eq!(manager.pressure_level(), PressureLevel::Critical);
    }

    #[test]
    fn test_region_tracking() {
        let config = BufferManagerConfig {
            budget: 10000,
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);

        let stats = manager.stats();
        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
        assert_eq!(stats.total_allocated, 600);
    }

    #[test]
    fn test_consumer_registration() {
        let manager = BufferManager::with_budget(10000);

        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        manager.register_consumer(consumer);
        assert_eq!(manager.stats().consumer_count, 1);

        manager.unregister_consumer("test");
        assert_eq!(manager.stats().consumer_count, 0);
    }

    #[test]
    fn test_eviction_ordering() {
        // The exact amounts asserted below only hold if the "low" consumer
        // really sorts before the "high" one, so make that assumption explicit.
        assert!(
            priorities::SPILL_STAGING < priorities::ACTIVE_TRANSACTION,
            "test assumes SPILL_STAGING is evicted before ACTIVE_TRANSACTION"
        );

        let manager = BufferManager::with_budget(10000);

        // Low priority consumer (evict first)
        let low_priority = TestConsumer::new(
            "low",
            500,
            priorities::SPILL_STAGING,
            MemoryRegion::SpillStaging,
        );

        // High priority consumer (evict last)
        let high_priority = TestConsumer::new(
            "high",
            500,
            priorities::ACTIVE_TRANSACTION,
            MemoryRegion::ExecutionBuffers,
        );

        // Register in reverse order so the result depends on the sort, not on
        // registration order.
        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);

        // Manually set allocated to simulate memory usage
        // (consumers track their own usage separately from manager's allocation tracking)
        manager.allocated.store(1000, Ordering::Relaxed);

        // Request eviction to target 700 (need to free 300 bytes)
        let freed = manager.evict_to_target(700);

        // Low priority is asked first, capped at half its usage (250); the
        // remaining 50 bytes then come from the high priority consumer.
        assert_eq!(low_priority.evicted.load(Ordering::Relaxed), 250);
        assert_eq!(high_priority.evicted.load(Ordering::Relaxed), 50);
        assert_eq!(freed, 300);
    }

    #[test]
    fn test_hard_limit_blocking() {
        let config = BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        };
        let manager = BufferManager::new(config);

        // Allocate up to hard limit (950 bytes)
        let g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
        assert!(g1.is_some());

        // This should fail (would exceed hard limit)
        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
        assert!(g2.is_none());

        // A refused request must not leak into the accounting.
        assert_eq!(manager.allocated(), 950);
    }

    #[test]
    fn test_available_memory() {
        let manager = BufferManager::with_budget(1000);

        assert_eq!(manager.available(), 1000);

        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
        assert_eq!(manager.available(), 700);
    }
}