grafeo_common/memory/buffer/
consumer.rs1use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7#[derive(Error, Debug, Clone)]
9#[non_exhaustive]
10pub enum SpillError {
11 #[error("spilling not supported")]
13 NotSupported,
14 #[error("I/O error during spill: {0}")]
16 IoError(String),
17 #[error("spill directory not configured")]
19 NoSpillDirectory,
20 #[error("insufficient disk space for spill")]
22 InsufficientDiskSpace,
23}
24
25pub trait MemoryConsumer: Send + Sync {
31 fn name(&self) -> &str;
33
34 fn memory_usage(&self) -> usize;
36
37 fn eviction_priority(&self) -> u8;
39
40 fn region(&self) -> MemoryRegion;
42
43 fn evict(&self, target_bytes: usize) -> usize;
47
48 fn can_spill(&self) -> bool {
50 false
51 }
52
53 fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
61 Err(SpillError::NotSupported)
62 }
63
64 fn reload(&self) -> Result<(), SpillError> {
70 Ok(())
71 }
72}
73
/// Predefined `u8` levels intended as return values for
/// `MemoryConsumer::eviction_priority`.
///
/// The constants are listed in strictly ascending numeric order.
/// NOTE(review): presumably a lower value means "evict sooner" — confirm
/// against the eviction logic, which is not part of this file.
pub mod priorities {
    /// Level for spill staging buffers.
    pub const SPILL_STAGING: u8 = 10;

    /// Level for the query cache.
    pub const QUERY_CACHE: u8 = 30;

    /// Level for execution buffers.
    pub const EXECUTION_BUFFERS: u8 = 40;

    /// Level for index buffers.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Level for idle transactions.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Level for graph storage.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Level for active transactions; the highest predefined value.
    pub const ACTIVE_TRANSACTION: u8 = 200;
}
99
100#[derive(Debug, Clone)]
102pub struct ConsumerStats {
103 pub name: String,
105 pub region: MemoryRegion,
107 pub usage_bytes: usize,
109 pub priority: u8,
111 pub can_spill: bool,
113}
114
115impl ConsumerStats {
116 pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
118 Self {
119 name: consumer.name().to_string(),
120 region: consumer.region(),
121 usage_bytes: consumer.memory_usage(),
122 priority: consumer.eviction_priority(),
123 can_spill: consumer.can_spill(),
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal in-memory [`MemoryConsumer`] whose usage is a plain counter.
    ///
    /// It overrides only the required trait methods, so `can_spill`, `spill`
    /// and `reload` exercise the trait's default bodies.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        /// Creates a consumer reporting `usage` bytes under `region` with the
        /// given eviction `priority`.
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        /// Frees at most the current usage and returns the amount removed.
        fn evict(&self, target_bytes: usize) -> usize {
            let current = self.usage.load(Ordering::Relaxed);
            // Cap at current usage so the counter can never underflow.
            let to_evict = target_bytes.min(current);
            self.usage.fetch_sub(to_evict, Ordering::Relaxed);
            to_evict
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        // A partial eviction frees exactly what was requested.
        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Requesting more than is held frees only the remainder.
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_behavior() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        // `TestConsumer` overrides none of these, so the trait defaults apply.
        assert!(!consumer.can_spill());
        assert!(matches!(consumer.spill(100), Err(SpillError::NotSupported)));
        assert!(consumer.reload().is_ok());
        // The default `spill` must leave the reported usage untouched.
        assert_eq!(consumer.memory_usage(), 1000);
    }

    #[test]
    fn test_priority_ordering() {
        // Every predefined level, in the order they are declared. Checking
        // adjacent pairs of one list keeps any constant (EXECUTION_BUFFERS
        // included) from being silently left out of the chain.
        let ascending = [
            ("SPILL_STAGING", priorities::SPILL_STAGING),
            ("QUERY_CACHE", priorities::QUERY_CACHE),
            ("EXECUTION_BUFFERS", priorities::EXECUTION_BUFFERS),
            ("INDEX_BUFFERS", priorities::INDEX_BUFFERS),
            ("IDLE_TRANSACTION", priorities::IDLE_TRANSACTION),
            ("GRAPH_STORAGE", priorities::GRAPH_STORAGE),
            ("ACTIVE_TRANSACTION", priorities::ACTIVE_TRANSACTION),
        ];

        for pair in ascending.windows(2) {
            let (lower_name, lower) = pair[0];
            let (higher_name, higher) = pair[1];
            assert!(
                lower < higher,
                "{lower_name} ({lower}) must be below {higher_name} ({higher})"
            );
        }
    }
}