grafeo_common/memory/buffer/
consumer.rs1use thiserror::Error;
4
5use super::region::MemoryRegion;
6
/// Errors that a [`MemoryConsumer`] can report from its spill-related methods.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate need
/// a wildcard arm.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum SpillError {
    /// The consumer does not implement spilling. This is the value returned by
    /// the default [`MemoryConsumer::spill`] implementation.
    #[error("spilling not supported")]
    NotSupported,
    /// An I/O failure occurred during a spill. The payload is the text that is
    /// interpolated into the display message.
    // NOTE(review): the payload is a `String` rather than `std::io::Error`,
    // presumably so the enum can keep deriving `Clone` — confirm before changing.
    #[error("I/O error during spill: {0}")]
    IoError(String),
    /// No spill directory has been configured.
    #[error("spill directory not configured")]
    NoSpillDirectory,
    /// There is not enough disk space to perform the spill.
    #[error("insufficient disk space for spill")]
    InsufficientDiskSpace,
}
24
/// A component that reports its memory usage and can be asked to release memory.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`, so any
/// state that changes during eviction or spilling needs interior mutability.
pub trait MemoryConsumer: Send + Sync {
    /// Returns the consumer's name.
    fn name(&self) -> &str;

    /// Returns the consumer's current memory usage in bytes.
    fn memory_usage(&self) -> usize;

    /// Returns the consumer's eviction priority; [`priorities`] holds the
    /// predefined levels.
    // NOTE(review): presumably lower values are evicted first — confirm against
    // the code that orders consumers.
    fn eviction_priority(&self) -> u8;

    /// Returns the [`MemoryRegion`] this consumer reports for itself.
    fn region(&self) -> MemoryRegion;

    /// Asks the consumer to release memory, aiming for `target_bytes`, and
    /// returns the number of bytes freed.
    ///
    /// The returned amount may be smaller than `target_bytes`; the in-file test
    /// consumer, for example, caps it at its current usage.
    fn evict(&self, target_bytes: usize) -> usize;

    /// Reports whether this consumer supports spilling. Defaults to `false`.
    fn can_spill(&self) -> bool {
        false
    }

    /// Asks the consumer to spill, aiming for `_target_bytes`.
    ///
    /// # Errors
    ///
    /// The default implementation always returns [`SpillError::NotSupported`].
    // NOTE(review): the `Ok(usize)` value is presumably the number of bytes
    // spilled — confirm with an implementor that overrides this method.
    fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
        Err(SpillError::NotSupported)
    }

    /// Counterpart to [`MemoryConsumer::spill`]. The default implementation does
    /// nothing and returns `Ok(())`.
    // NOTE(review): presumably overriders restore previously spilled data here —
    // confirm against a spilling implementor.
    fn reload(&self) -> Result<(), SpillError> {
        Ok(())
    }
}
73
/// Predefined values for [`MemoryConsumer::eviction_priority`].
///
/// The constants are strictly increasing in the order listed here, which
/// `test_priority_ordering` asserts.
// NOTE(review): presumably a lower value means "evict sooner" — confirm against
// the code that sorts consumers by priority.
pub mod priorities {
    /// Priority level for spill staging (lowest predefined value).
    pub const SPILL_STAGING: u8 = 10;

    /// Priority level for the query cache.
    pub const QUERY_CACHE: u8 = 30;

    /// Priority level for index buffers.
    pub const INDEX_BUFFERS: u8 = 50;

    /// Priority level for idle transactions.
    pub const IDLE_TRANSACTION: u8 = 70;

    /// Priority level for graph storage.
    pub const GRAPH_STORAGE: u8 = 100;

    /// Priority level for active transactions (highest predefined value).
    pub const ACTIVE_TRANSACTION: u8 = 200;
}
96
/// Owned snapshot of the values a [`MemoryConsumer`] reports about itself.
///
/// Built by [`ConsumerStats::from_consumer`]; the fields are plain copies and do
/// not track later changes in the consumer.
#[derive(Debug, Clone)]
pub struct ConsumerStats {
    /// Copy of [`MemoryConsumer::name`].
    pub name: String,
    /// Value of [`MemoryConsumer::region`].
    pub region: MemoryRegion,
    /// Value of [`MemoryConsumer::memory_usage`], in bytes.
    pub usage_bytes: usize,
    /// Value of [`MemoryConsumer::eviction_priority`].
    pub priority: u8,
    /// Value of [`MemoryConsumer::can_spill`].
    pub can_spill: bool,
}
111
112impl ConsumerStats {
113 pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
115 Self {
116 name: consumer.name().to_string(),
117 region: consumer.region(),
118 usage_bytes: consumer.memory_usage(),
119 priority: consumer.eviction_priority(),
120 can_spill: consumer.can_spill(),
121 }
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    /// Minimal in-memory consumer backed by a byte counter.
    ///
    /// It overrides only the required trait methods, so `can_spill`, `spill`
    /// and `reload` exercise the trait's default implementations.
    struct TestConsumer {
        name: String,
        usage: AtomicUsize,
        priority: u8,
        region: MemoryRegion,
    }

    impl TestConsumer {
        /// Creates a consumer that initially reports `usage` bytes.
        fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
            Self {
                name: name.to_string(),
                usage: AtomicUsize::new(usage),
                priority,
                region,
            }
        }
    }

    impl MemoryConsumer for TestConsumer {
        fn name(&self) -> &str {
            &self.name
        }

        fn memory_usage(&self) -> usize {
            self.usage.load(Ordering::Relaxed)
        }

        fn eviction_priority(&self) -> u8 {
            self.priority
        }

        fn region(&self) -> MemoryRegion {
            self.region
        }

        /// Frees up to `target_bytes`, never more than the current usage, and
        /// returns the amount actually freed.
        fn evict(&self, target_bytes: usize) -> usize {
            // One atomic read-modify-write. A separate `load` followed by
            // `fetch_sub` lets two concurrent callers subtract from the same
            // snapshot, which can wrap the counter below zero.
            let previous = self
                .usage
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                    Some(current.saturating_sub(target_bytes))
                })
                .expect("update closure always returns Some");
            target_bytes.min(previous)
        }
    }

    #[test]
    fn test_consumer_stats() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let stats = ConsumerStats::from_consumer(&consumer);
        assert_eq!(stats.name, "test");
        assert_eq!(stats.usage_bytes, 1024);
        assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
        assert_eq!(stats.region, MemoryRegion::IndexBuffers);
        assert!(!stats.can_spill);
    }

    #[test]
    fn test_consumer_eviction() {
        let consumer = TestConsumer::new(
            "test",
            1000,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        );

        let freed = consumer.evict(500);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 500);

        // Asking for more than is left frees only the remainder.
        let freed = consumer.evict(1000);
        assert_eq!(freed, 500);
        assert_eq!(consumer.memory_usage(), 0);

        // Evicting from an empty consumer is a no-op.
        assert_eq!(consumer.evict(1), 0);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    fn test_default_spill_is_unsupported() {
        let consumer = TestConsumer::new(
            "test",
            1024,
            priorities::QUERY_CACHE,
            MemoryRegion::IndexBuffers,
        );

        assert!(!consumer.can_spill());
        assert!(matches!(
            consumer.spill(512),
            Err(SpillError::NotSupported)
        ));
        assert!(consumer.reload().is_ok());
        // The default spill path must leave the reported usage untouched.
        assert_eq!(consumer.memory_usage(), 1024);
    }

    #[test]
    fn test_concurrent_eviction_never_overcounts() {
        const THREADS: usize = 4;
        const ROUNDS: usize = 500;
        const STEP: usize = 3;
        // Less than THREADS * ROUNDS * STEP, so the threads race for the tail.
        const INITIAL: usize = 1000;

        let consumer = Arc::new(TestConsumer::new(
            "shared",
            INITIAL,
            priorities::INDEX_BUFFERS,
            MemoryRegion::IndexBuffers,
        ));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let consumer = Arc::clone(&consumer);
                thread::spawn(move || (0..ROUNDS).map(|_| consumer.evict(STEP)).sum::<usize>())
            })
            .collect();

        let freed: usize = workers
            .into_iter()
            .map(|worker| worker.join().expect("eviction thread panicked"))
            .sum();

        // Every byte is freed exactly once and the counter never wraps.
        assert_eq!(freed, INITIAL);
        assert_eq!(consumer.memory_usage(), 0);
    }

    #[test]
    // The operands are constants on purpose: this test pins the relative order
    // of the predefined priority levels.
    #[allow(clippy::assertions_on_constants)]
    fn test_priority_ordering() {
        assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
        assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
        assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
        assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
        assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
    }
}