// graphos_core/execution/memory.rs

//! Execution memory context for memory-aware query execution.
2
3use graphos_common::memory::buffer::{BufferManager, MemoryGrant, MemoryRegion, PressureLevel};
4use std::sync::Arc;
5
/// Chunk size (2048) used for execution buffers when nothing calls for a
/// smaller one.
pub const DEFAULT_CHUNK_SIZE: usize = 1 << 11;

/// Largest chunk size (1024) handed out while memory pressure is moderate.
pub const MODERATE_PRESSURE_CHUNK_SIZE: usize = 1 << 10;

/// Largest chunk size (512) handed out while memory pressure is high.
pub const HIGH_PRESSURE_CHUNK_SIZE: usize = 1 << 9;

/// Largest chunk size (256) handed out while memory pressure is critical.
pub const CRITICAL_PRESSURE_CHUNK_SIZE: usize = 1 << 8;
17
18/// Execution context with memory awareness.
19///
20/// This context provides memory allocation for query execution operators
21/// and adjusts chunk sizes based on memory pressure.
22pub struct ExecutionMemoryContext {
23    /// Reference to the buffer manager.
24    manager: Arc<BufferManager>,
25    /// Total bytes allocated for this execution context.
26    allocated: usize,
27    /// Grants held by this context.
28    grants: Vec<MemoryGrant>,
29}
30
31impl ExecutionMemoryContext {
32    /// Creates a new execution memory context.
33    #[must_use]
34    pub fn new(manager: Arc<BufferManager>) -> Self {
35        Self {
36            manager,
37            allocated: 0,
38            grants: Vec::new(),
39        }
40    }
41
42    /// Requests memory for execution buffers.
43    ///
44    /// Returns `None` if the allocation cannot be satisfied.
45    pub fn allocate(&mut self, size: usize) -> Option<MemoryGrant> {
46        let grant = self
47            .manager
48            .try_allocate(size, MemoryRegion::ExecutionBuffers)?;
49        self.allocated += size;
50        Some(grant)
51    }
52
53    /// Allocates and stores a grant internally.
54    ///
55    /// The grant will be released when this context is dropped.
56    pub fn allocate_tracked(&mut self, size: usize) -> bool {
57        if let Some(grant) = self
58            .manager
59            .try_allocate(size, MemoryRegion::ExecutionBuffers)
60        {
61            self.allocated += size;
62            self.grants.push(grant);
63            true
64        } else {
65            false
66        }
67    }
68
69    /// Returns the current pressure level.
70    #[must_use]
71    pub fn pressure_level(&self) -> PressureLevel {
72        self.manager.pressure_level()
73    }
74
75    /// Returns whether chunk size should be reduced due to memory pressure.
76    #[must_use]
77    pub fn should_reduce_chunk_size(&self) -> bool {
78        matches!(
79            self.pressure_level(),
80            PressureLevel::High | PressureLevel::Critical
81        )
82    }
83
84    /// Computes adjusted chunk size based on memory pressure.
85    #[must_use]
86    pub fn adjusted_chunk_size(&self, requested: usize) -> usize {
87        match self.pressure_level() {
88            PressureLevel::Normal => requested,
89            PressureLevel::Moderate => requested.min(MODERATE_PRESSURE_CHUNK_SIZE),
90            PressureLevel::High => requested.min(HIGH_PRESSURE_CHUNK_SIZE),
91            PressureLevel::Critical => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
92        }
93    }
94
95    /// Returns the optimal chunk size for the current memory state.
96    #[must_use]
97    pub fn optimal_chunk_size(&self) -> usize {
98        self.adjusted_chunk_size(DEFAULT_CHUNK_SIZE)
99    }
100
101    /// Returns total bytes allocated through this context.
102    #[must_use]
103    pub fn total_allocated(&self) -> usize {
104        self.allocated
105    }
106
107    /// Returns the buffer manager.
108    #[must_use]
109    pub fn manager(&self) -> &Arc<BufferManager> {
110        &self.manager
111    }
112
113    /// Releases all tracked grants.
114    pub fn release_all(&mut self) {
115        self.grants.clear();
116        self.allocated = 0;
117    }
118}
119
120impl Drop for ExecutionMemoryContext {
121    fn drop(&mut self) {
122        // Grants are automatically released when dropped
123        self.grants.clear();
124    }
125}
126
127/// Builder for execution memory contexts with pre-allocation.
128pub struct ExecutionMemoryContextBuilder {
129    manager: Arc<BufferManager>,
130    initial_allocation: usize,
131}
132
133impl ExecutionMemoryContextBuilder {
134    /// Creates a new builder with the given buffer manager.
135    #[must_use]
136    pub fn new(manager: Arc<BufferManager>) -> Self {
137        Self {
138            manager,
139            initial_allocation: 0,
140        }
141    }
142
143    /// Sets the initial allocation size.
144    #[must_use]
145    pub fn with_initial_allocation(mut self, size: usize) -> Self {
146        self.initial_allocation = size;
147        self
148    }
149
150    /// Builds the execution memory context.
151    ///
152    /// Returns `None` if the initial allocation cannot be satisfied.
153    pub fn build(self) -> Option<ExecutionMemoryContext> {
154        let mut ctx = ExecutionMemoryContext::new(self.manager);
155
156        if self.initial_allocation > 0 && !ctx.allocate_tracked(self.initial_allocation) {
157            return None;
158        }
159
160        Some(ctx)
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::memory::buffer::BufferManagerConfig;

    /// Builds a context over a `1024 * 1024` budget, far larger than anything
    /// the tests using it allocate.
    fn roomy_context() -> ExecutionMemoryContext {
        ExecutionMemoryContext::new(BufferManager::with_budget(1024 * 1024))
    }

    #[test]
    fn test_execution_context_creation() {
        let ctx = roomy_context();

        assert_eq!(ctx.pressure_level(), PressureLevel::Normal);
        assert_eq!(ctx.total_allocated(), 0);
    }

    #[test]
    fn test_execution_context_allocation() {
        let mut ctx = roomy_context();

        // Keep the grant bound so it stays alive for the rest of the test.
        let grant = ctx.allocate(1024);

        assert!(grant.is_some());
        assert_eq!(ctx.total_allocated(), 1024);
    }

    #[test]
    fn test_execution_context_tracked_allocation() {
        let mut ctx = roomy_context();

        let tracked = ctx.allocate_tracked(1024);
        assert!(tracked);
        assert_eq!(ctx.total_allocated(), 1024);

        // Releasing the tracked grants also resets the counter.
        ctx.release_all();
        assert_eq!(ctx.total_allocated(), 0);
    }

    #[test]
    fn test_adjusted_chunk_size_normal() {
        let ctx = roomy_context();

        assert_eq!(ctx.optimal_chunk_size(), 2048);
        assert_eq!(ctx.adjusted_chunk_size(2048), 2048);
    }

    #[test]
    fn test_adjusted_chunk_size_under_pressure() {
        let manager = BufferManager::new(BufferManagerConfig {
            budget: 1000,
            soft_limit_fraction: 0.70,
            evict_limit_fraction: 0.85,
            hard_limit_fraction: 0.95,
            background_eviction: false,
            spill_path: None,
        });

        // Hold 860 of the 1000 budget (above the 85% evict limit) so the
        // manager reports high pressure for the rest of the test.
        let _held = manager.try_allocate(860, MemoryRegion::ExecutionBuffers);

        let ctx = ExecutionMemoryContext::new(manager);
        assert_eq!(ctx.pressure_level(), PressureLevel::High);
        assert!(ctx.should_reduce_chunk_size());
        assert_eq!(ctx.adjusted_chunk_size(2048), HIGH_PRESSURE_CHUNK_SIZE);
    }

    #[test]
    fn test_builder() {
        let ctx = ExecutionMemoryContextBuilder::new(BufferManager::with_budget(1024 * 1024))
            .with_initial_allocation(4096)
            .build()
            .expect("initial allocation should fit within the budget");

        assert_eq!(ctx.total_allocated(), 4096);
    }

    #[test]
    fn test_builder_insufficient_memory() {
        // Ask for more than the whole budget so the build must fail.
        let built = ExecutionMemoryContextBuilder::new(BufferManager::with_budget(1000))
            .with_initial_allocation(10000)
            .build();

        assert!(built.is_none());
    }
}