graphos_common/memory/buffer/
consumer.rs1use thiserror::Error;
4
5use super::region::MemoryRegion;
6
7#[derive(Error, Debug, Clone)]
9pub enum SpillError {
10 #[error("spilling not supported")]
12 NotSupported,
13 #[error("I/O error during spill: {0}")]
15 IoError(String),
16 #[error("spill directory not configured")]
18 NoSpillDirectory,
19 #[error("insufficient disk space for spill")]
21 InsufficientDiskSpace,
22}
23
24pub trait MemoryConsumer: Send + Sync {
30 fn name(&self) -> &str;
32
33 fn memory_usage(&self) -> usize;
35
36 fn eviction_priority(&self) -> u8;
38
39 fn region(&self) -> MemoryRegion;
41
42 fn evict(&self, target_bytes: usize) -> usize;
46
47 fn can_spill(&self) -> bool {
49 false
50 }
51
52 fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
60 Err(SpillError::NotSupported)
61 }
62
63 fn reload(&self) -> Result<(), SpillError> {
69 Ok(())
70 }
71}
72
73pub mod priorities {
77 pub const SPILL_STAGING: u8 = 10;
79
80 pub const QUERY_CACHE: u8 = 30;
82
83 pub const INDEX_BUFFERS: u8 = 50;
85
86 pub const IDLE_TRANSACTION: u8 = 70;
88
89 pub const GRAPH_STORAGE: u8 = 100;
91
92 pub const ACTIVE_TRANSACTION: u8 = 200;
94}
95
96#[derive(Debug, Clone)]
98pub struct ConsumerStats {
99 pub name: String,
101 pub region: MemoryRegion,
103 pub usage_bytes: usize,
105 pub priority: u8,
107 pub can_spill: bool,
109}
110
111impl ConsumerStats {
112 pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
114 Self {
115 name: consumer.name().to_string(),
116 region: consumer.region(),
117 usage_bytes: consumer.memory_usage(),
118 priority: consumer.eviction_priority(),
119 can_spill: consumer.can_spill(),
120 }
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use std::sync::atomic::{AtomicUsize, Ordering};
128
129 struct TestConsumer {
130 name: String,
131 usage: AtomicUsize,
132 priority: u8,
133 region: MemoryRegion,
134 }
135
136 impl TestConsumer {
137 fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
138 Self {
139 name: name.to_string(),
140 usage: AtomicUsize::new(usage),
141 priority,
142 region,
143 }
144 }
145 }
146
147 impl MemoryConsumer for TestConsumer {
148 fn name(&self) -> &str {
149 &self.name
150 }
151
152 fn memory_usage(&self) -> usize {
153 self.usage.load(Ordering::Relaxed)
154 }
155
156 fn eviction_priority(&self) -> u8 {
157 self.priority
158 }
159
160 fn region(&self) -> MemoryRegion {
161 self.region
162 }
163
164 fn evict(&self, target_bytes: usize) -> usize {
165 let current = self.usage.load(Ordering::Relaxed);
166 let to_evict = target_bytes.min(current);
167 self.usage.fetch_sub(to_evict, Ordering::Relaxed);
168 to_evict
169 }
170 }
171
172 #[test]
173 fn test_consumer_stats() {
174 let consumer = TestConsumer::new(
175 "test",
176 1024,
177 priorities::INDEX_BUFFERS,
178 MemoryRegion::IndexBuffers,
179 );
180
181 let stats = ConsumerStats::from_consumer(&consumer);
182 assert_eq!(stats.name, "test");
183 assert_eq!(stats.usage_bytes, 1024);
184 assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
185 assert_eq!(stats.region, MemoryRegion::IndexBuffers);
186 assert!(!stats.can_spill);
187 }
188
189 #[test]
190 fn test_consumer_eviction() {
191 let consumer = TestConsumer::new(
192 "test",
193 1000,
194 priorities::INDEX_BUFFERS,
195 MemoryRegion::IndexBuffers,
196 );
197
198 let freed = consumer.evict(500);
199 assert_eq!(freed, 500);
200 assert_eq!(consumer.memory_usage(), 500);
201
202 let freed = consumer.evict(1000);
204 assert_eq!(freed, 500);
205 assert_eq!(consumer.memory_usage(), 0);
206 }
207
208 #[test]
209 #[allow(clippy::assertions_on_constants)]
210 fn test_priority_ordering() {
211 assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
212 assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
213 assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
214 assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
215 assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
216 }
217}