Skip to main content

graphos_common/memory/buffer/
consumer.rs

1//! Memory consumer trait for subsystems that use managed memory.
2
3use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7/// Error type for spilling operations.
8#[derive(Error, Debug, Clone)]
9pub enum SpillError {
10    /// Spilling is not supported by this consumer.
11    #[error("spilling not supported")]
12    NotSupported,
13    /// I/O error during spill.
14    #[error("I/O error during spill: {0}")]
15    IoError(String),
16    /// Spill directory not configured.
17    #[error("spill directory not configured")]
18    NoSpillDirectory,
19    /// Insufficient disk space.
20    #[error("insufficient disk space for spill")]
21    InsufficientDiskSpace,
22}
23
/// Trait for subsystems that consume managed memory.
///
/// Memory consumers register with the buffer manager and participate
/// in eviction when memory pressure is detected. Lower priority consumers
/// are evicted first.
///
/// Implementors must be `Send + Sync` and every method takes `&self`, so any
/// state that [`evict`](MemoryConsumer::evict),
/// [`spill`](MemoryConsumer::spill) or [`reload`](MemoryConsumer::reload)
/// modifies has to live behind interior mutability (atomics, locks, ...).
pub trait MemoryConsumer: Send + Sync {
    /// Returns a unique name for this consumer (for debugging/logging).
    fn name(&self) -> &str;

    /// Returns current memory usage in bytes.
    fn memory_usage(&self) -> usize;

    /// Returns eviction priority (0 = lowest priority, evict first; 255 = highest, evict last).
    ///
    /// The `priorities` module in this file provides standard levels for
    /// common consumer types.
    fn eviction_priority(&self) -> u8;

    /// Returns which memory region this consumer belongs to.
    fn region(&self) -> MemoryRegion;

    /// Attempts to evict/release memory held by this consumer.
    ///
    /// NOTE(review): `target_bytes` appears to be the number of bytes the
    /// caller would like freed, not a usage level to shrink down to: the
    /// test consumer in this file frees `min(target_bytes, current usage)`.
    /// Confirm against the buffer manager before relying on either reading.
    ///
    /// Returns the number of bytes actually freed, which may be less than
    /// `target_bytes`.
    fn evict(&self, target_bytes: usize) -> usize;

    /// Returns whether this consumer supports spilling to disk.
    ///
    /// The default implementation returns `false`.
    fn can_spill(&self) -> bool {
        false
    }

    /// Spills data to disk to free memory.
    ///
    /// Returns the number of bytes freed on success.
    ///
    /// The default implementation ignores `_target_bytes` and always fails
    /// with [`SpillError::NotSupported`], matching the default
    /// [`can_spill`](MemoryConsumer::can_spill) result of `false`.
    ///
    /// # Errors
    ///
    /// Returns an error if spilling fails or is not supported.
    fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
        Err(SpillError::NotSupported)
    }

    /// Reloads spilled data from disk (called when memory becomes available).
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if reloading fails.
    fn reload(&self) -> Result<(), SpillError> {
        Ok(())
    }
}
72
/// Standard priority levels for common consumer types.
///
/// Lower values = evict first, higher values = evict last.
///
/// The constants are `u8` values listed in strictly ascending order, from
/// `SPILL_STAGING` (10) up to `ACTIVE_TRANSACTION` (200), the same scale
/// that `MemoryConsumer::eviction_priority` documents (0 evicted first,
/// 255 evicted last).
pub mod priorities {
    /// Spill staging buffers - evict first (already written to disk).
    pub const SPILL_STAGING: u8 = 10;

    /// Cached query results - relatively cheap to recompute.
    pub const QUERY_CACHE: u8 = 30;

    /// Index buffers - can be rebuilt from primary data.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Idle transaction buffers - not actively in use.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Graph storage - persistent data, expensive to reload.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Active transaction data - highest priority, evict last.
    ///
    /// This is the highest of the standard levels defined here; it is still
    /// below the `u8` maximum of 255.
    pub const ACTIVE_TRANSACTION: u8 = 200;
}
95
/// Statistics about a memory consumer.
///
/// A plain, owned record of the values a [`MemoryConsumer`] reported when
/// the stats were built (see [`ConsumerStats::from_consumer`]); it does not
/// keep a reference to the consumer and is not updated afterwards.
#[derive(Debug, Clone)]
pub struct ConsumerStats {
    /// Consumer name (owned copy of [`MemoryConsumer::name`]).
    pub name: String,
    /// Memory region the consumer belongs to.
    pub region: MemoryRegion,
    /// Current memory usage in bytes.
    pub usage_bytes: usize,
    /// Eviction priority (lower values are evicted first).
    pub priority: u8,
    /// Whether spilling is supported.
    pub can_spill: bool,
}
110
111impl ConsumerStats {
112    /// Creates stats from a consumer.
113    pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
114        Self {
115            name: consumer.name().to_string(),
116            region: consumer.region(),
117            usage_bytes: consumer.memory_usage(),
118            priority: consumer.eviction_priority(),
119            can_spill: consumer.can_spill(),
120        }
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal in-memory consumer used to exercise the trait.
    ///
    /// Usage is tracked in an atomic counter so it can be changed through
    /// `&self`, as the `MemoryConsumer` methods require.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        /// Creates a consumer that reports `usage` bytes in use.
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        /// Frees up to `target_bytes`, never more than is currently in use.
        fn evict(&self, target_bytes: usize) -> usize {
            // Clamp and subtract in a single atomic update. A separate
            // `load` followed by `fetch_sub` lets two concurrent callers both
            // observe the same usage and subtract past zero, wrapping the
            // counter.
            let previous = self
                .usage
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    Some(current.saturating_sub(target_bytes))
                })
                // The closure always returns `Some`, so `Err` cannot occur;
                // fall back to the observed value rather than panicking.
                .unwrap_or_else(|unchanged| unchanged);

            previous.min(target_bytes)
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Try to evict more than available
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);

        // Nothing left: further eviction frees nothing and must not wrap.
        let freed = consumer.evict(1);
        assert_eq!(freed, 0);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_concurrent_eviction_never_overfrees() {
        use std::sync::Arc;
        use std::thread;

        let consumer = Arc::new(TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        ));

        // 8 x 300 bytes requested against 1000 available: the total freed
        // must be exactly what was in use, regardless of interleaving.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let consumer = Arc::clone(&consumer);
                thread::spawn(move || consumer.evict(300))
            })
            .collect();

        let total_freed: usize = workers
            .into_iter()
            .map(|worker| worker.join().expect("evict thread panicked"))
            .sum();

        assert_eq!(total_freed, 1000);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_methods() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        // TestConsumer does not override the spill-related defaults.
        assert!(!consumer.can_spill());
        assert!(matches!(consumer.spill(100), Err(SpillError::NotSupported)));
        assert!(consumer.reload().is_ok());

        // A rejected spill must leave the reported usage untouched.
        assert_eq!(consumer.memory_usage(), 1024);
    }

    #[test]
    // The operands are compile-time constants on purpose: this test guards
    // the relative ordering of the standard priority levels.
    #[allow(clippy::assertions_on_constants)]
    fn test_priority_ordering() {
        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
        assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
    }
}