Skip to main content

sochdb_storage/
io_isolation.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # I/O Isolation Policy
19//!
20//! Implements cache partitioning and I/O isolation to prevent:
21//! - Page cache pollution from large scans
22//! - p99 cliffs from compaction I/O
23//! - Memory fragmentation under pressure
24//!
25//! ## Design Principles
26//!
//! 1. **Workload Classification**: Classify I/O as query, compaction, backup, WAL, or warmup
28//! 2. **Cache Partitioning**: Separate caches for different workloads
29//! 3. **Direct I/O Policy**: Use O_DIRECT based on access pattern
30//! 4. **Eviction Priority**: Prefer eviction over allocator fragmentation
31//!
32//! ## Algorithm
33//!
34//! Cache eviction uses CLOCK-Pro (or segmented LRU) to approximate
35//! recency+frequency without LRU's pathological scan sensitivity.
36
37use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38
/// Category of I/O traffic, used to pick a caching and Direct I/O policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoWorkloadType {
    /// Foreground user queries: point reads and small scans.
    Query,
    /// Background compaction / merge work.
    Compaction,
    /// Backup or snapshot creation.
    Backup,
    /// Write-ahead-log writes.
    Wal,
    /// Cache warmup / preload.
    Warmup,
}

impl IoWorkloadType {
    /// Returns `true` when this workload should bypass the page cache.
    ///
    /// Compaction and backup are treated as one-time sequential passes and
    /// therefore prefer Direct I/O; every other workload stays buffered.
    pub fn prefers_direct_io(&self) -> bool {
        match *self {
            Self::Compaction | Self::Backup => true,
            Self::Query | Self::Wal | Self::Warmup => false,
        }
    }

    /// Relative cache share for this workload (a larger value means a
    /// larger share of the cache).
    pub fn cache_weight(&self) -> u32 {
        match *self {
            // Queries receive the bulk of the cache.
            Self::Query => 80,
            // Compaction only needs a minimal share.
            Self::Compaction => 10,
            // WAL and warmup each get a small slice.
            Self::Wal | Self::Warmup => 5,
            // Backups are not cached at all.
            Self::Backup => 0,
        }
    }
}
77
/// Shape of an I/O request stream, used as an input to policy decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    /// Point reads at random locations.
    RandomRead,
    /// A sequential scan.
    SequentialScan,
    /// Writes at random locations.
    RandomWrite,
    /// Sequential writes (WAL, compaction output).
    SequentialWrite,
    /// A mixture of the patterns above.
    Mixed,
}

impl AccessPattern {
    /// Estimated probability that data touched with this pattern is reused
    /// soon enough to benefit from the page cache.
    pub fn cache_benefit_probability(&self) -> f64 {
        match *self {
            // Randomly read data is likely to be read again.
            Self::RandomRead => 0.8,
            // Random writes may be read back; mixed traffic is a coin flip.
            Self::RandomWrite | Self::Mixed => 0.5,
            // Scanned data has a low reuse probability.
            Self::SequentialScan => 0.2,
            // Sequentially written data is rarely re-read immediately.
            Self::SequentialWrite => 0.1,
        }
    }
}
105
/// Byte-budgeted cache partition that isolates one class of workload.
///
/// The byte accounting and the hit/miss/eviction counters are stored in
/// atomics, so every method takes `&self`.
#[derive(Debug)]
pub struct CachePartition {
    /// Partition name
    pub name: String,
    /// Maximum size in bytes
    pub max_bytes: usize,
    /// Current size in bytes
    current_bytes: AtomicUsize,
    /// Hit count
    hits: AtomicU64,
    /// Miss count
    misses: AtomicU64,
    /// Eviction count
    evictions: AtomicU64,
}

impl CachePartition {
    /// Create an empty partition called `name` with a budget of `max_bytes`.
    pub fn new(name: &str, max_bytes: usize) -> Self {
        Self {
            name: name.to_string(),
            max_bytes,
            current_bytes: AtomicUsize::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Try to reserve `bytes` in this partition.
    ///
    /// Returns `true` and charges the partition when the reservation fits
    /// within `max_bytes`; returns `false` and leaves the accounting untouched
    /// otherwise. A request so large that `current + bytes` would overflow
    /// `usize` is rejected instead of panicking or wrapping around.
    pub fn try_allocate(&self, bytes: usize) -> bool {
        self.current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                current
                    .checked_add(bytes)
                    .filter(|&reserved| reserved <= self.max_bytes)
            })
            .is_ok()
    }

    /// Return `bytes` to this partition.
    ///
    /// Releasing more than is currently charged clamps the usage at zero
    /// rather than wrapping around to a huge value.
    pub fn release(&self, bytes: usize) {
        self.shrink_by(bytes);
    }

    /// Record a cache hit
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a cache miss
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Record one eviction that freed `bytes` (usage is clamped at zero).
    pub fn record_eviction(&self, bytes: usize) {
        self.evictions.fetch_add(1, Ordering::Relaxed);
        self.shrink_by(bytes);
    }

    /// Fraction of recorded lookups that were hits.
    ///
    /// Returns `1.0` when no hit or miss has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        Self::hit_ratio(
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        )
    }

    /// Fraction of the byte budget currently in use.
    ///
    /// A partition with `max_bytes == 0` reports `0.0` instead of NaN.
    pub fn utilization(&self) -> f64 {
        Self::fill_ratio(self.current_bytes.load(Ordering::Relaxed), self.max_bytes)
    }

    /// Snapshot of this partition's counters.
    ///
    /// The derived `hit_rate` and `utilization` are computed from the same
    /// values that are reported in the snapshot, so the fields always agree
    /// with each other.
    pub fn stats(&self) -> PartitionStats {
        let current_bytes = self.current_bytes.load(Ordering::Relaxed);
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        PartitionStats {
            name: self.name.clone(),
            max_bytes: self.max_bytes,
            current_bytes,
            hits,
            misses,
            evictions: self.evictions.load(Ordering::Relaxed),
            hit_rate: Self::hit_ratio(hits, misses),
            utilization: Self::fill_ratio(current_bytes, self.max_bytes),
        }
    }

    /// Subtract `bytes` from the current usage, saturating at zero.
    fn shrink_by(&self, bytes: usize) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .current_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Hit ratio for the given counters; `1.0` when both are zero.
    fn hit_ratio(hits: u64, misses: u64) -> f64 {
        let total = hits + misses;
        if total == 0 {
            1.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// `current / max` as a float; `0.0` when `max` is zero.
    fn fill_ratio(current: usize, max: usize) -> f64 {
        if max == 0 {
            0.0
        } else {
            current as f64 / max as f64
        }
    }
}

/// Point-in-time statistics for one [`CachePartition`].
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Partition name
    pub name: String,
    /// Byte budget of the partition
    pub max_bytes: usize,
    /// Bytes charged to the partition when the snapshot was taken
    pub current_bytes: usize,
    /// Recorded cache hits
    pub hits: u64,
    /// Recorded cache misses
    pub misses: u64,
    /// Recorded evictions
    pub evictions: u64,
    /// `hits / (hits + misses)`, or `1.0` when nothing was recorded
    pub hit_rate: f64,
    /// `current_bytes / max_bytes`, or `0.0` for a zero-sized partition
    pub utilization: f64,
}
221
/// Tunables for the I/O isolation policy.
#[derive(Debug, Clone)]
pub struct IoIsolationConfig {
    /// Overall cache budget, in bytes.
    pub total_cache_bytes: usize,
    /// Percentage of the budget given to the query partition.
    pub query_partition_pct: u8,
    /// Percentage of the budget given to the compaction partition.
    pub compaction_partition_pct: u8,
    /// Percentage of the budget given to the WAL partition.
    pub wal_partition_pct: u8,
    /// Whether large scans are switched to Direct I/O automatically.
    pub auto_direct_io: bool,
    /// Operation size, in bytes, at which Direct I/O is considered.
    pub direct_io_threshold: usize,
    /// Under memory pressure, evict instead of risking OOM.
    pub prefer_eviction: bool,
    /// Utilization (0.0 - 1.0) above which memory pressure is reported.
    pub memory_pressure_threshold: f64,
}

impl Default for IoIsolationConfig {
    /// Defaults: a 1 GiB cache split 70/20/10 between query, compaction and
    /// WAL, automatic Direct I/O from 64 MiB, eviction preferred, and a
    /// pressure threshold of 0.85.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        const GIB: usize = 1 << 30;

        Self {
            total_cache_bytes: GIB,
            query_partition_pct: 70,
            compaction_partition_pct: 20,
            wal_partition_pct: 10,
            auto_direct_io: true,
            direct_io_threshold: 64 * MIB,
            prefer_eviction: true,
            memory_pressure_threshold: 0.85,
        }
    }
}
257
258/// I/O isolation manager
259pub struct IoIsolationManager {
260    config: IoIsolationConfig,
261    /// Query cache partition
262    query_partition: CachePartition,
263    /// Compaction cache partition
264    compaction_partition: CachePartition,
265    /// WAL cache partition
266    wal_partition: CachePartition,
267    /// Total I/O bytes read
268    total_read_bytes: AtomicU64,
269    /// Total I/O bytes written
270    total_write_bytes: AtomicU64,
271    /// Direct I/O bytes
272    direct_io_bytes: AtomicU64,
273    /// Buffered I/O bytes
274    buffered_io_bytes: AtomicU64,
275}
276
277impl IoIsolationManager {
278    /// Create a new I/O isolation manager
279    pub fn new(config: IoIsolationConfig) -> Self {
280        let total = config.total_cache_bytes;
281        let query_bytes = total * config.query_partition_pct as usize / 100;
282        let compaction_bytes = total * config.compaction_partition_pct as usize / 100;
283        let wal_bytes = total * config.wal_partition_pct as usize / 100;
284
285        Self {
286            config,
287            query_partition: CachePartition::new("query", query_bytes),
288            compaction_partition: CachePartition::new("compaction", compaction_bytes),
289            wal_partition: CachePartition::new("wal", wal_bytes),
290            total_read_bytes: AtomicU64::new(0),
291            total_write_bytes: AtomicU64::new(0),
292            direct_io_bytes: AtomicU64::new(0),
293            buffered_io_bytes: AtomicU64::new(0),
294        }
295    }
296
297    /// Get the appropriate cache partition for a workload
298    pub fn partition_for(&self, workload: IoWorkloadType) -> &CachePartition {
299        match workload {
300            IoWorkloadType::Query | IoWorkloadType::Warmup => &self.query_partition,
301            IoWorkloadType::Compaction | IoWorkloadType::Backup => &self.compaction_partition,
302            IoWorkloadType::Wal => &self.wal_partition,
303        }
304    }
305
306    /// Decide whether to use Direct I/O for an operation
307    pub fn should_use_direct_io(
308        &self,
309        workload: IoWorkloadType,
310        pattern: AccessPattern,
311        size_bytes: usize,
312    ) -> bool {
313        // Explicit workload preference
314        if workload.prefers_direct_io() {
315            return true;
316        }
317
318        // Auto-detect based on size and pattern
319        if self.config.auto_direct_io {
320            if size_bytes >= self.config.direct_io_threshold {
321                // Large operation
322                if pattern.cache_benefit_probability() < 0.3 {
323                    return true;
324                }
325            }
326        }
327
328        false
329    }
330
331    /// Record I/O operation
332    pub fn record_io(&self, bytes: usize, is_write: bool, is_direct: bool) {
333        if is_write {
334            self.total_write_bytes
335                .fetch_add(bytes as u64, Ordering::Relaxed);
336        } else {
337            self.total_read_bytes
338                .fetch_add(bytes as u64, Ordering::Relaxed);
339        }
340
341        if is_direct {
342            self.direct_io_bytes
343                .fetch_add(bytes as u64, Ordering::Relaxed);
344        } else {
345            self.buffered_io_bytes
346                .fetch_add(bytes as u64, Ordering::Relaxed);
347        }
348    }
349
350    /// Check if under memory pressure
351    pub fn under_memory_pressure(&self) -> bool {
352        let total_util = (self.query_partition.utilization()
353            + self.compaction_partition.utilization()
354            + self.wal_partition.utilization())
355            / 3.0;
356        total_util > self.config.memory_pressure_threshold
357    }
358
359    /// Trigger emergency eviction if under pressure
360    pub fn maybe_evict(&self, target_bytes: usize) -> usize {
361        if !self.under_memory_pressure() {
362            return 0;
363        }
364
365        if !self.config.prefer_eviction {
366            return 0;
367        }
368
369        // In a real implementation, this would trigger cache eviction
370        // For now, just signal how much should be evicted
371        target_bytes
372    }
373
374    /// Get all partition stats
375    pub fn all_stats(&self) -> Vec<PartitionStats> {
376        vec![
377            self.query_partition.stats(),
378            self.compaction_partition.stats(),
379            self.wal_partition.stats(),
380        ]
381    }
382
383    /// Get I/O stats
384    pub fn io_stats(&self) -> IoStats {
385        IoStats {
386            total_read_bytes: self.total_read_bytes.load(Ordering::Relaxed),
387            total_write_bytes: self.total_write_bytes.load(Ordering::Relaxed),
388            direct_io_bytes: self.direct_io_bytes.load(Ordering::Relaxed),
389            buffered_io_bytes: self.buffered_io_bytes.load(Ordering::Relaxed),
390            direct_io_ratio: {
391                let direct = self.direct_io_bytes.load(Ordering::Relaxed);
392                let buffered = self.buffered_io_bytes.load(Ordering::Relaxed);
393                let total = direct + buffered;
394                if total == 0 {
395                    0.0
396                } else {
397                    direct as f64 / total as f64
398                }
399            },
400        }
401    }
402}
403
404/// I/O statistics
405#[derive(Debug, Clone)]
406pub struct IoStats {
407    pub total_read_bytes: u64,
408    pub total_write_bytes: u64,
409    pub direct_io_bytes: u64,
410    pub buffered_io_bytes: u64,
411    pub direct_io_ratio: f64,
412}
413
/// Alignment contract for Direct I/O.
///
/// Describes the multiples that the buffer address, the file offset and the
/// transfer size must each satisfy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AlignmentContract {
    /// Required buffer alignment
    pub buffer_alignment: usize,
    /// Required offset alignment
    pub offset_alignment: usize,
    /// Required size alignment
    pub size_alignment: usize,
}

impl AlignmentContract {
    /// Contract that uses the same alignment for buffer, offset and size.
    const fn uniform(alignment: usize) -> Self {
        Self {
            buffer_alignment: alignment,
            offset_alignment: alignment,
            size_alignment: alignment,
        }
    }

    /// Platform-specific contract: 512 bytes on Linux.
    #[cfg(target_os = "linux")]
    pub fn platform_default() -> Self {
        Self::uniform(512)
    }

    /// Platform-specific contract: 4096 bytes on macOS and every other
    /// non-Linux target.
    #[cfg(not(target_os = "linux"))]
    pub fn platform_default() -> Self {
        Self::uniform(4096)
    }

    /// Validate buffer alignment.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::BufferMisaligned`] when the address of `ptr`
    /// is not a multiple of `buffer_alignment`. `actual` carries the remainder
    /// of the address, or the address itself when the configured alignment is
    /// zero (so a zero alignment is reported as an error instead of causing a
    /// division-by-zero panic).
    pub fn validate_buffer(&self, ptr: *const u8) -> Result<(), AlignmentError> {
        let address = ptr as usize;
        if address.is_multiple_of(self.buffer_alignment) {
            return Ok(());
        }
        Err(AlignmentError::BufferMisaligned {
            actual: address
                .checked_rem(self.buffer_alignment)
                .unwrap_or(address),
            required: self.buffer_alignment,
        })
    }

    /// Validate offset alignment.
    ///
    /// The check is done in `u64`, so offsets beyond `usize::MAX` are not
    /// truncated on 32-bit targets.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::OffsetMisaligned`] when `offset` is not a
    /// multiple of `offset_alignment`.
    pub fn validate_offset(&self, offset: u64) -> Result<(), AlignmentError> {
        // `usize` -> `u64` is a lossless widening on supported targets.
        if offset.is_multiple_of(self.offset_alignment as u64) {
            Ok(())
        } else {
            Err(AlignmentError::OffsetMisaligned {
                actual: offset,
                required: self.offset_alignment,
            })
        }
    }

    /// Validate size alignment.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::SizeMisaligned`] when `size` is not a
    /// multiple of `size_alignment`.
    pub fn validate_size(&self, size: usize) -> Result<(), AlignmentError> {
        if size.is_multiple_of(self.size_alignment) {
            Ok(())
        } else {
            Err(AlignmentError::SizeMisaligned {
                actual: size,
                required: self.size_alignment,
            })
        }
    }

    /// Round `size` up to the next multiple of `size_alignment`.
    ///
    /// # Panics
    ///
    /// Panics if `size_alignment` is zero or if the rounded size does not fit
    /// in `usize`. Panicking in every build profile avoids silently wrapping
    /// to a too-small size in release builds.
    pub fn align_size(&self, size: usize) -> usize {
        size.checked_next_multiple_of(self.size_alignment)
            .expect("size_alignment must be non-zero and the aligned size must fit in usize")
    }
}

/// Alignment error
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlignmentError {
    /// The buffer address is not a multiple of the required alignment;
    /// `actual` is the address remainder.
    BufferMisaligned { actual: usize, required: usize },
    /// The file offset is not a multiple of the required alignment.
    OffsetMisaligned { actual: u64, required: usize },
    /// The transfer size is not a multiple of the required alignment.
    SizeMisaligned { actual: usize, required: usize },
}

impl std::fmt::Display for AlignmentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlignmentError::BufferMisaligned { actual, required } => {
                write!(
                    f,
                    "Buffer misaligned: offset {} not multiple of {}",
                    actual, required
                )
            }
            AlignmentError::OffsetMisaligned { actual, required } => {
                write!(f, "Offset {} not aligned to {} bytes", actual, required)
            }
            AlignmentError::SizeMisaligned { actual, required } => {
                write!(f, "Size {} not aligned to {} bytes", actual, required)
            }
        }
    }
}

impl std::error::Error for AlignmentError {}
524
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocation succeeds up to the budget, fails beyond it, and succeeds
    /// again once space has been released.
    #[test]
    fn test_cache_partition_allocation() {
        let partition = CachePartition::new("test", 1024);
        let used = || partition.current_bytes.load(Ordering::Relaxed);

        // Fill the partition in two equal steps.
        for expected in [512, 1024] {
            assert!(partition.try_allocate(512));
            assert_eq!(used(), expected);
        }

        // Should fail - over capacity
        assert!(!partition.try_allocate(1));

        // Release and try again
        partition.release(512);
        assert!(partition.try_allocate(512));
    }

    /// The stats snapshot reflects the recorded usage, hits and misses.
    #[test]
    fn test_partition_stats() {
        let partition = CachePartition::new("test", 1000);

        partition.try_allocate(500);
        partition.record_hit();
        partition.record_hit();
        partition.record_miss();

        let PartitionStats {
            current_bytes,
            hits,
            misses,
            hit_rate,
            utilization,
            ..
        } = partition.stats();

        assert_eq!(current_bytes, 500);
        assert_eq!(hits, 2);
        assert_eq!(misses, 1);
        assert!((hit_rate - 0.666).abs() < 0.01);
        assert!((utilization - 0.5).abs() < 0.01);
    }

    /// Direct I/O is chosen by workload preference or by size plus pattern.
    #[test]
    fn test_direct_io_decision() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());
        let decide = |workload, pattern, size_bytes| {
            manager.should_use_direct_io(workload, pattern, size_bytes)
        };

        // Compaction always uses direct I/O
        assert!(decide(
            IoWorkloadType::Compaction,
            AccessPattern::SequentialScan,
            1024
        ));

        // Query with small size uses buffered
        assert!(!decide(
            IoWorkloadType::Query,
            AccessPattern::RandomRead,
            4096
        ));

        // Query with large size and low reuse probability uses direct
        let hundred_mb = 100 * 1024 * 1024;
        assert!(decide(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            hundred_mb
        ));
    }

    /// The platform contract accepts 4 KiB-aligned values and rounds sizes up.
    #[test]
    fn test_alignment_contract() {
        let contract = AlignmentContract::platform_default();

        // Valid alignment
        assert!(contract.validate_offset(4096).is_ok());
        assert!(contract.validate_size(4096).is_ok());

        // Invalid alignment (on most platforms)
        if contract.offset_alignment > 1 {
            assert!(contract.validate_offset(1).is_err());
        }

        // Align size
        let requested = 5000;
        let aligned = contract.align_size(requested);
        assert!(aligned >= requested);
        assert!(aligned.is_multiple_of(contract.size_alignment));
    }
}