grafeo_core/execution/
memory.rs1use grafeo_common::memory::buffer::{BufferManager, MemoryGrant, MemoryRegion, PressureLevel};
4use std::sync::Arc;
5
/// Chunk size that `ExecutionMemoryContext::optimal_chunk_size` asks for.
///
/// `adjusted_chunk_size` returns a request unchanged while the pressure level
/// is `PressureLevel::Normal`, so this is also the size used when memory is
/// not under pressure.
pub const DEFAULT_CHUNK_SIZE: usize = 1 << 11; // 2048

/// Upper bound `adjusted_chunk_size` applies at `PressureLevel::Moderate`.
pub const MODERATE_PRESSURE_CHUNK_SIZE: usize = 1 << 10; // 1024

/// Upper bound `adjusted_chunk_size` applies at `PressureLevel::High`.
pub const HIGH_PRESSURE_CHUNK_SIZE: usize = 1 << 9; // 512

/// Upper bound `adjusted_chunk_size` applies at `PressureLevel::Critical`,
/// and for any pressure level that has no dedicated match arm.
pub const CRITICAL_PRESSURE_CHUNK_SIZE: usize = 1 << 8; // 256
17
18pub struct ExecutionMemoryContext {
23 manager: Arc<BufferManager>,
25 allocated: usize,
27 grants: Vec<MemoryGrant>,
29}
30
31impl ExecutionMemoryContext {
32 #[must_use]
34 pub fn new(manager: Arc<BufferManager>) -> Self {
35 Self {
36 manager,
37 allocated: 0,
38 grants: Vec::new(),
39 }
40 }
41
42 pub fn allocate(&mut self, size: usize) -> Option<MemoryGrant> {
46 let grant = self
47 .manager
48 .try_allocate(size, MemoryRegion::ExecutionBuffers)?;
49 self.allocated += size;
50 Some(grant)
51 }
52
53 pub fn allocate_tracked(&mut self, size: usize) -> bool {
57 if let Some(grant) = self
58 .manager
59 .try_allocate(size, MemoryRegion::ExecutionBuffers)
60 {
61 self.allocated += size;
62 self.grants.push(grant);
63 true
64 } else {
65 false
66 }
67 }
68
69 #[must_use]
71 pub fn pressure_level(&self) -> PressureLevel {
72 self.manager.pressure_level()
73 }
74
75 #[must_use]
77 pub fn should_reduce_chunk_size(&self) -> bool {
78 matches!(
79 self.pressure_level(),
80 PressureLevel::High | PressureLevel::Critical
81 )
82 }
83
84 #[must_use]
86 pub fn adjusted_chunk_size(&self, requested: usize) -> usize {
87 match self.pressure_level() {
88 PressureLevel::Normal => requested,
89 PressureLevel::Moderate => requested.min(MODERATE_PRESSURE_CHUNK_SIZE),
90 PressureLevel::High => requested.min(HIGH_PRESSURE_CHUNK_SIZE),
91 PressureLevel::Critical => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
92 _ => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
93 }
94 }
95
96 #[must_use]
98 pub fn optimal_chunk_size(&self) -> usize {
99 self.adjusted_chunk_size(DEFAULT_CHUNK_SIZE)
100 }
101
102 #[must_use]
104 pub fn total_allocated(&self) -> usize {
105 self.allocated
106 }
107
108 #[must_use]
110 pub fn manager(&self) -> &Arc<BufferManager> {
111 &self.manager
112 }
113
114 pub fn release_all(&mut self) {
116 self.grants.clear();
117 self.allocated = 0;
118 }
119}
120
121impl Drop for ExecutionMemoryContext {
122 fn drop(&mut self) {
123 self.grants.clear();
125 }
126}
127
128pub struct ExecutionMemoryContextBuilder {
130 manager: Arc<BufferManager>,
131 initial_allocation: usize,
132}
133
134impl ExecutionMemoryContextBuilder {
135 #[must_use]
137 pub fn new(manager: Arc<BufferManager>) -> Self {
138 Self {
139 manager,
140 initial_allocation: 0,
141 }
142 }
143
144 #[must_use]
146 pub fn with_initial_allocation(mut self, size: usize) -> Self {
147 self.initial_allocation = size;
148 self
149 }
150
151 pub fn build(self) -> Option<ExecutionMemoryContext> {
155 let mut ctx = ExecutionMemoryContext::new(self.manager);
156
157 if self.initial_allocation > 0 && !ctx.allocate_tracked(self.initial_allocation) {
158 return None;
159 }
160
161 Some(ctx)
162 }
163}
164
/// Shared handles to the buffer manager and the spill manager, bundled for
/// use by operators.
///
/// Only compiled when the `spill` feature is enabled. Cloning copies the two
/// `Arc` handles, so clones refer to the same managers.
#[cfg(feature = "spill")]
#[derive(Clone)]
pub struct OperatorMemoryContext {
    /// Buffer manager consulted for pressure and consumer registration.
    buffer_manager: Arc<BufferManager>,
    /// Spill manager exposed to operators through `spill_manager()`.
    spill_manager: Arc<super::spill::SpillManager>,
}
179
#[cfg(feature = "spill")]
impl OperatorMemoryContext {
    /// Creates a context from the two shared managers.
    #[must_use]
    pub fn new(
        buffer_manager: Arc<BufferManager>,
        spill_manager: Arc<super::spill::SpillManager>,
    ) -> Self {
        Self { buffer_manager, spill_manager }
    }

    /// Returns the pressure level currently reported by the buffer manager.
    #[must_use]
    pub fn pressure_level(&self) -> PressureLevel {
        self.buffer_manager.pressure_level()
    }

    /// Returns whether the current pressure level says to spill, as decided
    /// by `PressureLevel::should_spill`.
    #[must_use]
    pub fn should_spill(&self) -> bool {
        let level = self.pressure_level();
        level.should_spill()
    }

    /// Registers `consumer` with the buffer manager.
    pub fn register_consumer(
        &self,
        consumer: Arc<dyn grafeo_common::memory::buffer::MemoryConsumer>,
    ) {
        self.buffer_manager.register_consumer(consumer);
    }

    /// Asks the buffer manager to unregister the consumer called `name`.
    pub fn unregister_consumer(&self, name: &str) {
        self.buffer_manager.unregister_consumer(name);
    }

    /// Returns the shared buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }

    /// Returns the shared spill manager.
    #[must_use]
    pub fn spill_manager(&self) -> &Arc<super::spill::SpillManager> {
        &self.spill_manager
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::memory::buffer::BufferManagerConfig;

    /// Builds a context over a manager whose budget is far larger than
    /// anything these tests allocate.
    fn roomy_context() -> ExecutionMemoryContext {
        ExecutionMemoryContext::new(BufferManager::with_budget(1024 * 1024))
    }

    #[test]
    fn test_execution_context_creation() {
        let ctx = roomy_context();

        assert_eq!(ctx.total_allocated(), 0);
        assert_eq!(ctx.pressure_level(), PressureLevel::Normal);
    }

    #[test]
    fn test_execution_context_allocation() {
        let mut ctx = roomy_context();

        // Keep the grant bound so it lives until the end of the test.
        let grant = ctx.allocate(1024);
        assert!(grant.is_some());
        assert_eq!(ctx.total_allocated(), 1024);
    }

    #[test]
    fn test_execution_context_tracked_allocation() {
        let mut ctx = roomy_context();

        assert!(ctx.allocate_tracked(1024));
        assert_eq!(ctx.total_allocated(), 1024);

        ctx.release_all();
        assert_eq!(ctx.total_allocated(), 0);
    }

    #[test]
    fn test_adjusted_chunk_size_normal() {
        let ctx = roomy_context();

        assert_eq!(ctx.adjusted_chunk_size(2048), 2048);
        assert_eq!(ctx.optimal_chunk_size(), 2048);
    }

    #[test]
    fn test_adjusted_chunk_size_under_pressure() {
        let manager = BufferManager::new(BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        });

        // 860 of 1000 sits between the evict and hard limits. The result is
        // bound to a named variable (not `_`) so it stays alive for the
        // assertions below.
        let _held = manager.try_allocate(860, MemoryRegion::ExecutionBuffers);

        let ctx = ExecutionMemoryContext::new(manager);
        assert_eq!(ctx.pressure_level(), PressureLevel::High);
        assert_eq!(ctx.adjusted_chunk_size(2048), HIGH_PRESSURE_CHUNK_SIZE);
        assert!(ctx.should_reduce_chunk_size());
    }

    #[test]
    fn test_builder() {
        let manager = BufferManager::with_budget(1024 * 1024);

        let built = ExecutionMemoryContextBuilder::new(manager)
            .with_initial_allocation(4096)
            .build();

        assert!(built.is_some());
        let ctx = built.unwrap();
        assert_eq!(ctx.total_allocated(), 4096);
    }

    #[test]
    fn test_builder_insufficient_memory() {
        let manager = BufferManager::with_budget(1000);

        // The request is ten times the budget, so the build must fail.
        let built = ExecutionMemoryContextBuilder::new(manager)
            .with_initial_allocation(10000)
            .build();

        assert!(built.is_none());
    }
}