grafeo_common/memory/buffer/
consumer.rs1use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7#[derive(Error, Debug, Clone)]
9#[non_exhaustive]
10pub enum SpillError {
11 #[error("spilling not supported")]
13 NotSupported,
14 #[error("I/O error during spill: {0}")]
16 IoError(String),
17 #[error("spill directory not configured")]
19 NoSpillDirectory,
20 #[error("insufficient disk space for spill")]
22 InsufficientDiskSpace,
23}
24
25pub trait MemoryConsumer: Send + Sync {
31 fn name(&self) -> &str;
33
34 fn memory_usage(&self) -> usize;
36
37 fn eviction_priority(&self) -> u8;
39
40 fn region(&self) -> MemoryRegion;
42
43 fn evict(&self, target_bytes: usize) -> usize;
47
48 fn can_spill(&self) -> bool {
50 false
51 }
52
53 fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
61 Err(SpillError::NotSupported)
62 }
63
64 fn reload(&self) -> Result<(), SpillError> {
70 Ok(())
71 }
72
73 fn current_tier(&self) -> super::tiered::StorageTier;
90}
91
/// Well-known eviction priority values, listed in ascending order.
///
/// These are `u8` constants, the same type that
/// `MemoryConsumer::eviction_priority` returns.
// NOTE(review): presumably a lower value means "evict sooner" — confirm
// against the code that consumes these priorities.
pub mod priorities {
    /// Priority for spill staging (lowest value in this module).
    pub const SPILL_STAGING: u8 = 10;

    /// Priority for the query cache.
    pub const QUERY_CACHE: u8 = 30;

    /// Priority for execution buffers.
    pub const EXECUTION_BUFFERS: u8 = 40;

    /// Priority for index buffers.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Priority for idle transactions.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Priority for graph storage.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Priority for active transactions (highest value in this module).
    pub const ACTIVE_TRANSACTION: u8 = 200;
}
117
118#[derive(Debug, Clone)]
120pub struct ConsumerStats {
121 pub name: String,
123 pub region: MemoryRegion,
125 pub usage_bytes: usize,
127 pub priority: u8,
129 pub can_spill: bool,
131}
132
133impl ConsumerStats {
134 pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
136 Self {
137 name: consumer.name().to_string(),
138 region: consumer.region(),
139 usage_bytes: consumer.memory_usage(),
140 priority: consumer.eviction_priority(),
141 can_spill: consumer.can_spill(),
142 }
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::super::tiered::StorageTier;
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal in-memory [`MemoryConsumer`] used to exercise the trait.
    ///
    /// Usage is kept in an atomic because the trait only hands out `&self`.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        /// Creates a consumer that starts out holding `usage` bytes.
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        /// Frees up to `target_bytes` and returns how many bytes were freed.
        fn evict(&self, target_bytes: usize) -> usize {
            // Do the read-modify-write as one atomic update. A separate load
            // followed by `fetch_sub` lets two concurrent callers both see the
            // old value and wrap the counter below zero.
            let previous = self
                .usage
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    Some(current.saturating_sub(target_bytes))
                })
                // The closure always returns `Some`, so this is always `Ok`;
                // both variants carry the previous value anyway.
                .unwrap_or_else(|unchanged| unchanged);
            target_bytes.min(previous)
        }

        /// Reports `Uninitialized` when nothing is held, `InMemory` otherwise.
        fn current_tier(&self) -> StorageTier {
            if self.memory_usage() == 0 {
                StorageTier::Uninitialized
            } else {
                StorageTier::InMemory
            }
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Asking for more than is held frees only what is left.
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);

        // Evicting from an empty consumer is a no-op and must not underflow.
        let freed = consumer.evict(10);
        assert_eq!(freed, 0);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_behavior() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        // `TestConsumer` does not override these, so the trait defaults apply.
        assert!(!consumer.can_spill());
        assert!(matches!(
            consumer.spill(512),
            Err(SpillError::NotSupported)
        ));
        assert!(consumer.reload().is_ok());

        // A rejected spill leaves the reported usage untouched.
        assert_eq!(consumer.memory_usage(), 1024);
    }

    #[test]
    // The operands are all constants on purpose: this test pins the relative
    // order of the priority values.
    #[allow(clippy::assertions_on_constants)]
    fn test_priority_ordering() {
        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
        assert!(priorities::QUERY_CACHE < priorities::EXECUTION_BUFFERS);
        assert!(priorities::EXECUTION_BUFFERS < priorities::INDEX_BUFFERS);
        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
    }
}