// Source file: grafeo_core/execution/memory.rs

//! Execution memory context for memory-aware query execution.
2
3use grafeo_common::memory::buffer::{BufferManager, MemoryGrant, MemoryRegion, PressureLevel};
4use std::sync::Arc;
5
/// Chunk size used for execution buffers when nothing calls for a smaller one.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Chunk size cap under moderate memory pressure: half the default (1024).
pub const MODERATE_PRESSURE_CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE / 2;

/// Chunk size cap under high memory pressure: a quarter of the default (512).
pub const HIGH_PRESSURE_CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE / 4;

/// Chunk size cap under critical memory pressure: an eighth of the default (256).
pub const CRITICAL_PRESSURE_CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE / 8;
17
18/// Execution context with memory awareness.
19///
20/// This context provides memory allocation for query execution operators
21/// and adjusts chunk sizes based on memory pressure.
22pub struct ExecutionMemoryContext {
23    /// Reference to the buffer manager.
24    manager: Arc<BufferManager>,
25    /// Total bytes allocated for this execution context.
26    allocated: usize,
27    /// Grants held by this context.
28    grants: Vec<MemoryGrant>,
29}
30
31impl ExecutionMemoryContext {
32    /// Creates a new execution memory context.
33    #[must_use]
34    pub fn new(manager: Arc<BufferManager>) -> Self {
35        Self {
36            manager,
37            allocated: 0,
38            grants: Vec::new(),
39        }
40    }
41
42    /// Requests memory for execution buffers.
43    ///
44    /// Returns `None` if the allocation cannot be satisfied.
45    pub fn allocate(&mut self, size: usize) -> Option<MemoryGrant> {
46        let grant = self
47            .manager
48            .try_allocate(size, MemoryRegion::ExecutionBuffers)?;
49        self.allocated += size;
50        Some(grant)
51    }
52
53    /// Allocates and stores a grant internally.
54    ///
55    /// The grant will be released when this context is dropped.
56    pub fn allocate_tracked(&mut self, size: usize) -> bool {
57        if let Some(grant) = self
58            .manager
59            .try_allocate(size, MemoryRegion::ExecutionBuffers)
60        {
61            self.allocated += size;
62            self.grants.push(grant);
63            true
64        } else {
65            false
66        }
67    }
68
69    /// Returns the current pressure level.
70    #[must_use]
71    pub fn pressure_level(&self) -> PressureLevel {
72        self.manager.pressure_level()
73    }
74
75    /// Returns whether chunk size should be reduced due to memory pressure.
76    #[must_use]
77    pub fn should_reduce_chunk_size(&self) -> bool {
78        matches!(
79            self.pressure_level(),
80            PressureLevel::High | PressureLevel::Critical
81        )
82    }
83
84    /// Computes adjusted chunk size based on memory pressure.
85    #[must_use]
86    pub fn adjusted_chunk_size(&self, requested: usize) -> usize {
87        match self.pressure_level() {
88            PressureLevel::Normal => requested,
89            PressureLevel::Moderate => requested.min(MODERATE_PRESSURE_CHUNK_SIZE),
90            PressureLevel::High => requested.min(HIGH_PRESSURE_CHUNK_SIZE),
91            PressureLevel::Critical => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
92            _ => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
93        }
94    }
95
96    /// Returns the optimal chunk size for the current memory state.
97    #[must_use]
98    pub fn optimal_chunk_size(&self) -> usize {
99        self.adjusted_chunk_size(DEFAULT_CHUNK_SIZE)
100    }
101
102    /// Returns total bytes allocated through this context.
103    #[must_use]
104    pub fn total_allocated(&self) -> usize {
105        self.allocated
106    }
107
108    /// Returns the buffer manager.
109    #[must_use]
110    pub fn manager(&self) -> &Arc<BufferManager> {
111        &self.manager
112    }
113
114    /// Releases all tracked grants.
115    pub fn release_all(&mut self) {
116        self.grants.clear();
117        self.allocated = 0;
118    }
119}
120
121impl Drop for ExecutionMemoryContext {
122    fn drop(&mut self) {
123        // Grants are automatically released when dropped
124        self.grants.clear();
125    }
126}
127
128/// Builder for execution memory contexts with pre-allocation.
129pub struct ExecutionMemoryContextBuilder {
130    manager: Arc<BufferManager>,
131    initial_allocation: usize,
132}
133
134impl ExecutionMemoryContextBuilder {
135    /// Creates a new builder with the given buffer manager.
136    #[must_use]
137    pub fn new(manager: Arc<BufferManager>) -> Self {
138        Self {
139            manager,
140            initial_allocation: 0,
141        }
142    }
143
144    /// Sets the initial allocation size.
145    #[must_use]
146    pub fn with_initial_allocation(mut self, size: usize) -> Self {
147        self.initial_allocation = size;
148        self
149    }
150
151    /// Builds the execution memory context.
152    ///
153    /// Returns `None` if the initial allocation cannot be satisfied.
154    pub fn build(self) -> Option<ExecutionMemoryContext> {
155        let mut ctx = ExecutionMemoryContext::new(self.manager);
156
157        if self.initial_allocation > 0 && !ctx.allocate_tracked(self.initial_allocation) {
158            return None;
159        }
160
161        Some(ctx)
162    }
163}
164
/// Memory handles given to pipeline operators so they can make
/// pressure-aware spill decisions.
///
/// The context holds just two `Arc` references, so cloning it is cheap. It is
/// intended to be passed to spill-capable operators when they are
/// constructed. Through it an operator can query memory pressure, register
/// itself as a memory consumer, and reach the per-query `SpillManager` used
/// for creating spill files.
#[cfg(feature = "spill")]
#[derive(Clone)]
pub struct OperatorMemoryContext {
    /// Buffer manager used for pressure queries and consumer registration.
    buffer_manager: Arc<BufferManager>,
    /// Spill manager for the current query, used for creating spill files.
    spill_manager: Arc<super::spill::SpillManager>,
}
179
#[cfg(feature = "spill")]
impl OperatorMemoryContext {
    /// Bundles a buffer manager and a spill manager into one context.
    #[must_use]
    pub fn new(
        buffer_manager: Arc<BufferManager>,
        spill_manager: Arc<super::spill::SpillManager>,
    ) -> Self {
        Self {
            buffer_manager,
            spill_manager,
        }
    }

    /// Returns the pressure level currently reported by the buffer manager.
    #[must_use]
    pub fn pressure_level(&self) -> PressureLevel {
        self.buffer_manager().pressure_level()
    }

    /// Returns whether the current pressure level warrants spilling, as
    /// decided by `PressureLevel::should_spill` (documented here as the High
    /// and Critical levels).
    #[must_use]
    pub fn should_spill(&self) -> bool {
        let level = self.pressure_level();
        level.should_spill()
    }

    /// Forwards `consumer` to the buffer manager's consumer registry.
    pub fn register_consumer(
        &self,
        consumer: Arc<dyn grafeo_common::memory::buffer::MemoryConsumer>,
    ) {
        let registry = &self.buffer_manager;
        registry.register_consumer(consumer);
    }

    /// Asks the buffer manager to unregister the consumer called `name`.
    pub fn unregister_consumer(&self, name: &str) {
        let registry = &self.buffer_manager;
        registry.unregister_consumer(name);
    }

    /// Returns the shared handle to the buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }

    /// Returns the shared handle to the per-query spill manager.
    #[must_use]
    pub fn spill_manager(&self) -> &Arc<super::spill::SpillManager> {
        &self.spill_manager
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::memory::buffer::BufferManagerConfig;

    /// Budget (1 MiB) used by the tests that only make small allocations.
    const ROOMY_BUDGET: usize = 1024 * 1024;

    /// Builds a context on top of a fresh buffer manager with `ROOMY_BUDGET`.
    fn roomy_context() -> ExecutionMemoryContext {
        ExecutionMemoryContext::new(BufferManager::with_budget(ROOMY_BUDGET))
    }

    /// A new context has allocated nothing and sees normal pressure.
    #[test]
    fn test_execution_context_creation() {
        let fresh = roomy_context();

        assert_eq!(fresh.total_allocated(), 0);
        assert_eq!(fresh.pressure_level(), PressureLevel::Normal);
    }

    /// A caller-owned allocation succeeds and is counted.
    #[test]
    fn test_execution_context_allocation() {
        let mut context = roomy_context();

        // Keep the grant bound so it stays alive for the assertions below.
        let grant = context.allocate(1024);
        assert!(grant.is_some());
        assert_eq!(context.total_allocated(), 1024);
    }

    /// A tracked allocation is counted, and `release_all` resets the counter.
    #[test]
    fn test_execution_context_tracked_allocation() {
        let mut context = roomy_context();

        assert!(context.allocate_tracked(1024));
        assert_eq!(context.total_allocated(), 1024);

        context.release_all();
        assert_eq!(context.total_allocated(), 0);
    }

    /// Under normal pressure the requested chunk size is left alone.
    #[test]
    fn test_adjusted_chunk_size_normal() {
        let context = roomy_context();

        assert_eq!(context.adjusted_chunk_size(2048), 2048);
        assert_eq!(context.optimal_chunk_size(), 2048);
    }

    /// Under high pressure the chunk size is capped at the high-pressure value.
    #[test]
    fn test_adjusted_chunk_size_under_pressure() {
        let manager = BufferManager::new(BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        });

        // Hold 860 of 1000 bytes so usage sits above the 85% threshold.
        let _held = manager.try_allocate(860, MemoryRegion::ExecutionBuffers);

        let context = ExecutionMemoryContext::new(manager);
        assert_eq!(context.pressure_level(), PressureLevel::High);
        assert_eq!(context.adjusted_chunk_size(2048), HIGH_PRESSURE_CHUNK_SIZE);
        assert!(context.should_reduce_chunk_size());
    }

    /// The builder reserves the requested initial allocation.
    #[test]
    fn test_builder() {
        let built = ExecutionMemoryContextBuilder::new(BufferManager::with_budget(ROOMY_BUDGET))
            .with_initial_allocation(4096)
            .build();

        assert!(built.is_some());
        assert_eq!(built.unwrap().total_allocated(), 4096);
    }

    /// The builder yields `None` when the initial allocation exceeds the budget.
    #[test]
    fn test_builder_insufficient_memory() {
        let tight = BufferManager::with_budget(1000);

        // Ask for ten times the budget.
        let built = ExecutionMemoryContextBuilder::new(tight)
            .with_initial_allocation(10000)
            .build();

        assert!(built.is_none());
    }
}