Skip to main content

grafeo_common/memory/buffer/
consumer.rs

1//! Memory consumer trait for subsystems that use managed memory.
2
3use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7/// Error type for spilling operations.
8#[derive(Error, Debug, Clone)]
9#[non_exhaustive]
10pub enum SpillError {
11    /// Spilling is not supported by this consumer.
12    #[error("spilling not supported")]
13    NotSupported,
14    /// I/O error during spill.
15    #[error("I/O error during spill: {0}")]
16    IoError(String),
17    /// Spill directory not configured.
18    #[error("spill directory not configured")]
19    NoSpillDirectory,
20    /// Insufficient disk space.
21    #[error("insufficient disk space for spill")]
22    InsufficientDiskSpace,
23}
24
25/// Trait for subsystems that consume managed memory.
26///
27/// Memory consumers register with the buffer manager and participate
28/// in eviction when memory pressure is detected. Lower priority consumers
29/// are evicted first.
30pub trait MemoryConsumer: Send + Sync {
31    /// Returns a unique name for this consumer (for debugging/logging).
32    fn name(&self) -> &str;
33
34    /// Returns current memory usage in bytes.
35    fn memory_usage(&self) -> usize;
36
37    /// Returns eviction priority (0 = lowest priority, evict first; 255 = highest, evict last).
38    fn eviction_priority(&self) -> u8;
39
40    /// Returns which memory region this consumer belongs to.
41    fn region(&self) -> MemoryRegion;
42
43    /// Attempts to evict/release memory to reach target usage.
44    ///
45    /// Returns the number of bytes actually freed.
46    fn evict(&self, target_bytes: usize) -> usize;
47
48    /// Returns whether this consumer supports spilling to disk.
49    fn can_spill(&self) -> bool {
50        false
51    }
52
53    /// Spills data to disk to free memory.
54    ///
55    /// Returns the number of bytes freed on success.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if spilling fails or is not supported.
60    fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
61        Err(SpillError::NotSupported)
62    }
63
64    /// Reloads spilled data from disk (called when memory becomes available).
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if reloading fails.
69    fn reload(&self) -> Result<(), SpillError> {
70        Ok(())
71    }
72
73    /// Reports the consumer's current storage tier (Phase 8b introspection).
74    ///
75    /// No default: tier cannot be inferred from `memory_usage()` alone
76    /// because a spillable consumer can be on disk with zero heap usage,
77    /// and [`crate::memory::buffer::BufferManager::reload_eligible`] only
78    /// reloads consumers reporting [`StorageTier::OnDisk`]. Implementors
79    /// that never spill should return [`StorageTier::InMemory`] when
80    /// `memory_usage() > 0` and [`StorageTier::Uninitialized`] otherwise.
81    ///
82    /// [`StorageTier`]: super::tiered::StorageTier
83    /// [`StorageTier::OnDisk`]: super::tiered::StorageTier::OnDisk
84    /// [`StorageTier::InMemory`]: super::tiered::StorageTier::InMemory
85    /// [`StorageTier::Uninitialized`]: super::tiered::StorageTier::Uninitialized
86    ///
87    /// Used by [`crate::memory::buffer::BufferManager::snapshot_consumer_tiers`]
88    /// for observability and tests; not on any hot path.
89    fn current_tier(&self) -> super::tiered::StorageTier;
90}
91
92/// Standard priority levels for common consumer types.
93///
94/// Lower values = evict first, higher values = evict last.
95pub mod priorities {
96    /// Spill staging buffers - evict first (already written to disk).
97    pub const SPILL_STAGING: u8 = 10;
98
99    /// Cached query results - relatively cheap to recompute.
100    pub const QUERY_CACHE: u8 = 30;
101
102    /// Execution operator buffers (sort, aggregate): can be spilled to disk.
103    pub const EXECUTION_BUFFERS: u8 = 40;
104
105    /// Index buffers - can be rebuilt from primary data.
106    pub const INDEX_BUFFERS: u8 = 50;
107
108    /// Idle transaction buffers - not actively in use.
109    pub const IDLE_TRANSACTION: u8 = 70;
110
111    /// Graph storage - persistent data, expensive to reload.
112    pub const GRAPH_STORAGE: u8 = 100;
113
114    /// Active transaction data - highest priority, evict last.
115    pub const ACTIVE_TRANSACTION: u8 = 200;
116}
117
118/// Statistics about a memory consumer.
119#[derive(Debug, Clone)]
120pub struct ConsumerStats {
121    /// Consumer name.
122    pub name: String,
123    /// Memory region.
124    pub region: MemoryRegion,
125    /// Current memory usage in bytes.
126    pub usage_bytes: usize,
127    /// Eviction priority.
128    pub priority: u8,
129    /// Whether spilling is supported.
130    pub can_spill: bool,
131}
132
133impl ConsumerStats {
134    /// Creates stats from a consumer.
135    pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
136        Self {
137            name: consumer.name().to_string(),
138            region: consumer.region(),
139            usage_bytes: consumer.memory_usage(),
140            priority: consumer.eviction_priority(),
141            can_spill: consumer.can_spill(),
142        }
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use std::sync::atomic::{AtomicUsize, Ordering};
150
151    struct TestConsumer {
152        name: String,
153        usage: AtomicUsize,
154        priority: u8,
155        region: MemoryRegion,
156    }
157
158    impl TestConsumer {
159        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
160            Self {
161                name: name.to_string(),
162                usage: AtomicUsize::new(usage),
163                priority,
164                region,
165            }
166        }
167    }
168
169    impl MemoryConsumer for TestConsumer {
170        fn name(&self) -> &str {
171            &self.name
172        }
173
174        fn memory_usage(&self) -> usize {
175            self.usage.load(Ordering::Relaxed)
176        }
177
178        fn eviction_priority(&self) -> u8 {
179            self.priority
180        }
181
182        fn region(&self) -> MemoryRegion {
183            self.region
184        }
185
186        fn evict(&self, target_bytes: usize) -> usize {
187            let current = self.usage.load(Ordering::Relaxed);
188            let to_evict = target_bytes.min(current);
189            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
190            to_evict
191        }
192
193        fn current_tier(&self) -> super::super::tiered::StorageTier {
194            if self.memory_usage() == 0 {
195                super::super::tiered::StorageTier::Uninitialized
196            } else {
197                super::super::tiered::StorageTier::InMemory
198            }
199        }
200    }
201
202    #[test]
203    fn test_consumer_stats() {
204        let consumer = TestConsumer::new(
205            "test",
206            1024,
207            priorities::INDEX_BUFFERS,
208            MemoryRegion::IndexBuffers,
209        );
210
211        let stats = ConsumerStats::from_consumer(&consumer);
212        assert_eq!(stats.name, "test");
213        assert_eq!(stats.usage_bytes, 1024);
214        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
215        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
216        assert!(!stats.can_spill);
217    }
218
219    #[test]
220    fn test_consumer_eviction() {
221        let consumer = TestConsumer::new(
222            "test",
223            1000,
224            priorities::INDEX_BUFFERS,
225            MemoryRegion::IndexBuffers,
226        );
227
228        let freed = consumer.evict(500);
229        assert_eq!(freed, 500);
230        assert_eq!(consumer.memory_usage(), 500);
231
232        // Try to evict more than available
233        let freed = consumer.evict(1000);
234        assert_eq!(freed, 500);
235        assert_eq!(consumer.memory_usage(), 0);
236    }
237
238    #[test]
239    #[allow(clippy::assertions_on_constants)]
240    fn test_priority_ordering() {
241        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
242        assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
243        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
244        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
245        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
246    }
247}