Skip to main content

grafeo_common/memory/buffer/
manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
/// Share of detected physical memory that a default-configured manager may
/// budget for itself (three quarters).
const DEFAULT_MEMORY_FRACTION: f64 = 3.0 / 4.0;
14
/// Tunables used to build a `BufferManager`.
///
/// The three `*_fraction` fields are each multiplied by `budget` when the
/// manager is constructed, yielding absolute byte thresholds.
#[derive(Debug, Clone)]
pub struct BufferManagerConfig {
    /// Overall memory budget, in bytes.
    pub budget: usize,
    /// Fraction of `budget` at or above which pressure is reported as
    /// moderate (default: `0.70`).
    pub soft_limit_fraction: f64,
    /// Fraction of `budget` at or above which pressure is reported as high
    /// (default: `0.85`).
    pub evict_limit_fraction: f64,
    /// Fraction of `budget` that allocations may not push usage past, even
    /// after an eviction attempt (default: `0.95`).
    pub hard_limit_fraction: f64,
    /// Requests a background eviction thread (default: `false`).
    /// NOTE: the manager currently evicts synchronously on the allocating
    /// thread and does not itself consult this flag.
    pub background_eviction: bool,
    /// Optional directory that spilled data may be written to (default: `None`).
    pub spill_path: Option<PathBuf>,
}
31
32impl BufferManagerConfig {
33    /// Detects system memory size.
34    ///
35    /// Returns a conservative estimate if detection fails.
36    #[must_use]
37    pub fn detect_system_memory() -> usize {
38        // Under Miri, file I/O is blocked by isolation — use fallback directly
39        #[cfg(miri)]
40        {
41            return Self::fallback_system_memory();
42        }
43
44        // Try to detect system memory
45        // On failure, return a conservative 1GB default
46        #[cfg(not(miri))]
47        {
48            #[cfg(target_os = "windows")]
49            {
50                // Windows: Use GetPhysicallyInstalledSystemMemory or GlobalMemoryStatusEx
51                // For now, use a fallback
52                Self::fallback_system_memory()
53            }
54
55            #[cfg(target_os = "linux")]
56            {
57                // Linux: Read from /proc/meminfo
58                if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
59                    for line in contents.lines() {
60                        if line.starts_with("MemTotal:")
61                            && let Some(kb_str) = line.split_whitespace().nth(1)
62                            && let Ok(kb) = kb_str.parse::<usize>()
63                        {
64                            return kb * 1024;
65                        }
66                    }
67                }
68                Self::fallback_system_memory()
69            }
70
71            #[cfg(target_os = "macos")]
72            {
73                // macOS: Use sysctl
74                Self::fallback_system_memory()
75            }
76
77            #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
78            {
79                Self::fallback_system_memory()
80            }
81        }
82    }
83
84    fn fallback_system_memory() -> usize {
85        // Default to 1GB if detection fails
86        1024 * 1024 * 1024
87    }
88
89    /// Creates a config with the given budget.
90    #[must_use]
91    pub fn with_budget(budget: usize) -> Self {
92        Self {
93            budget,
94            ..Default::default()
95        }
96    }
97}
98
99impl Default for BufferManagerConfig {
100    fn default() -> Self {
101        let system_memory = Self::detect_system_memory();
102        Self {
103            budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
104            soft_limit_fraction: 0.70,
105            evict_limit_fraction: 0.85,
106            hard_limit_fraction: 0.95,
107            background_eviction: false, // Disabled by default for simplicity
108            spill_path: None,
109        }
110    }
111}
112
113/// The central unified buffer manager.
114///
115/// Manages memory allocation across all subsystems with pressure-aware
116/// eviction and optional spilling support.
117pub struct BufferManager {
118    /// Configuration.
119    config: BufferManagerConfig,
120    /// Total allocated bytes.
121    allocated: AtomicUsize,
122    /// Per-region allocated bytes.
123    region_allocated: [AtomicUsize; 4],
124    /// Registered memory consumers.
125    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
126    /// Computed soft limit in bytes.
127    soft_limit: usize,
128    /// Computed eviction limit in bytes.
129    evict_limit: usize,
130    /// Computed hard limit in bytes.
131    hard_limit: usize,
132    /// Shutdown flag.
133    shutdown: AtomicBool,
134}
135
136impl BufferManager {
137    /// Creates a new buffer manager with the given configuration.
138    #[must_use]
139    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
140        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
141        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
142        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
143
144        Arc::new(Self {
145            config,
146            allocated: AtomicUsize::new(0),
147            region_allocated: [
148                AtomicUsize::new(0),
149                AtomicUsize::new(0),
150                AtomicUsize::new(0),
151                AtomicUsize::new(0),
152            ],
153            consumers: RwLock::new(Vec::new()),
154            soft_limit,
155            evict_limit,
156            hard_limit,
157            shutdown: AtomicBool::new(false),
158        })
159    }
160
161    /// Creates a buffer manager with default configuration.
162    #[must_use]
163    pub fn with_defaults() -> Arc<Self> {
164        Self::new(BufferManagerConfig::default())
165    }
166
167    /// Creates a buffer manager with a specific budget.
168    #[must_use]
169    pub fn with_budget(budget: usize) -> Arc<Self> {
170        Self::new(BufferManagerConfig::with_budget(budget))
171    }
172
173    /// Attempts to allocate memory for the given region.
174    ///
175    /// Returns `None` if allocation would exceed the hard limit after
176    /// eviction attempts.
177    pub fn try_allocate(
178        self: &Arc<Self>,
179        size: usize,
180        region: MemoryRegion,
181    ) -> Option<MemoryGrant> {
182        // Check if we can allocate
183        let current = self.allocated.load(Ordering::Relaxed);
184
185        if current + size > self.hard_limit {
186            // Try eviction first
187            self.run_eviction_cycle(true);
188
189            // Check again
190            let current = self.allocated.load(Ordering::Relaxed);
191            if current + size > self.hard_limit {
192                return None;
193            }
194        }
195
196        // Perform allocation
197        self.allocated.fetch_add(size, Ordering::Relaxed);
198        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
199
200        // Check pressure and potentially trigger background eviction
201        self.check_pressure();
202
203        Some(MemoryGrant::new(
204            Arc::clone(self) as Arc<dyn GrantReleaser>,
205            size,
206            region,
207        ))
208    }
209
210    /// Returns the current pressure level.
211    #[must_use]
212    pub fn pressure_level(&self) -> PressureLevel {
213        let current = self.allocated.load(Ordering::Relaxed);
214        self.compute_pressure_level(current)
215    }
216
217    /// Returns current buffer statistics.
218    #[must_use]
219    pub fn stats(&self) -> BufferStats {
220        let total_allocated = self.allocated.load(Ordering::Relaxed);
221        BufferStats {
222            budget: self.config.budget,
223            total_allocated,
224            region_allocated: [
225                self.region_allocated[0].load(Ordering::Relaxed),
226                self.region_allocated[1].load(Ordering::Relaxed),
227                self.region_allocated[2].load(Ordering::Relaxed),
228                self.region_allocated[3].load(Ordering::Relaxed),
229            ],
230            pressure_level: self.compute_pressure_level(total_allocated),
231            consumer_count: self.consumers.read().len(),
232        }
233    }
234
235    /// Registers a memory consumer for eviction callbacks.
236    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
237        self.consumers.write().push(consumer);
238    }
239
240    /// Unregisters a memory consumer by name.
241    pub fn unregister_consumer(&self, name: &str) {
242        self.consumers.write().retain(|c| c.name() != name);
243    }
244
245    /// Forces eviction to reach the target usage.
246    ///
247    /// Returns the number of bytes actually freed.
248    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
249        let current = self.allocated.load(Ordering::Relaxed);
250        if current <= target_bytes {
251            return 0;
252        }
253
254        let to_free = current - target_bytes;
255        self.run_eviction_internal(to_free)
256    }
257
258    /// Returns the configuration.
259    #[must_use]
260    pub fn config(&self) -> &BufferManagerConfig {
261        &self.config
262    }
263
264    /// Returns the memory budget.
265    #[must_use]
266    pub fn budget(&self) -> usize {
267        self.config.budget
268    }
269
270    /// Returns currently allocated bytes.
271    #[must_use]
272    pub fn allocated(&self) -> usize {
273        self.allocated.load(Ordering::Relaxed)
274    }
275
276    /// Returns available bytes.
277    #[must_use]
278    pub fn available(&self) -> usize {
279        self.config
280            .budget
281            .saturating_sub(self.allocated.load(Ordering::Relaxed))
282    }
283
284    /// Shuts down the buffer manager.
285    pub fn shutdown(&self) {
286        self.shutdown.store(true, Ordering::Relaxed);
287    }
288
289    // === Internal methods ===
290
291    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
292        if current >= self.hard_limit {
293            PressureLevel::Critical
294        } else if current >= self.evict_limit {
295            PressureLevel::High
296        } else if current >= self.soft_limit {
297            PressureLevel::Moderate
298        } else {
299            PressureLevel::Normal
300        }
301    }
302
303    fn check_pressure(&self) {
304        let level = self.pressure_level();
305        if level.requires_eviction() {
306            // In a more complete implementation, this would signal
307            // a background thread. For now, do synchronous eviction.
308            let aggressive = level >= PressureLevel::High;
309            self.run_eviction_cycle(aggressive);
310        }
311    }
312
313    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
314        let target = if aggressive {
315            self.soft_limit
316        } else {
317            self.evict_limit
318        };
319
320        let current = self.allocated.load(Ordering::Relaxed);
321        if current <= target {
322            return 0;
323        }
324
325        let to_free = current - target;
326        self.run_eviction_internal(to_free)
327    }
328
329    fn run_eviction_internal(&self, to_free: usize) -> usize {
330        let consumers = self.consumers.read();
331
332        // Sort consumers by priority (lowest first = evict first)
333        let mut sorted: Vec<_> = consumers.iter().collect();
334        sorted.sort_by_key(|c| c.eviction_priority());
335
336        let mut total_freed = 0;
337        for consumer in sorted {
338            if total_freed >= to_free {
339                break;
340            }
341
342            let remaining = to_free - total_freed;
343            let consumer_usage = consumer.memory_usage();
344
345            // Ask consumer to evict up to half its usage or remaining needed
346            let target_evict = remaining.min(consumer_usage / 2);
347            if target_evict > 0 {
348                let freed = consumer.evict(target_evict);
349                total_freed += freed;
350                // Note: consumers should call release through their grants,
351                // so we don't double-decrement here.
352            }
353        }
354
355        total_freed
356    }
357}
358
359impl GrantReleaser for BufferManager {
360    fn release(&self, size: usize, region: MemoryRegion) {
361        self.allocated.fetch_sub(size, Ordering::Relaxed);
362        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
363    }
364
365    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
366        let current = self.allocated.load(Ordering::Relaxed);
367
368        if current + size > self.hard_limit {
369            // Try eviction
370            self.run_eviction_cycle(true);
371
372            let current = self.allocated.load(Ordering::Relaxed);
373            if current + size > self.hard_limit {
374                return false;
375            }
376        }
377
378        self.allocated.fetch_add(size, Ordering::Relaxed);
379        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
380        true
381    }
382}
383
384impl Drop for BufferManager {
385    fn drop(&mut self) {
386        self.shutdown.store(true, Ordering::Relaxed);
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::buffer::consumer::priorities;

    /// In-memory consumer whose `evict` decrements a usage counter and records
    /// how many bytes it gave up.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
        evicted: AtomicUsize,
    }

    impl TestConsumer {
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
            Arc::new(Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
                evicted: AtomicUsize::new(0),
            })
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        fn evict(&self, target_bytes: usize) -> usize {
            let current = self.usage.load(Ordering::Relaxed);
            let to_evict = target_bytes.min(current);
            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
            to_evict
        }
    }

    #[test]
    fn test_basic_allocation() {
        let config = BufferManagerConfig {
            budget: 1024 * 1024, // 1MB
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
        assert!(grant.is_some());
        assert_eq!(manager.stats().total_allocated, 1024);
    }

    #[test]
    fn test_grant_raii_release() {
        let config = BufferManagerConfig {
            budget: 1024,
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        {
            let grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
            assert!(grant.is_some());
            let stats = manager.stats();
            assert_eq!(stats.total_allocated, 512);
            assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 512);
        }

        // Grant dropped: both the total and the per-region counter are released.
        let stats = manager.stats();
        assert_eq!(stats.total_allocated, 0);
        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 0);
    }

    #[test]
    fn test_pressure_levels() {
        let config = BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        };
        let manager = BufferManager::new(config);

        assert_eq!(manager.pressure_level(), PressureLevel::Normal);

        // Allocate to 70% (soft limit)
        let g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
        assert!(g1.is_some());
        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);

        // Allocate to 85% (evict limit)
        let g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
        assert!(g2.is_some());
        assert_eq!(manager.pressure_level(), PressureLevel::High);

        // Note: Critical is covered by test_hard_limit_blocking.
    }

    #[test]
    fn test_region_tracking() {
        let config = BufferManagerConfig {
            budget: 10000,
            ..Default::default()
        };
        let manager = BufferManager::new(config);

        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);

        let stats = manager.stats();
        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
        assert_eq!(stats.total_allocated, 600);
    }

    #[test]
    fn test_consumer_registration() {
        let manager = BufferManager::with_budget(10000);

        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        manager.register_consumer(consumer);
        assert_eq!(manager.stats().consumer_count, 1);

        manager.unregister_consumer("test");
        assert_eq!(manager.stats().consumer_count, 0);
    }

    #[test]
    fn test_eviction_ordering() {
        let manager = BufferManager::with_budget(10000);

        // Low priority consumer (evict first)
        let low_priority = TestConsumer::new(
            "low",
            500,
            priorities::SPILL_STAGING,
            MemoryRegion::SpillStaging,
        );

        // High priority consumer (evict last)
        let high_priority = TestConsumer::new(
            "high",
            500,
            priorities::ACTIVE_TRANSACTION,
            MemoryRegion::ExecutionBuffers,
        );

        // Register the high-priority consumer first so that registration order
        // cannot accidentally produce the expected eviction order.
        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);

        // Manually set allocated to simulate memory usage
        // (consumers track their own usage separately from manager's allocation tracking)
        manager.allocated.store(1000, Ordering::Relaxed);

        // Need to free 300 bytes. Each consumer is asked for at most half its
        // usage (250), so whichever consumer is visited first gives 250 and the
        // second covers the remaining 50. Exact amounts pin the order.
        let freed = manager.evict_to_target(700);

        assert_eq!(freed, 300);
        assert_eq!(low_priority.evicted.load(Ordering::Relaxed), 250);
        assert_eq!(high_priority.evicted.load(Ordering::Relaxed), 50);
    }

    #[test]
    fn test_hard_limit_blocking() {
        let config = BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        };
        let manager = BufferManager::new(config);

        // Allocating exactly up to the hard limit (950 bytes) is allowed.
        let g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
        assert!(g1.is_some());
        assert_eq!(manager.pressure_level(), PressureLevel::Critical);

        // This should fail (would exceed hard limit)...
        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
        assert!(g2.is_none());

        // ...and a refused allocation must leave the counters untouched.
        let stats = manager.stats();
        assert_eq!(stats.total_allocated, 950);
        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 950);
    }

    #[test]
    fn test_available_memory() {
        let manager = BufferManager::with_budget(1000);

        assert_eq!(manager.available(), 1000);

        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
        assert_eq!(manager.available(), 700);
    }
}