grafeo_core/execution/
memory.rs1use grafeo_common::memory::buffer::{BufferManager, MemoryGrant, MemoryRegion, PressureLevel};
4use std::sync::Arc;
5
/// Chunk size requested by `ExecutionMemoryContext::optimal_chunk_size`;
/// it is used unchanged while the buffer manager reports normal pressure.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Upper bound applied to chunk sizes under moderate pressure
/// (half of the default, i.e. 1024).
pub const MODERATE_PRESSURE_CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE / 2;

/// Upper bound applied to chunk sizes under high pressure
/// (half of the moderate bound, i.e. 512).
pub const HIGH_PRESSURE_CHUNK_SIZE: usize = MODERATE_PRESSURE_CHUNK_SIZE / 2;

/// Upper bound applied to chunk sizes under critical pressure
/// (half of the high bound, i.e. 256).
pub const CRITICAL_PRESSURE_CHUNK_SIZE: usize = HIGH_PRESSURE_CHUNK_SIZE / 2;
17
/// Per-execution handle onto a shared [`BufferManager`].
///
/// Allocation requests are forwarded to the manager's `ExecutionBuffers`
/// region, and the context keeps a running total of the sizes that were
/// granted. It can also hold on to grants itself (see `allocate_tracked`) and
/// exposes the manager's pressure level so callers can shrink their chunk
/// sizes.
pub struct ExecutionMemoryContext {
    /// Shared buffer manager that every allocation request is forwarded to.
    manager: Arc<BufferManager>,
    /// Sum of the sizes successfully granted through this context since it
    /// was created or since the last `release_all`. It is never decreased
    /// when an individual grant is dropped.
    allocated: usize,
    /// Grants owned by the context itself, pushed by `allocate_tracked` and
    /// dropped by `release_all` or when the context is dropped.
    grants: Vec<MemoryGrant>,
}
30
31impl ExecutionMemoryContext {
32 #[must_use]
34 pub fn new(manager: Arc<BufferManager>) -> Self {
35 Self {
36 manager,
37 allocated: 0,
38 grants: Vec::new(),
39 }
40 }
41
42 pub fn allocate(&mut self, size: usize) -> Option<MemoryGrant> {
46 let grant = self
47 .manager
48 .try_allocate(size, MemoryRegion::ExecutionBuffers)?;
49 self.allocated += size;
50 Some(grant)
51 }
52
53 pub fn allocate_tracked(&mut self, size: usize) -> bool {
57 if let Some(grant) = self
58 .manager
59 .try_allocate(size, MemoryRegion::ExecutionBuffers)
60 {
61 self.allocated += size;
62 self.grants.push(grant);
63 true
64 } else {
65 false
66 }
67 }
68
69 #[must_use]
71 pub fn pressure_level(&self) -> PressureLevel {
72 self.manager.pressure_level()
73 }
74
75 #[must_use]
77 pub fn should_reduce_chunk_size(&self) -> bool {
78 matches!(
79 self.pressure_level(),
80 PressureLevel::High | PressureLevel::Critical
81 )
82 }
83
84 #[must_use]
86 pub fn adjusted_chunk_size(&self, requested: usize) -> usize {
87 match self.pressure_level() {
88 PressureLevel::Normal => requested,
89 PressureLevel::Moderate => requested.min(MODERATE_PRESSURE_CHUNK_SIZE),
90 PressureLevel::High => requested.min(HIGH_PRESSURE_CHUNK_SIZE),
91 PressureLevel::Critical => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
92 }
93 }
94
95 #[must_use]
97 pub fn optimal_chunk_size(&self) -> usize {
98 self.adjusted_chunk_size(DEFAULT_CHUNK_SIZE)
99 }
100
101 #[must_use]
103 pub fn total_allocated(&self) -> usize {
104 self.allocated
105 }
106
107 #[must_use]
109 pub fn manager(&self) -> &Arc<BufferManager> {
110 &self.manager
111 }
112
113 pub fn release_all(&mut self) {
115 self.grants.clear();
116 self.allocated = 0;
117 }
118}
119
120impl Drop for ExecutionMemoryContext {
121 fn drop(&mut self) {
122 self.grants.clear();
124 }
125}
126
/// Builder for an [`ExecutionMemoryContext`] that can reserve an initial,
/// context-tracked allocation as part of construction.
pub struct ExecutionMemoryContextBuilder {
    /// Buffer manager the built context will forward its requests to.
    manager: Arc<BufferManager>,
    /// Size to reserve via `allocate_tracked` in `build`; `0` means no
    /// initial allocation is attempted.
    initial_allocation: usize,
}
132
133impl ExecutionMemoryContextBuilder {
134 #[must_use]
136 pub fn new(manager: Arc<BufferManager>) -> Self {
137 Self {
138 manager,
139 initial_allocation: 0,
140 }
141 }
142
143 #[must_use]
145 pub fn with_initial_allocation(mut self, size: usize) -> Self {
146 self.initial_allocation = size;
147 self
148 }
149
150 pub fn build(self) -> Option<ExecutionMemoryContext> {
154 let mut ctx = ExecutionMemoryContext::new(self.manager);
155
156 if self.initial_allocation > 0 && !ctx.allocate_tracked(self.initial_allocation) {
157 return None;
158 }
159
160 Some(ctx)
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::memory::buffer::BufferManagerConfig;

    /// 1 MiB budget shared by the tests that do not need memory pressure.
    const ROOMY_BUDGET: usize = 1024 * 1024;

    /// Builds a context on top of a fresh manager with [`ROOMY_BUDGET`].
    fn roomy_context() -> ExecutionMemoryContext {
        ExecutionMemoryContext::new(BufferManager::with_budget(ROOMY_BUDGET))
    }

    #[test]
    fn test_execution_context_creation() {
        let context = roomy_context();

        assert_eq!(context.total_allocated(), 0);
        assert_eq!(context.pressure_level(), PressureLevel::Normal);
    }

    #[test]
    fn test_execution_context_allocation() {
        let mut context = roomy_context();

        // Keep the grant bound so it stays alive for the whole test.
        let untracked = context.allocate(1024);
        assert!(untracked.is_some());
        assert_eq!(context.total_allocated(), 1024);
    }

    #[test]
    fn test_execution_context_tracked_allocation() {
        let mut context = roomy_context();

        assert!(context.allocate_tracked(1024));
        assert_eq!(context.total_allocated(), 1024);

        context.release_all();
        assert_eq!(context.total_allocated(), 0);
    }

    #[test]
    fn test_adjusted_chunk_size_normal() {
        let context = roomy_context();

        assert_eq!(context.adjusted_chunk_size(2048), 2048);
        assert_eq!(context.optimal_chunk_size(), 2048);
    }

    #[test]
    fn test_adjusted_chunk_size_under_pressure() {
        let pressured = BufferManager::new(BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        });

        // 860 of 1000 is above the 0.85 evict fraction and below the 0.95
        // hard fraction; the binding keeps the allocation alive while the
        // assertions run.
        let _held = pressured.try_allocate(860, MemoryRegion::ExecutionBuffers);

        let context = ExecutionMemoryContext::new(pressured);
        assert_eq!(context.pressure_level(), PressureLevel::High);
        assert_eq!(context.adjusted_chunk_size(2048), HIGH_PRESSURE_CHUNK_SIZE);
        assert!(context.should_reduce_chunk_size());
    }

    #[test]
    fn test_builder() {
        let built = ExecutionMemoryContextBuilder::new(BufferManager::with_budget(ROOMY_BUDGET))
            .with_initial_allocation(4096)
            .build();

        assert!(built.is_some());
        let context = built.unwrap();
        assert_eq!(context.total_allocated(), 4096);
    }

    #[test]
    fn test_builder_insufficient_memory() {
        // The requested initial allocation is ten times the whole budget.
        let built = ExecutionMemoryContextBuilder::new(BufferManager::with_budget(1000))
            .with_initial_allocation(10000)
            .build();

        assert!(built.is_none());
    }
}