use grafeo_common::memory::buffer::{BufferManager, MemoryGrant, MemoryRegion, PressureLevel};
use std::sync::Arc;
pub const DEFAULT_CHUNK_SIZE: usize = 2048;
pub const MODERATE_PRESSURE_CHUNK_SIZE: usize = 1024;
pub const HIGH_PRESSURE_CHUNK_SIZE: usize = 512;
pub const CRITICAL_PRESSURE_CHUNK_SIZE: usize = 256;
pub struct ExecutionMemoryContext {
manager: Arc<BufferManager>,
allocated: usize,
grants: Vec<MemoryGrant>,
}
impl ExecutionMemoryContext {
#[must_use]
pub fn new(manager: Arc<BufferManager>) -> Self {
Self {
manager,
allocated: 0,
grants: Vec::new(),
}
}
pub fn allocate(&mut self, size: usize) -> Option<MemoryGrant> {
let grant = self
.manager
.try_allocate(size, MemoryRegion::ExecutionBuffers)?;
self.allocated += size;
Some(grant)
}
pub fn allocate_tracked(&mut self, size: usize) -> bool {
if let Some(grant) = self
.manager
.try_allocate(size, MemoryRegion::ExecutionBuffers)
{
self.allocated += size;
self.grants.push(grant);
true
} else {
false
}
}
#[must_use]
pub fn pressure_level(&self) -> PressureLevel {
self.manager.pressure_level()
}
#[must_use]
pub fn should_reduce_chunk_size(&self) -> bool {
matches!(
self.pressure_level(),
PressureLevel::High | PressureLevel::Critical
)
}
#[must_use]
pub fn adjusted_chunk_size(&self, requested: usize) -> usize {
match self.pressure_level() {
PressureLevel::Normal => requested,
PressureLevel::Moderate => requested.min(MODERATE_PRESSURE_CHUNK_SIZE),
PressureLevel::High => requested.min(HIGH_PRESSURE_CHUNK_SIZE),
PressureLevel::Critical => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
_ => requested.min(CRITICAL_PRESSURE_CHUNK_SIZE),
}
}
#[must_use]
pub fn optimal_chunk_size(&self) -> usize {
self.adjusted_chunk_size(DEFAULT_CHUNK_SIZE)
}
#[must_use]
pub fn total_allocated(&self) -> usize {
self.allocated
}
#[must_use]
pub fn manager(&self) -> &Arc<BufferManager> {
&self.manager
}
pub fn release_all(&mut self) {
self.grants.clear();
self.allocated = 0;
}
}
impl Drop for ExecutionMemoryContext {
fn drop(&mut self) {
self.grants.clear();
}
}
pub struct ExecutionMemoryContextBuilder {
manager: Arc<BufferManager>,
initial_allocation: usize,
}
impl ExecutionMemoryContextBuilder {
#[must_use]
pub fn new(manager: Arc<BufferManager>) -> Self {
Self {
manager,
initial_allocation: 0,
}
}
#[must_use]
pub fn with_initial_allocation(mut self, size: usize) -> Self {
self.initial_allocation = size;
self
}
pub fn build(self) -> Option<ExecutionMemoryContext> {
let mut ctx = ExecutionMemoryContext::new(self.manager);
if self.initial_allocation > 0 && !ctx.allocate_tracked(self.initial_allocation) {
return None;
}
Some(ctx)
}
}
#[cfg(feature = "spill")]
#[derive(Clone)]
pub struct OperatorMemoryContext {
buffer_manager: Arc<BufferManager>,
spill_manager: Arc<super::spill::SpillManager>,
}
#[cfg(feature = "spill")]
impl OperatorMemoryContext {
#[must_use]
pub fn new(
buffer_manager: Arc<BufferManager>,
spill_manager: Arc<super::spill::SpillManager>,
) -> Self {
Self {
buffer_manager,
spill_manager,
}
}
#[must_use]
pub fn pressure_level(&self) -> PressureLevel {
self.buffer_manager.pressure_level()
}
#[must_use]
pub fn should_spill(&self) -> bool {
self.pressure_level().should_spill()
}
pub fn register_consumer(
&self,
consumer: Arc<dyn grafeo_common::memory::buffer::MemoryConsumer>,
) {
self.buffer_manager.register_consumer(consumer);
}
pub fn unregister_consumer(&self, name: &str) {
self.buffer_manager.unregister_consumer(name);
}
#[must_use]
pub fn buffer_manager(&self) -> &Arc<BufferManager> {
&self.buffer_manager
}
#[must_use]
pub fn spill_manager(&self) -> &Arc<super::spill::SpillManager> {
&self.spill_manager
}
}
#[cfg(test)]
mod tests {
use super::*;
use grafeo_common::memory::buffer::BufferManagerConfig;
#[test]
fn test_execution_context_creation() {
let manager = BufferManager::with_budget(1024 * 1024);
let ctx = ExecutionMemoryContext::new(manager);
assert_eq!(ctx.total_allocated(), 0);
assert_eq!(ctx.pressure_level(), PressureLevel::Normal);
}
#[test]
fn test_execution_context_allocation() {
let manager = BufferManager::with_budget(1024 * 1024);
let mut ctx = ExecutionMemoryContext::new(manager);
let grant = ctx.allocate(1024);
assert!(grant.is_some());
assert_eq!(ctx.total_allocated(), 1024);
}
#[test]
fn test_execution_context_tracked_allocation() {
let manager = BufferManager::with_budget(1024 * 1024);
let mut ctx = ExecutionMemoryContext::new(manager);
assert!(ctx.allocate_tracked(1024));
assert_eq!(ctx.total_allocated(), 1024);
ctx.release_all();
assert_eq!(ctx.total_allocated(), 0);
}
#[test]
fn test_adjusted_chunk_size_normal() {
let manager = BufferManager::with_budget(1024 * 1024);
let ctx = ExecutionMemoryContext::new(manager);
assert_eq!(ctx.adjusted_chunk_size(2048), 2048);
assert_eq!(ctx.optimal_chunk_size(), 2048);
}
#[test]
fn test_adjusted_chunk_size_under_pressure() {
let config = BufferManagerConfig {
budget: 1000,
soft_limit_fraction: 0.70,
evict_limit_fraction: 0.85,
hard_limit_fraction: 0.95,
background_eviction: false,
spill_path: None,
};
let manager = BufferManager::new(config);
let _g = manager.try_allocate(860, MemoryRegion::ExecutionBuffers);
let ctx = ExecutionMemoryContext::new(manager);
assert_eq!(ctx.pressure_level(), PressureLevel::High);
assert_eq!(ctx.adjusted_chunk_size(2048), HIGH_PRESSURE_CHUNK_SIZE);
assert!(ctx.should_reduce_chunk_size());
}
#[test]
fn test_builder() {
let manager = BufferManager::with_budget(1024 * 1024);
let ctx = ExecutionMemoryContextBuilder::new(manager)
.with_initial_allocation(4096)
.build();
assert!(ctx.is_some());
let ctx = ctx.unwrap();
assert_eq!(ctx.total_allocated(), 4096);
}
#[test]
fn test_builder_insufficient_memory() {
let manager = BufferManager::with_budget(1000);
let ctx = ExecutionMemoryContextBuilder::new(manager)
.with_initial_allocation(10000)
.build();
assert!(ctx.is_none());
}
}