Skip to main content

grafeo_common/memory/buffer/
consumer.rs

//! Memory consumer trait for subsystems that use managed memory, together with
//! its spill error type, standard eviction priority levels, and consumer statistics.
2
3use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7/// Error type for spilling operations.
8#[derive(Error, Debug, Clone)]
9#[non_exhaustive]
10pub enum SpillError {
11    /// Spilling is not supported by this consumer.
12    #[error("spilling not supported")]
13    NotSupported,
14    /// I/O error during spill.
15    #[error("I/O error during spill: {0}")]
16    IoError(String),
17    /// Spill directory not configured.
18    #[error("spill directory not configured")]
19    NoSpillDirectory,
20    /// Insufficient disk space.
21    #[error("insufficient disk space for spill")]
22    InsufficientDiskSpace,
23}
24
25/// Trait for subsystems that consume managed memory.
26///
27/// Memory consumers register with the buffer manager and participate
28/// in eviction when memory pressure is detected. Lower priority consumers
29/// are evicted first.
30pub trait MemoryConsumer: Send + Sync {
31    /// Returns a unique name for this consumer (for debugging/logging).
32    fn name(&self) -> &str;
33
34    /// Returns current memory usage in bytes.
35    fn memory_usage(&self) -> usize;
36
37    /// Returns eviction priority (0 = lowest priority, evict first; 255 = highest, evict last).
38    fn eviction_priority(&self) -> u8;
39
40    /// Returns which memory region this consumer belongs to.
41    fn region(&self) -> MemoryRegion;
42
43    /// Attempts to evict/release memory to reach target usage.
44    ///
45    /// Returns the number of bytes actually freed.
46    fn evict(&self, target_bytes: usize) -> usize;
47
48    /// Returns whether this consumer supports spilling to disk.
49    fn can_spill(&self) -> bool {
50        false
51    }
52
53    /// Spills data to disk to free memory.
54    ///
55    /// Returns the number of bytes freed on success.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if spilling fails or is not supported.
60    fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
61        Err(SpillError::NotSupported)
62    }
63
64    /// Reloads spilled data from disk (called when memory becomes available).
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if reloading fails.
69    fn reload(&self) -> Result<(), SpillError> {
70        Ok(())
71    }
72}
73
/// Conventional eviction priorities for common kinds of consumers.
///
/// The smaller the value, the earlier a consumer is evicted; the larger the
/// value, the later.
pub mod priorities {
    /// Staging buffers used for spills; first to be evicted because their
    /// contents are already written to disk.
    pub const SPILL_STAGING: u8 = 10;

    /// Cached query results, which are relatively cheap to recompute.
    pub const QUERY_CACHE: u8 = 30;

    /// Index buffers, which can be rebuilt from primary data.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Buffers of idle transactions, i.e. ones not actively in use.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Graph storage: persistent data that is expensive to reload.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Data of active transactions; highest of the standard levels and
    /// therefore evicted last.
    pub const ACTIVE_TRANSACTION: u8 = 200;
}
96
97/// Statistics about a memory consumer.
98#[derive(Debug, Clone)]
99pub struct ConsumerStats {
100    /// Consumer name.
101    pub name: String,
102    /// Memory region.
103    pub region: MemoryRegion,
104    /// Current memory usage in bytes.
105    pub usage_bytes: usize,
106    /// Eviction priority.
107    pub priority: u8,
108    /// Whether spilling is supported.
109    pub can_spill: bool,
110}
111
112impl ConsumerStats {
113    /// Creates stats from a consumer.
114    pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
115        Self {
116            name: consumer.name().to_string(),
117            region: consumer.region(),
118            usage_bytes: consumer.memory_usage(),
119            priority: consumer.eviction_priority(),
120            can_spill: consumer.can_spill(),
121        }
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal consumer backed by a single atomic usage counter.
    ///
    /// It implements only the required trait methods, so the provided
    /// `can_spill`/`spill`/`reload` defaults are what the tests observe.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }

        /// Fixture shared by the tests: an index-buffer consumer named
        /// `"test"` that starts out holding `usage` bytes.
        fn index_buffers(usage: usize) -> Self {
            Self::new("test", usage, priorities::INDEX_BUFFERS, MemoryRegion::IndexBuffers)
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        fn evict(&self, target_bytes: usize) -> usize {
            // Clamp the request to what is held so the counter cannot wrap.
            // The load and the subtraction are two separate steps, which is
            // sufficient for these single-threaded tests.
            let current = self.usage.load(Ordering::Relaxed);
            let to_evict = target_bytes.min(current);
            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
            to_evict
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::index_buffers(1024);

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::index_buffers(1000);

        // A zero-byte request frees nothing.
        assert_eq!(consumer.evict(0), 0);
        assert_eq!(consumer.memory_usage(), 1000);

        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Try to evict more than available
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);

        // Nothing is left to evict.
        assert_eq!(consumer.evict(1), 0);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_behavior() {
        let consumer = TestConsumer::index_buffers(1024);

        // `TestConsumer` does not override the spill hooks, so these calls
        // exercise the trait's provided implementations.
        assert!(!consumer.can_spill());
        assert!(matches!(consumer.spill(512), Err(SpillError::NotSupported)));
        assert!(consumer.reload().is_ok());

        // Neither default touches the consumer's usage.
        assert_eq!(consumer.memory_usage(), 1024);
    }

    #[test]
    fn test_spill_error_messages() {
        assert_eq!(SpillError::NotSupported.to_string(), "spilling not supported");
        assert_eq!(
            SpillError::IoError("disk full".to_string()).to_string(),
            "I/O error during spill: disk full"
        );
        assert_eq!(
            SpillError::NoSpillDirectory.to_string(),
            "spill directory not configured"
        );
        assert_eq!(
            SpillError::InsufficientDiskSpace.to_string(),
            "insufficient disk space for spill"
        );
    }

    // Both sides of every comparison are constants, which is exactly what the
    // lint flags; the assertions are kept on purpose so that editing a level
    // out of order fails the test suite.
    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn test_priority_ordering() {
        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
        assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
    }
}