Skip to main content

graphos_common/memory/buffer/
manager.rs

1//! Unified buffer manager implementation.
2
3use super::consumer::MemoryConsumer;
4use super::grant::{GrantReleaser, MemoryGrant};
5use super::region::MemoryRegion;
6use super::stats::{BufferStats, PressureLevel};
7use parking_lot::RwLock;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11
12/// Default memory budget as a fraction of system memory.
13const DEFAULT_MEMORY_FRACTION: f64 = 0.75;
14
15/// Configuration for the buffer manager.
16#[derive(Debug, Clone)]
17pub struct BufferManagerConfig {
18    /// Total memory budget in bytes.
19    pub budget: usize,
20    /// Soft limit threshold (default: 70%).
21    pub soft_limit_fraction: f64,
22    /// Eviction threshold (default: 85%).
23    pub evict_limit_fraction: f64,
24    /// Hard limit threshold (default: 95%).
25    pub hard_limit_fraction: f64,
26    /// Enable background eviction thread.
27    pub background_eviction: bool,
28    /// Directory for spilling data to disk.
29    pub spill_path: Option<PathBuf>,
30}
31
32impl BufferManagerConfig {
33    /// Detects system memory size.
34    ///
35    /// Returns a conservative estimate if detection fails.
36    #[must_use]
37    pub fn detect_system_memory() -> usize {
38        // Try to detect system memory
39        // On failure, return a conservative 1GB default
40        #[cfg(target_os = "windows")]
41        {
42            // Windows: Use GetPhysicallyInstalledSystemMemory or GlobalMemoryStatusEx
43            // For now, use a fallback
44            Self::fallback_system_memory()
45        }
46
47        #[cfg(target_os = "linux")]
48        {
49            // Linux: Read from /proc/meminfo
50            if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
51                for line in contents.lines() {
52                    if line.starts_with("MemTotal:") {
53                        if let Some(kb_str) = line.split_whitespace().nth(1) {
54                            if let Ok(kb) = kb_str.parse::<usize>() {
55                                return kb * 1024;
56                            }
57                        }
58                    }
59                }
60            }
61            Self::fallback_system_memory()
62        }
63
64        #[cfg(target_os = "macos")]
65        {
66            // macOS: Use sysctl
67            Self::fallback_system_memory()
68        }
69
70        #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
71        {
72            Self::fallback_system_memory()
73        }
74    }
75
76    fn fallback_system_memory() -> usize {
77        // Default to 1GB if detection fails
78        1024 * 1024 * 1024
79    }
80
81    /// Creates a config with the given budget.
82    #[must_use]
83    pub fn with_budget(budget: usize) -> Self {
84        Self {
85            budget,
86            ..Default::default()
87        }
88    }
89}
90
91impl Default for BufferManagerConfig {
92    fn default() -> Self {
93        let system_memory = Self::detect_system_memory();
94        Self {
95            budget: (system_memory as f64 * DEFAULT_MEMORY_FRACTION) as usize,
96            soft_limit_fraction: 0.70,
97            evict_limit_fraction: 0.85,
98            hard_limit_fraction: 0.95,
99            background_eviction: false, // Disabled by default for simplicity
100            spill_path: None,
101        }
102    }
103}
104
105/// The central unified buffer manager.
106///
107/// Manages memory allocation across all subsystems with pressure-aware
108/// eviction and optional spilling support.
109pub struct BufferManager {
110    /// Configuration.
111    config: BufferManagerConfig,
112    /// Total allocated bytes.
113    allocated: AtomicUsize,
114    /// Per-region allocated bytes.
115    region_allocated: [AtomicUsize; 4],
116    /// Registered memory consumers.
117    consumers: RwLock<Vec<Arc<dyn MemoryConsumer>>>,
118    /// Computed soft limit in bytes.
119    soft_limit: usize,
120    /// Computed eviction limit in bytes.
121    evict_limit: usize,
122    /// Computed hard limit in bytes.
123    hard_limit: usize,
124    /// Shutdown flag.
125    shutdown: AtomicBool,
126}
127
128impl BufferManager {
129    /// Creates a new buffer manager with the given configuration.
130    #[must_use]
131    pub fn new(config: BufferManagerConfig) -> Arc<Self> {
132        let soft_limit = (config.budget as f64 * config.soft_limit_fraction) as usize;
133        let evict_limit = (config.budget as f64 * config.evict_limit_fraction) as usize;
134        let hard_limit = (config.budget as f64 * config.hard_limit_fraction) as usize;
135
136        Arc::new(Self {
137            config,
138            allocated: AtomicUsize::new(0),
139            region_allocated: [
140                AtomicUsize::new(0),
141                AtomicUsize::new(0),
142                AtomicUsize::new(0),
143                AtomicUsize::new(0),
144            ],
145            consumers: RwLock::new(Vec::new()),
146            soft_limit,
147            evict_limit,
148            hard_limit,
149            shutdown: AtomicBool::new(false),
150        })
151    }
152
153    /// Creates a buffer manager with default configuration.
154    #[must_use]
155    pub fn with_defaults() -> Arc<Self> {
156        Self::new(BufferManagerConfig::default())
157    }
158
159    /// Creates a buffer manager with a specific budget.
160    #[must_use]
161    pub fn with_budget(budget: usize) -> Arc<Self> {
162        Self::new(BufferManagerConfig::with_budget(budget))
163    }
164
165    /// Attempts to allocate memory for the given region.
166    ///
167    /// Returns `None` if allocation would exceed the hard limit after
168    /// eviction attempts.
169    pub fn try_allocate(
170        self: &Arc<Self>,
171        size: usize,
172        region: MemoryRegion,
173    ) -> Option<MemoryGrant> {
174        // Check if we can allocate
175        let current = self.allocated.load(Ordering::Relaxed);
176
177        if current + size > self.hard_limit {
178            // Try eviction first
179            self.run_eviction_cycle(true);
180
181            // Check again
182            let current = self.allocated.load(Ordering::Relaxed);
183            if current + size > self.hard_limit {
184                return None;
185            }
186        }
187
188        // Perform allocation
189        self.allocated.fetch_add(size, Ordering::Relaxed);
190        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
191
192        // Check pressure and potentially trigger background eviction
193        self.check_pressure();
194
195        Some(MemoryGrant::new(
196            Arc::clone(self) as Arc<dyn GrantReleaser>,
197            size,
198            region,
199        ))
200    }
201
202    /// Returns the current pressure level.
203    #[must_use]
204    pub fn pressure_level(&self) -> PressureLevel {
205        let current = self.allocated.load(Ordering::Relaxed);
206        self.compute_pressure_level(current)
207    }
208
209    /// Returns current buffer statistics.
210    #[must_use]
211    pub fn stats(&self) -> BufferStats {
212        let total_allocated = self.allocated.load(Ordering::Relaxed);
213        BufferStats {
214            budget: self.config.budget,
215            total_allocated,
216            region_allocated: [
217                self.region_allocated[0].load(Ordering::Relaxed),
218                self.region_allocated[1].load(Ordering::Relaxed),
219                self.region_allocated[2].load(Ordering::Relaxed),
220                self.region_allocated[3].load(Ordering::Relaxed),
221            ],
222            pressure_level: self.compute_pressure_level(total_allocated),
223            consumer_count: self.consumers.read().len(),
224        }
225    }
226
227    /// Registers a memory consumer for eviction callbacks.
228    pub fn register_consumer(&self, consumer: Arc<dyn MemoryConsumer>) {
229        self.consumers.write().push(consumer);
230    }
231
232    /// Unregisters a memory consumer by name.
233    pub fn unregister_consumer(&self, name: &str) {
234        self.consumers.write().retain(|c| c.name() != name);
235    }
236
237    /// Forces eviction to reach the target usage.
238    ///
239    /// Returns the number of bytes actually freed.
240    pub fn evict_to_target(&self, target_bytes: usize) -> usize {
241        let current = self.allocated.load(Ordering::Relaxed);
242        if current <= target_bytes {
243            return 0;
244        }
245
246        let to_free = current - target_bytes;
247        self.run_eviction_internal(to_free)
248    }
249
250    /// Returns the configuration.
251    #[must_use]
252    pub fn config(&self) -> &BufferManagerConfig {
253        &self.config
254    }
255
256    /// Returns the memory budget.
257    #[must_use]
258    pub fn budget(&self) -> usize {
259        self.config.budget
260    }
261
262    /// Returns currently allocated bytes.
263    #[must_use]
264    pub fn allocated(&self) -> usize {
265        self.allocated.load(Ordering::Relaxed)
266    }
267
268    /// Returns available bytes.
269    #[must_use]
270    pub fn available(&self) -> usize {
271        self.config
272            .budget
273            .saturating_sub(self.allocated.load(Ordering::Relaxed))
274    }
275
276    /// Shuts down the buffer manager.
277    pub fn shutdown(&self) {
278        self.shutdown.store(true, Ordering::Relaxed);
279    }
280
281    // === Internal methods ===
282
283    fn compute_pressure_level(&self, current: usize) -> PressureLevel {
284        if current >= self.hard_limit {
285            PressureLevel::Critical
286        } else if current >= self.evict_limit {
287            PressureLevel::High
288        } else if current >= self.soft_limit {
289            PressureLevel::Moderate
290        } else {
291            PressureLevel::Normal
292        }
293    }
294
295    fn check_pressure(&self) {
296        let level = self.pressure_level();
297        if level.requires_eviction() {
298            // In a more complete implementation, this would signal
299            // a background thread. For now, do synchronous eviction.
300            let aggressive = level >= PressureLevel::High;
301            self.run_eviction_cycle(aggressive);
302        }
303    }
304
305    fn run_eviction_cycle(&self, aggressive: bool) -> usize {
306        let target = if aggressive {
307            self.soft_limit
308        } else {
309            self.evict_limit
310        };
311
312        let current = self.allocated.load(Ordering::Relaxed);
313        if current <= target {
314            return 0;
315        }
316
317        let to_free = current - target;
318        self.run_eviction_internal(to_free)
319    }
320
321    fn run_eviction_internal(&self, to_free: usize) -> usize {
322        let consumers = self.consumers.read();
323
324        // Sort consumers by priority (lowest first = evict first)
325        let mut sorted: Vec<_> = consumers.iter().collect();
326        sorted.sort_by_key(|c| c.eviction_priority());
327
328        let mut total_freed = 0;
329        for consumer in sorted {
330            if total_freed >= to_free {
331                break;
332            }
333
334            let remaining = to_free - total_freed;
335            let consumer_usage = consumer.memory_usage();
336
337            // Ask consumer to evict up to half its usage or remaining needed
338            let target_evict = remaining.min(consumer_usage / 2);
339            if target_evict > 0 {
340                let freed = consumer.evict(target_evict);
341                total_freed += freed;
342                // Note: consumers should call release through their grants,
343                // so we don't double-decrement here.
344            }
345        }
346
347        total_freed
348    }
349}
350
351impl GrantReleaser for BufferManager {
352    fn release(&self, size: usize, region: MemoryRegion) {
353        self.allocated.fetch_sub(size, Ordering::Relaxed);
354        self.region_allocated[region.index()].fetch_sub(size, Ordering::Relaxed);
355    }
356
357    fn try_allocate_raw(&self, size: usize, region: MemoryRegion) -> bool {
358        let current = self.allocated.load(Ordering::Relaxed);
359
360        if current + size > self.hard_limit {
361            // Try eviction
362            self.run_eviction_cycle(true);
363
364            let current = self.allocated.load(Ordering::Relaxed);
365            if current + size > self.hard_limit {
366                return false;
367            }
368        }
369
370        self.allocated.fetch_add(size, Ordering::Relaxed);
371        self.region_allocated[region.index()].fetch_add(size, Ordering::Relaxed);
372        true
373    }
374}
375
376impl Drop for BufferManager {
377    fn drop(&mut self) {
378        self.shutdown.store(true, Ordering::Relaxed);
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use crate::memory::buffer::consumer::priorities;
386    use std::sync::atomic::AtomicUsize;
387
388    struct TestConsumer {
389        name: String,
390        usage: AtomicUsize,
391        priority: u8,
392        region: MemoryRegion,
393        evicted: AtomicUsize,
394    }
395
396    impl TestConsumer {
397        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Arc<Self> {
398            Arc::new(Self {
399                name: name.to_string(),
400                usage: AtomicUsize::new(usage),
401                priority,
402                region,
403                evicted: AtomicUsize::new(0),
404            })
405        }
406    }
407
408    impl MemoryConsumer for TestConsumer {
409        fn name(&self) -> &str {
410            &self.name
411        }
412
413        fn memory_usage(&self) -> usize {
414            self.usage.load(Ordering::Relaxed)
415        }
416
417        fn eviction_priority(&self) -> u8 {
418            self.priority
419        }
420
421        fn region(&self) -> MemoryRegion {
422            self.region
423        }
424
425        fn evict(&self, target_bytes: usize) -> usize {
426            let current = self.usage.load(Ordering::Relaxed);
427            let to_evict = target_bytes.min(current);
428            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
429            self.evicted.fetch_add(to_evict, Ordering::Relaxed);
430            to_evict
431        }
432    }
433
434    #[test]
435    fn test_basic_allocation() {
436        let config = BufferManagerConfig {
437            budget: 1024 * 1024, // 1MB
438            ..Default::default()
439        };
440        let manager = BufferManager::new(config);
441
442        let grant = manager.try_allocate(1024, MemoryRegion::ExecutionBuffers);
443        assert!(grant.is_some());
444        assert_eq!(manager.stats().total_allocated, 1024);
445    }
446
447    #[test]
448    fn test_grant_raii_release() {
449        let config = BufferManagerConfig {
450            budget: 1024,
451            ..Default::default()
452        };
453        let manager = BufferManager::new(config);
454
455        {
456            let _grant = manager.try_allocate(512, MemoryRegion::ExecutionBuffers);
457            assert_eq!(manager.stats().total_allocated, 512);
458        }
459
460        // Grant dropped, memory should be released
461        assert_eq!(manager.stats().total_allocated, 0);
462    }
463
464    #[test]
465    fn test_pressure_levels() {
466        let config = BufferManagerConfig {
467            budget: 1000,
468            soft_limit_fraction: 0.70,
469            evict_limit_fraction: 0.85,
470            hard_limit_fraction: 0.95,
471            background_eviction: false,
472            spill_path: None,
473        };
474        let manager = BufferManager::new(config);
475
476        assert_eq!(manager.pressure_level(), PressureLevel::Normal);
477
478        // Allocate to 70% (soft limit)
479        let _g1 = manager.try_allocate(700, MemoryRegion::ExecutionBuffers);
480        assert_eq!(manager.pressure_level(), PressureLevel::Moderate);
481
482        // Allocate to 85% (evict limit)
483        let _g2 = manager.try_allocate(150, MemoryRegion::ExecutionBuffers);
484        assert_eq!(manager.pressure_level(), PressureLevel::High);
485
486        // Note: Can't easily test Critical without blocking
487    }
488
489    #[test]
490    fn test_region_tracking() {
491        let config = BufferManagerConfig {
492            budget: 10000,
493            ..Default::default()
494        };
495        let manager = BufferManager::new(config);
496
497        let _g1 = manager.try_allocate(100, MemoryRegion::GraphStorage);
498        let _g2 = manager.try_allocate(200, MemoryRegion::IndexBuffers);
499        let _g3 = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
500
501        let stats = manager.stats();
502        assert_eq!(stats.region_usage(MemoryRegion::GraphStorage), 100);
503        assert_eq!(stats.region_usage(MemoryRegion::IndexBuffers), 200);
504        assert_eq!(stats.region_usage(MemoryRegion::ExecutionBuffers), 300);
505        assert_eq!(stats.total_allocated, 600);
506    }
507
508    #[test]
509    fn test_consumer_registration() {
510        let manager = BufferManager::with_budget(10000);
511
512        let consumer = TestConsumer::new(
513            "test",
514            1000,
515            priorities::INDEX_BUFFERS,
516            MemoryRegion::IndexBuffers,
517        );
518
519        manager.register_consumer(consumer);
520        assert_eq!(manager.stats().consumer_count, 1);
521
522        manager.unregister_consumer("test");
523        assert_eq!(manager.stats().consumer_count, 0);
524    }
525
526    #[test]
527    fn test_eviction_ordering() {
528        let manager = BufferManager::with_budget(10000);
529
530        // Low priority consumer (evict first)
531        let low_priority = TestConsumer::new(
532            "low",
533            500,
534            priorities::SPILL_STAGING,
535            MemoryRegion::SpillStaging,
536        );
537
538        // High priority consumer (evict last)
539        let high_priority = TestConsumer::new(
540            "high",
541            500,
542            priorities::ACTIVE_TRANSACTION,
543            MemoryRegion::ExecutionBuffers,
544        );
545
546        manager.register_consumer(Arc::clone(&low_priority) as Arc<dyn MemoryConsumer>);
547        manager.register_consumer(Arc::clone(&high_priority) as Arc<dyn MemoryConsumer>);
548
549        // Manually set allocated to simulate memory usage
550        // (consumers track their own usage separately from manager's allocation tracking)
551        manager.allocated.store(1000, Ordering::Relaxed);
552
553        // Request eviction to target 700 (need to free 300 bytes)
554        let freed = manager.evict_to_target(700);
555
556        // Low priority should be evicted first (up to half = 250)
557        assert!(low_priority.evicted.load(Ordering::Relaxed) > 0);
558        assert!(freed > 0);
559    }
560
561    #[test]
562    fn test_hard_limit_blocking() {
563        let config = BufferManagerConfig {
564            budget: 1000,
565            soft_limit_fraction: 0.70,
566            evict_limit_fraction: 0.85,
567            hard_limit_fraction: 0.95,
568            background_eviction: false,
569            spill_path: None,
570        };
571        let manager = BufferManager::new(config);
572
573        // Allocate up to hard limit (950 bytes)
574        let _g1 = manager.try_allocate(950, MemoryRegion::ExecutionBuffers);
575
576        // This should fail (would exceed hard limit)
577        let g2 = manager.try_allocate(100, MemoryRegion::ExecutionBuffers);
578        assert!(g2.is_none());
579    }
580
581    #[test]
582    fn test_available_memory() {
583        let manager = BufferManager::with_budget(1000);
584
585        assert_eq!(manager.available(), 1000);
586
587        let _g = manager.try_allocate(300, MemoryRegion::ExecutionBuffers);
588        assert_eq!(manager.available(), 700);
589    }
590}