use thiserror::Error;
use super::region::MemoryRegion;
#[derive(Error, Debug, Clone)]
pub enum SpillError {
#[error("spilling not supported")]
NotSupported,
#[error("I/O error during spill: {0}")]
IoError(String),
#[error("spill directory not configured")]
NoSpillDirectory,
#[error("insufficient disk space for spill")]
InsufficientDiskSpace,
}
pub trait MemoryConsumer: Send + Sync {
fn name(&self) -> &str;
fn memory_usage(&self) -> usize;
fn eviction_priority(&self) -> u8;
fn region(&self) -> MemoryRegion;
fn evict(&self, target_bytes: usize) -> usize;
fn can_spill(&self) -> bool {
false
}
fn spill(&self, _target_bytes: usize) -> Result<usize, SpillError> {
Err(SpillError::NotSupported)
}
fn reload(&self) -> Result<(), SpillError> {
Ok(())
}
}
pub mod priorities {
pub const SPILL_STAGING: u8 = 10;
pub const QUERY_CACHE: u8 = 30;
pub const INDEX_BUFFERS: u8 = 50;
pub const IDLE_TRANSACTION: u8 = 70;
pub const GRAPH_STORAGE: u8 = 100;
pub const ACTIVE_TRANSACTION: u8 = 200;
}
#[derive(Debug, Clone)]
pub struct ConsumerStats {
pub name: String,
pub region: MemoryRegion,
pub usage_bytes: usize,
pub priority: u8,
pub can_spill: bool,
}
impl ConsumerStats {
pub fn from_consumer(consumer: &dyn MemoryConsumer) -> Self {
Self {
name: consumer.name().to_string(),
region: consumer.region(),
usage_bytes: consumer.memory_usage(),
priority: consumer.eviction_priority(),
can_spill: consumer.can_spill(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
struct TestConsumer {
name: String,
usage: AtomicUsize,
priority: u8,
region: MemoryRegion,
}
impl TestConsumer {
fn new(name: &str, usage: usize, priority: u8, region: MemoryRegion) -> Self {
Self {
name: name.to_string(),
usage: AtomicUsize::new(usage),
priority,
region,
}
}
}
impl MemoryConsumer for TestConsumer {
fn name(&self) -> &str {
&self.name
}
fn memory_usage(&self) -> usize {
self.usage.load(Ordering::Relaxed)
}
fn eviction_priority(&self) -> u8 {
self.priority
}
fn region(&self) -> MemoryRegion {
self.region
}
fn evict(&self, target_bytes: usize) -> usize {
let current = self.usage.load(Ordering::Relaxed);
let to_evict = target_bytes.min(current);
self.usage.fetch_sub(to_evict, Ordering::Relaxed);
to_evict
}
}
#[test]
fn test_consumer_stats() {
let consumer = TestConsumer::new(
"test",
1024,
priorities::INDEX_BUFFERS,
MemoryRegion::IndexBuffers,
);
let stats = ConsumerStats::from_consumer(&consumer);
assert_eq!(stats.name, "test");
assert_eq!(stats.usage_bytes, 1024);
assert_eq!(stats.priority, priorities::INDEX_BUFFERS);
assert_eq!(stats.region, MemoryRegion::IndexBuffers);
assert!(!stats.can_spill);
}
#[test]
fn test_consumer_eviction() {
let consumer = TestConsumer::new(
"test",
1000,
priorities::INDEX_BUFFERS,
MemoryRegion::IndexBuffers,
);
let freed = consumer.evict(500);
assert_eq!(freed, 500);
assert_eq!(consumer.memory_usage(), 500);
let freed = consumer.evict(1000);
assert_eq!(freed, 500);
assert_eq!(consumer.memory_usage(), 0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn test_priority_ordering() {
assert!(priorities::SPILL_STAGING < priorities::QUERY_CACHE);
assert!(priorities::QUERY_CACHE < priorities::INDEX_BUFFERS);
assert!(priorities::INDEX_BUFFERS < priorities::IDLE_TRANSACTION);
assert!(priorities::IDLE_TRANSACTION < priorities::GRAPH_STORAGE);
assert!(priorities::GRAPH_STORAGE < priorities::ACTIVE_TRANSACTION);
}
}