use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Category of I/O traffic, used to choose a cache partition and to decide
/// between buffered and direct I/O.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoWorkloadType {
    Query,
    Compaction,
    Backup,
    Wal,
    Warmup,
}

impl IoWorkloadType {
    /// Returns `true` for workloads that always want direct I/O
    /// (`Compaction` and `Backup`); every other workload returns `false`.
    pub fn prefers_direct_io(&self) -> bool {
        matches!(self, Self::Compaction | Self::Backup)
    }

    /// Static cache weight assigned to this workload.
    ///
    /// The five weights sum to 100: `Query` 80, `Compaction` 10,
    /// `Wal` and `Warmup` 5 each, `Backup` 0.
    pub fn cache_weight(&self) -> u32 {
        match *self {
            Self::Query => 80,
            Self::Compaction => 10,
            Self::Wal | Self::Warmup => 5,
            Self::Backup => 0,
        }
    }
}
/// Shape of an I/O access stream, used as a hint for caching decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    RandomRead,
    SequentialScan,
    RandomWrite,
    SequentialWrite,
    Mixed,
}

impl AccessPattern {
    /// Heuristic probability (in `[0.0, 1.0]`) that caching data touched with
    /// this pattern will pay off.
    ///
    /// Random reads score highest (0.8); sequential streams score lowest
    /// (0.2 for scans, 0.1 for writes); random writes and mixed traffic sit
    /// in the middle at 0.5.
    pub fn cache_benefit_probability(&self) -> f64 {
        match *self {
            Self::RandomRead => 0.8,
            Self::RandomWrite | Self::Mixed => 0.5,
            Self::SequentialScan => 0.2,
            Self::SequentialWrite => 0.1,
        }
    }
}
/// A byte-budgeted cache partition with lock-free accounting.
///
/// The partition only tracks how many bytes are charged against it plus
/// hit/miss/eviction counters; it does not hold cached data itself.
pub struct CachePartition {
    /// Partition name, copied into [`PartitionStats`].
    pub name: String,
    /// Capacity in bytes; [`CachePartition::try_allocate`] never lets usage
    /// exceed this.
    pub max_bytes: usize,
    /// Bytes currently charged to the partition.
    current_bytes: AtomicUsize,
    /// Number of recorded hits.
    hits: AtomicU64,
    /// Number of recorded misses.
    misses: AtomicU64,
    /// Number of recorded eviction events (a count, not bytes).
    evictions: AtomicU64,
}

impl CachePartition {
    /// Creates an empty partition named `name` with capacity `max_bytes`.
    pub fn new(name: &str, max_bytes: usize) -> Self {
        Self {
            name: name.to_string(),
            max_bytes,
            current_bytes: AtomicUsize::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Atomically reserves `bytes` if usage stays within `max_bytes`.
    ///
    /// Returns `true` on success. Returns `false` and leaves usage unchanged
    /// when the reservation would exceed capacity, including when
    /// `current + bytes` would overflow `usize`.
    pub fn try_allocate(&self, bytes: usize) -> bool {
        self.current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                // `checked_add` rejects sums that would wrap around `usize`.
                current
                    .checked_add(bytes)
                    .filter(|&next| next <= self.max_bytes)
            })
            .is_ok()
    }

    /// Returns `bytes` previously reserved with
    /// [`try_allocate`](Self::try_allocate).
    ///
    /// Usage saturates at zero. Releasing more than is charged clamps usage
    /// to 0 instead of wrapping to a huge value that would block every future
    /// allocation.
    pub fn release(&self, bytes: usize) {
        self.saturating_release(bytes);
    }

    /// Increments the hit counter.
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the miss counter.
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one eviction event and uncharges the `bytes` it freed.
    ///
    /// Usage saturates at zero, as in [`release`](Self::release).
    pub fn record_eviction(&self, bytes: usize) {
        self.evictions.fetch_add(1, Ordering::Relaxed);
        self.saturating_release(bytes);
    }

    /// `hits / (hits + misses)`, or `1.0` when nothing has been recorded.
    pub fn hit_rate(&self) -> f64 {
        Self::hit_rate_of(
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        )
    }

    /// Fraction of capacity in use, or `0.0` for a zero-capacity partition
    /// (instead of the NaN that `0 / 0` would produce).
    pub fn utilization(&self) -> f64 {
        Self::utilization_of(self.current_bytes.load(Ordering::Relaxed), self.max_bytes)
    }

    /// Takes a snapshot of the partition's counters.
    ///
    /// Each atomic is loaded once, and the derived `hit_rate` and
    /// `utilization` are computed from those same values, so the derived
    /// fields agree with the raw ones in the returned struct.
    pub fn stats(&self) -> PartitionStats {
        let current_bytes = self.current_bytes.load(Ordering::Relaxed);
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        PartitionStats {
            name: self.name.clone(),
            max_bytes: self.max_bytes,
            current_bytes,
            hits,
            misses,
            evictions: self.evictions.load(Ordering::Relaxed),
            hit_rate: Self::hit_rate_of(hits, misses),
            utilization: Self::utilization_of(current_bytes, self.max_bytes),
        }
    }

    /// Subtracts `bytes` from usage, clamping at zero.
    fn saturating_release(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .current_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Hit rate from raw counters; `1.0` when both are zero.
    fn hit_rate_of(hits: u64, misses: u64) -> f64 {
        match hits.saturating_add(misses) {
            0 => 1.0,
            total => hits as f64 / total as f64,
        }
    }

    /// Utilization from raw values; `0.0` when `max` is zero.
    fn utilization_of(current: usize, max: usize) -> f64 {
        if max == 0 {
            0.0
        } else {
            current as f64 / max as f64
        }
    }
}

/// Point-in-time snapshot of a [`CachePartition`]'s accounting.
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Partition name.
    pub name: String,
    /// Partition capacity in bytes.
    pub max_bytes: usize,
    /// Bytes charged at snapshot time.
    pub current_bytes: usize,
    /// Recorded hits.
    pub hits: u64,
    /// Recorded misses.
    pub misses: u64,
    /// Recorded eviction events.
    pub evictions: u64,
    /// `hits / (hits + misses)`, or `1.0` when nothing has been recorded.
    pub hit_rate: f64,
    /// `current_bytes / max_bytes`, or `0.0` for a zero-capacity partition.
    pub utilization: f64,
}
/// Tuning knobs for the I/O isolation manager.
#[derive(Debug, Clone)]
pub struct IoIsolationConfig {
    /// Total cache budget in bytes, divided among partitions by the
    /// `*_partition_pct` fields.
    pub total_cache_bytes: usize,
    /// Percentage of `total_cache_bytes` assigned to the query partition.
    pub query_partition_pct: u8,
    /// Percentage of `total_cache_bytes` assigned to the compaction partition.
    pub compaction_partition_pct: u8,
    /// Percentage of `total_cache_bytes` assigned to the WAL partition.
    pub wal_partition_pct: u8,
    /// Enables the size/access-pattern heuristic for choosing direct I/O.
    pub auto_direct_io: bool,
    /// Minimum request size, in bytes, at which the heuristic may pick direct I/O.
    pub direct_io_threshold: usize,
    /// Whether eviction may be requested while under memory pressure.
    pub prefer_eviction: bool,
    /// Utilization fraction above which the cache is considered under pressure.
    pub memory_pressure_threshold: f64,
}

impl Default for IoIsolationConfig {
    /// 1 GiB cache split 70/20/10 (query/compaction/WAL), automatic direct
    /// I/O with a 64 MiB threshold, eviction enabled, pressure at 0.85.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        let total_cache_bytes = 1024 * MIB;
        let direct_io_threshold = 64 * MIB;
        Self {
            total_cache_bytes,
            query_partition_pct: 70,
            compaction_partition_pct: 20,
            wal_partition_pct: 10,
            auto_direct_io: true,
            direct_io_threshold,
            prefer_eviction: true,
            memory_pressure_threshold: 0.85,
        }
    }
}
/// Routes I/O workloads to isolated cache partitions, decides between direct
/// and buffered I/O, and accumulates I/O byte counters.
pub struct IoIsolationManager {
    /// Configuration the manager was built from.
    config: IoIsolationConfig,
    /// Partition used by `Query` and `Warmup` workloads.
    query_partition: CachePartition,
    /// Partition used by `Compaction` and `Backup` workloads.
    compaction_partition: CachePartition,
    /// Partition used by `Wal` workloads.
    wal_partition: CachePartition,
    /// Bytes recorded by `record_io` as reads.
    total_read_bytes: AtomicU64,
    /// Bytes recorded by `record_io` as writes.
    total_write_bytes: AtomicU64,
    /// Bytes recorded by `record_io` as direct I/O.
    direct_io_bytes: AtomicU64,
    /// Bytes recorded by `record_io` as buffered I/O.
    buffered_io_bytes: AtomicU64,
}

impl IoIsolationManager {
    /// Cache-benefit probability below which large requests use direct I/O.
    const LOW_CACHE_BENEFIT: f64 = 0.3;

    /// Builds a manager whose partitions each receive
    /// `total_cache_bytes * pct / 100` bytes (rounded down).
    ///
    /// Percentages are not validated. If they sum to more than 100, the
    /// partitions together exceed `total_cache_bytes`.
    pub fn new(config: IoIsolationConfig) -> Self {
        let total = config.total_cache_bytes;
        let query_bytes = Self::share_of(total, config.query_partition_pct);
        let compaction_bytes = Self::share_of(total, config.compaction_partition_pct);
        let wal_bytes = Self::share_of(total, config.wal_partition_pct);
        Self {
            config,
            query_partition: CachePartition::new("query", query_bytes),
            compaction_partition: CachePartition::new("compaction", compaction_bytes),
            wal_partition: CachePartition::new("wal", wal_bytes),
            total_read_bytes: AtomicU64::new(0),
            total_write_bytes: AtomicU64::new(0),
            direct_io_bytes: AtomicU64::new(0),
            buffered_io_bytes: AtomicU64::new(0),
        }
    }

    /// Computes `total * pct / 100` without intermediate overflow.
    ///
    /// The product is formed in `u128`, because in `usize` it overflows on
    /// 32-bit targets (1 GiB * 70 > `u32::MAX`). A result above `usize::MAX`,
    /// possible only with `pct > 100`, saturates.
    fn share_of(total: usize, pct: u8) -> usize {
        let share = total as u128 * pct as u128 / 100;
        usize::try_from(share).unwrap_or(usize::MAX)
    }

    /// All partitions, in reporting order.
    fn partitions(&self) -> [&CachePartition; 3] {
        [
            &self.query_partition,
            &self.compaction_partition,
            &self.wal_partition,
        ]
    }

    /// Returns the partition that caches data for `workload`.
    ///
    /// `Warmup` shares the query partition; `Backup` shares the compaction
    /// partition.
    pub fn partition_for(&self, workload: IoWorkloadType) -> &CachePartition {
        match workload {
            IoWorkloadType::Query | IoWorkloadType::Warmup => &self.query_partition,
            IoWorkloadType::Compaction | IoWorkloadType::Backup => &self.compaction_partition,
            IoWorkloadType::Wal => &self.wal_partition,
        }
    }

    /// Decides whether a `size_bytes` request for `workload` with `pattern`
    /// should use direct I/O.
    ///
    /// Returns `true` if the workload always prefers direct I/O. Otherwise it
    /// returns `true` only when all three hold:
    /// - `auto_direct_io` is enabled;
    /// - the request is at least `direct_io_threshold` bytes;
    /// - the pattern's cache-benefit probability is below 0.3.
    pub fn should_use_direct_io(
        &self,
        workload: IoWorkloadType,
        pattern: AccessPattern,
        size_bytes: usize,
    ) -> bool {
        workload.prefers_direct_io()
            || (self.config.auto_direct_io
                && size_bytes >= self.config.direct_io_threshold
                && pattern.cache_benefit_probability() < Self::LOW_CACHE_BENEFIT)
    }

    /// Adds `bytes` to one direction counter (read or write) and one mode
    /// counter (direct or buffered).
    pub fn record_io(&self, bytes: usize, is_write: bool, is_direct: bool) {
        let bytes = bytes as u64;
        let direction = if is_write {
            &self.total_write_bytes
        } else {
            &self.total_read_bytes
        };
        let mode = if is_direct {
            &self.direct_io_bytes
        } else {
            &self.buffered_io_bytes
        };
        direction.fetch_add(bytes, Ordering::Relaxed);
        mode.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Reports whether overall cache usage exceeds `memory_pressure_threshold`.
    ///
    /// Utilization is aggregated by bytes (total charged / total capacity), so
    /// each partition counts in proportion to its size. An unweighted mean of
    /// per-partition ratios would let the small WAL partition weigh as much as
    /// the query partition. It would also become NaN, disabling the check, if
    /// any partition had zero capacity. Returns `false` when total capacity is
    /// zero.
    pub fn under_memory_pressure(&self) -> bool {
        let (used, capacity) =
            self.partitions()
                .iter()
                .fold((0u128, 0u128), |(used, capacity), partition| {
                    (
                        used + partition.current_bytes.load(Ordering::Relaxed) as u128,
                        capacity + partition.max_bytes as u128,
                    )
                });
        if capacity == 0 {
            return false;
        }
        used as f64 / capacity as f64 > self.config.memory_pressure_threshold
    }

    /// Returns how many bytes the caller should evict.
    ///
    /// Returns `target_bytes` when `prefer_eviction` is set and the cache is
    /// under memory pressure, otherwise 0. This only makes the decision; it
    /// does not evict anything.
    pub fn maybe_evict(&self, target_bytes: usize) -> usize {
        // Check the cheap config flag before scanning partition counters.
        if self.config.prefer_eviction && self.under_memory_pressure() {
            target_bytes
        } else {
            0
        }
    }

    /// Stats for every partition, in the order query, compaction, wal.
    pub fn all_stats(&self) -> Vec<PartitionStats> {
        self.partitions().iter().map(|p| p.stats()).collect()
    }

    /// Snapshot of the I/O byte counters.
    ///
    /// The direct and buffered counters are loaded once. `direct_io_ratio` is
    /// therefore computed from exactly the values reported alongside it; it is
    /// 0.0 when no I/O has been recorded.
    pub fn io_stats(&self) -> IoStats {
        let direct = self.direct_io_bytes.load(Ordering::Relaxed);
        let buffered = self.buffered_io_bytes.load(Ordering::Relaxed);
        let total = direct + buffered;
        let direct_io_ratio = if total == 0 {
            0.0
        } else {
            direct as f64 / total as f64
        };
        IoStats {
            total_read_bytes: self.total_read_bytes.load(Ordering::Relaxed),
            total_write_bytes: self.total_write_bytes.load(Ordering::Relaxed),
            direct_io_bytes: direct,
            buffered_io_bytes: buffered,
            direct_io_ratio,
        }
    }
}
/// Snapshot of the aggregate I/O byte counters returned by the isolation
/// manager's `io_stats`.
#[derive(Debug, Clone)]
pub struct IoStats {
    /// Total bytes recorded as reads.
    pub total_read_bytes: u64,
    /// Total bytes recorded as writes.
    pub total_write_bytes: u64,
    /// Total bytes recorded as direct I/O.
    pub direct_io_bytes: u64,
    /// Total bytes recorded as buffered I/O.
    pub buffered_io_bytes: u64,
    /// `direct_io_bytes / (direct_io_bytes + buffered_io_bytes)`, or 0.0 when
    /// both are zero.
    pub direct_io_ratio: f64,
}
/// Alignment requirements for direct (unbuffered) I/O.
///
/// The buffer address, the file offset and the transfer size must each be a
/// multiple of the corresponding alignment. All alignments must be non-zero;
/// with a zero alignment the failure paths of the `validate_*` methods and
/// [`AlignmentContract::align_size`] panic.
pub struct AlignmentContract {
    /// Required alignment of the buffer's memory address, in bytes.
    pub buffer_alignment: usize,
    /// Required alignment of the file offset, in bytes.
    pub offset_alignment: usize,
    /// Required granularity of the transfer size, in bytes.
    pub size_alignment: usize,
}

impl AlignmentContract {
    /// Contract using the same `alignment` for buffer, offset and size.
    const fn uniform(alignment: usize) -> Self {
        Self {
            buffer_alignment: alignment,
            offset_alignment: alignment,
            size_alignment: alignment,
        }
    }

    /// Linux default: 512-byte alignment for buffer, offset and size.
    ///
    /// NOTE(review): devices with 4 KiB logical sectors reject 512-byte-aligned
    /// `O_DIRECT` I/O; 4096 (or querying the device) may be safer — confirm.
    #[cfg(target_os = "linux")]
    pub fn platform_default() -> Self {
        Self::uniform(512)
    }

    /// Default for every non-Linux target, including macOS: 4096-byte
    /// alignment for buffer, offset and size.
    #[cfg(not(target_os = "linux"))]
    pub fn platform_default() -> Self {
        Self::uniform(4096)
    }

    /// Checks that `ptr`'s address is a multiple of `buffer_alignment`.
    ///
    /// # Errors
    /// [`AlignmentError::BufferMisaligned`] carrying `address % buffer_alignment`.
    pub fn validate_buffer(&self, ptr: *const u8) -> Result<(), AlignmentError> {
        let address = ptr as usize;
        if address.is_multiple_of(self.buffer_alignment) {
            Ok(())
        } else {
            Err(AlignmentError::BufferMisaligned {
                actual: address % self.buffer_alignment,
                required: self.buffer_alignment,
            })
        }
    }

    /// Checks that `offset` is a multiple of `offset_alignment`.
    ///
    /// # Errors
    /// [`AlignmentError::OffsetMisaligned`] carrying the full offset.
    pub fn validate_offset(&self, offset: u64) -> Result<(), AlignmentError> {
        // Compare in u64. Casting the offset to usize would truncate offsets
        // beyond 4 GiB on 32-bit targets and validate the wrong value.
        if offset.is_multiple_of(self.offset_alignment as u64) {
            Ok(())
        } else {
            Err(AlignmentError::OffsetMisaligned {
                actual: offset,
                required: self.offset_alignment,
            })
        }
    }

    /// Checks that `size` is a multiple of `size_alignment`.
    ///
    /// # Errors
    /// [`AlignmentError::SizeMisaligned`] carrying the size.
    pub fn validate_size(&self, size: usize) -> Result<(), AlignmentError> {
        if size.is_multiple_of(self.size_alignment) {
            Ok(())
        } else {
            Err(AlignmentError::SizeMisaligned {
                actual: size,
                required: self.size_alignment,
            })
        }
    }

    /// Rounds `size` up to the next multiple of `size_alignment`.
    ///
    /// # Panics
    /// Panics if `size_alignment` is zero. If the rounded value exceeds
    /// `usize::MAX`, it overflows like ordinary arithmetic (panic when
    /// overflow checks are enabled).
    pub fn align_size(&self, size: usize) -> usize {
        size.next_multiple_of(self.size_alignment)
    }
}

/// Reason a direct-I/O request violates an [`AlignmentContract`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlignmentError {
    /// Buffer address is off by `actual` bytes (`address % required`).
    BufferMisaligned { actual: usize, required: usize },
    /// File offset `actual` is not a multiple of `required`.
    OffsetMisaligned { actual: u64, required: usize },
    /// Transfer size `actual` is not a multiple of `required`.
    SizeMisaligned { actual: usize, required: usize },
}

impl std::fmt::Display for AlignmentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BufferMisaligned { actual, required } => write!(
                f,
                "Buffer misaligned: offset {} not multiple of {}",
                actual, required
            ),
            Self::OffsetMisaligned { actual, required } => {
                write!(f, "Offset {} not aligned to {} bytes", actual, required)
            }
            Self::SizeMisaligned { actual, required } => {
                write!(f, "Size {} not aligned to {} bytes", actual, required)
            }
        }
    }
}

impl std::error::Error for AlignmentError {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocation succeeds up to capacity, fails beyond it, and `release`
    /// frees room for further allocations.
    #[test]
    fn test_cache_partition_allocation() {
        let partition = CachePartition::new("test", 1024);
        assert!(partition.try_allocate(512));
        assert_eq!(partition.current_bytes.load(Ordering::Relaxed), 512);
        assert!(partition.try_allocate(512));
        assert_eq!(partition.current_bytes.load(Ordering::Relaxed), 1024);
        assert!(!partition.try_allocate(1));
        partition.release(512);
        assert!(partition.try_allocate(512));
    }

    /// `stats()` reflects allocations and hit/miss counters.
    #[test]
    fn test_partition_stats() {
        let partition = CachePartition::new("test", 1000);
        // Assert the reservation so a failure is reported where it happens,
        // not indirectly through the `current_bytes` check below.
        assert!(partition.try_allocate(500));
        partition.record_hit();
        partition.record_hit();
        partition.record_miss();
        let stats = partition.stats();
        assert_eq!(stats.current_bytes, 500);
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert!((stats.hit_rate - 0.666).abs() < 0.01);
        assert!((stats.utilization - 0.5).abs() < 0.01);
    }

    /// Workload preference and the size/pattern heuristic drive direct I/O.
    #[test]
    fn test_direct_io_decision() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());
        assert!(manager.should_use_direct_io(
            IoWorkloadType::Compaction,
            AccessPattern::SequentialScan,
            1024
        ));
        assert!(manager.should_use_direct_io(
            IoWorkloadType::Backup,
            AccessPattern::RandomRead,
            1
        ));
        assert!(!manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::RandomRead,
            4096
        ));
        // Below the 64 MiB default threshold, a scan stays buffered.
        assert!(!manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            1024
        ));
        assert!(manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            100 * 1024 * 1024
        ));
    }

    /// `record_io` feeds both the direction and the mode counters.
    #[test]
    fn test_io_stats_accounting() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());
        assert_eq!(manager.io_stats().direct_io_ratio, 0.0);
        manager.record_io(300, false, true);
        manager.record_io(100, true, false);
        let stats = manager.io_stats();
        assert_eq!(stats.total_read_bytes, 300);
        assert_eq!(stats.total_write_bytes, 100);
        assert_eq!(stats.direct_io_bytes, 300);
        assert_eq!(stats.buffered_io_bytes, 100);
        assert!((stats.direct_io_ratio - 0.75).abs() < 1e-12);
    }

    /// Partitions are reported in a fixed order, and filling all of them
    /// triggers memory pressure and an eviction request.
    #[test]
    fn test_all_stats_and_memory_pressure() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());
        let names: Vec<String> = manager.all_stats().into_iter().map(|s| s.name).collect();
        assert_eq!(names, ["query", "compaction", "wal"]);

        assert!(!manager.under_memory_pressure());
        assert_eq!(manager.maybe_evict(4096), 0);

        for workload in [
            IoWorkloadType::Query,
            IoWorkloadType::Compaction,
            IoWorkloadType::Wal,
        ] {
            let partition = manager.partition_for(workload);
            assert!(partition.try_allocate(partition.max_bytes));
        }
        assert!(manager.under_memory_pressure());
        assert_eq!(manager.maybe_evict(4096), 4096);
    }

    /// The platform default accepts 4096-byte-aligned values and rounds
    /// sizes up to its granularity.
    #[test]
    fn test_alignment_contract() {
        let contract = AlignmentContract::platform_default();
        assert!(contract.validate_offset(4096).is_ok());
        assert!(contract.validate_size(4096).is_ok());
        if contract.offset_alignment > 1 {
            assert!(contract.validate_offset(1).is_err());
        }
        let aligned = contract.align_size(5000);
        assert!(aligned >= 5000);
        assert!(aligned.is_multiple_of(contract.size_alignment));
    }
}