// grafeo_common/memory/buffer/consumer.rs

//! Memory consumer trait for subsystems that use managed memory.

use thiserror::Error;

use super::region::MemoryRegion;

7/// Error type for spilling operations.
8#[derive(Error, Debug, Clone)]
9#[non_exhaustive]
10pub enum SpillError {
11    /// Spilling is not supported by this consumer.
12    #[error("spilling not supported")]
13    NotSupported,
14    /// I/O error during spill.
15    #[error("I/O error during spill: {0}")]
16    IoError(String),
17    /// Spill directory not configured.
18    #[error("spill directory not configured")]
19    NoSpillDirectory,
20    /// Insufficient disk space.
21    #[error("insufficient disk space for spill")]
22    InsufficientDiskSpace,
23}
24
25/// Trait for subsystems that consume managed memory.
26///
27/// Memory consumers register with the buffer manager and participate
28/// in eviction when memory pressure is detected. Lower priority consumers
29/// are evicted first.
30pub trait MemoryConsumer: Send + Sync {
31    /// Returns a unique name for this consumer (for debugging/logging).
32    fn name(&self) -> &str;
33
34    /// Returns current memory usage in bytes.
35    fn memory_usage(&self) -> usize;
36
37    /// Returns eviction priority (0 = lowest priority, evict first; 255 = highest, evict last).
38    fn eviction_priority(&self) -> u8;
39
40    /// Returns which memory region this consumer belongs to.
41    fn region(&self) -> MemoryRegion;
42
43    /// Attempts to evict/release memory to reach target usage.
44    ///
45    /// Returns the number of bytes actually freed.
46    fn evict(&self, target_bytes: usize) -> usize;
47
48    /// Returns whether this consumer supports spilling to disk.
49    fn can_spill(&self) -> bool {
50        false
51    }
52
53    /// Spills data to disk to free memory.
54    ///
55    /// Returns the number of bytes freed on success.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if spilling fails or is not supported.
60    fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
61        Err(SpillError::NotSupported)
62    }
63
64    /// Reloads spilled data from disk (called when memory becomes available).
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if reloading fails.
69    fn reload(&self) -> Result<(), SpillError> {
70        Ok(())
71    }
72}
73
/// Standard priority levels for common consumer types.
///
/// Lower values = evict first, higher values = evict last.
/// The constants are declared in ascending order, i.e. from "evict first"
/// to "evict last", and leave gaps so custom levels can be slotted between them.
pub mod priorities {
    /// Spill staging buffers - evict first (already written to disk).
    pub const SPILL_STAGING: u8 = 10;

    /// Cached query results - relatively cheap to recompute.
    pub const QUERY_CACHE: u8 = 30;

    /// Execution operator buffers (sort, aggregate) - can be spilled to disk.
    pub const EXECUTION_BUFFERS: u8 = 40;

    /// Index buffers - can be rebuilt from primary data.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Idle transaction buffers - not actively in use.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Graph storage - persistent data, expensive to reload.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Active transaction data - highest priority, evict last.
    pub const ACTIVE_TRANSACTION: u8 = 200;
}

100/// Statistics about a memory consumer.
101#[derive(Debug, Clone)]
102pub struct ConsumerStats {
103    /// Consumer name.
104    pub name: String,
105    /// Memory region.
106    pub region: MemoryRegion,
107    /// Current memory usage in bytes.
108    pub usage_bytes: usize,
109    /// Eviction priority.
110    pub priority: u8,
111    /// Whether spilling is supported.
112    pub can_spill: bool,
113}
114
115impl ConsumerStats {
116    /// Creates stats from a consumer.
117    pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
118        Self {
119            name: consumer.name().to_string(),
120            region: consumer.region(),
121            usage_bytes: consumer.memory_usage(),
122            priority: consumer.eviction_priority(),
123            can_spill: consumer.can_spill(),
124        }
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal in-memory consumer used to exercise the trait and stats types.
    ///
    /// Usage is kept in an atomic because every `MemoryConsumer` method takes
    /// `&self`. It relies on the trait defaults for `can_spill`, `spill` and
    /// `reload`.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        /// Builds a consumer that reports `usage` bytes until something is evicted.
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        fn evict(&self, target_bytes: usize) -> usize {
            // Never free more than is currently held, so the counter cannot underflow.
            let current = self.usage.load(Ordering::Relaxed);
            let to_evict = target_bytes.min(current);
            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
            to_evict
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Try to evict more than available
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_behavior() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        // The trait defaults describe a consumer that cannot spill.
        assert!(!consumer.can_spill());
        assert!(matches!(consumer.spill(512), Err(SpillError::NotSupported)));
        assert!(consumer.reload().is_ok());

        // None of the default methods touch the reported usage.
        assert_eq!(consumer.memory_usage(), 1024);
    }

    #[test]
    // The operands are compile-time constants on purpose: this test pins the
    // relative order of the standard priority levels.
    #[allow(clippy::assertions_on_constants)]
    fn test_priority_ordering() {
        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
        assert!(priorities::QUERY_CACHE < priorities::EXECUTION_BUFFERS);
        assert!(priorities::EXECUTION_BUFFERS < priorities::INDEX_BUFFERS);
        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
    }
}