Skip to main content

sochdb_storage/
io_isolation.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # I/O Isolation Policy
19//!
20//! Implements cache partitioning and I/O isolation to prevent:
21//! - Page cache pollution from large scans
22//! - p99 cliffs from compaction I/O
23//! - Memory fragmentation under pressure
24//!
25//! ## Design Principles
26//!
27//! 1. **Workload Classification**: Classify I/O as query, compaction, or backup
28//! 2. **Cache Partitioning**: Separate caches for different workloads
29//! 3. **Direct I/O Policy**: Use O_DIRECT based on access pattern
30//! 4. **Eviction Priority**: Prefer eviction over allocator fragmentation
31//!
32//! ## Algorithm
33//!
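//! Eviction is intended to use CLOCK-Pro (or segmented LRU) to approximate
//! recency+frequency without LRU's pathological scan sensitivity. That policy
//! is not implemented in this module yet: partitions only account for charged
//! bytes and hit/miss/eviction counts, and `IoIsolationManager::maybe_evict`
//! merely reports how many bytes the caller should evict.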
36
37use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38use std::sync::Arc;
39
40use parking_lot::RwLock;
41
/// Classification of an I/O request by the subsystem that issued it.
///
/// The classification feeds the Direct I/O decision
/// ([`IoWorkloadType::prefers_direct_io`]) and carries a relative cache weight
/// ([`IoWorkloadType::cache_weight`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoWorkloadType {
    /// User query (point reads, small scans)
    Query,
    /// Background compaction/merge
    Compaction,
    /// Backup/snapshot
    Backup,
    /// WAL writes
    Wal,
    /// Cache warmup/preload
    Warmup,
}

impl IoWorkloadType {
    /// Returns `true` when this workload should bypass the page cache.
    ///
    /// Compaction and backup read their data once, sequentially, so caching it
    /// would only push out pages that queries still want. Queries, WAL writes
    /// and warmup all benefit from (or explicitly aim at) a populated cache.
    pub fn prefers_direct_io(&self) -> bool {
        match *self {
            Self::Compaction | Self::Backup => true,
            Self::Query | Self::Wal | Self::Warmup => false,
        }
    }

    /// Relative cache share for this workload (higher means a larger share).
    ///
    /// The weights add up to 100. Backup gets no cache at all; warmup gets a
    /// small weight because it fills the query partition.
    ///
    /// NOTE(review): `IoIsolationManager::new` sizes its partitions from the
    /// `IoIsolationConfig` percentages, not from these weights.
    pub fn cache_weight(&self) -> u32 {
        match *self {
            Self::Query => 80,
            Self::Compaction => 10,
            Self::Wal | Self::Warmup => 5,
            Self::Backup => 0,
        }
    }
}
80
/// Shape of an I/O operation, used to estimate whether caching it pays off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    /// Random point reads
    RandomRead,
    /// Sequential scan
    SequentialScan,
    /// Random writes
    RandomWrite,
    /// Sequential writes (WAL, compaction output)
    SequentialWrite,
    /// Mixed pattern
    Mixed,
}

impl AccessPattern {
    /// Heuristic probability (0.0 - 1.0) that data touched with this pattern
    /// is read again while still in the page cache.
    ///
    /// These are fixed estimates, not measurements: random reads are likely to
    /// be reused, while scans and sequential writes rarely are.
    pub fn cache_benefit_probability(&self) -> f64 {
        match *self {
            Self::RandomRead => 0.8,
            Self::RandomWrite | Self::Mixed => 0.5,
            Self::SequentialScan => 0.2,
            Self::SequentialWrite => 0.1,
        }
    }
}
108
/// Byte budget plus hit/miss/eviction counters for one workload's share of the
/// cache.
///
/// A partition does not hold cached data itself. It only accounts for how many
/// bytes have been charged against it. All state is kept in atomics, so every
/// method takes `&self`.
#[derive(Debug)]
pub struct CachePartition {
    /// Partition name
    pub name: String,
    /// Maximum size in bytes
    pub max_bytes: usize,
    /// Bytes currently charged; `try_allocate` never charges beyond `max_bytes`
    current_bytes: AtomicUsize,
    /// Hit count
    hits: AtomicU64,
    /// Miss count
    misses: AtomicU64,
    /// Eviction count
    evictions: AtomicU64,
}

impl CachePartition {
    /// Creates an empty partition named `name` with a budget of `max_bytes`.
    pub fn new(name: &str, max_bytes: usize) -> Self {
        Self {
            name: name.to_string(),
            max_bytes,
            current_bytes: AtomicUsize::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Atomically reserves `bytes` of this partition's budget.
    ///
    /// Returns `true` and charges the partition if the reservation fits within
    /// `max_bytes`. Otherwise it returns `false` and leaves usage unchanged.
    /// A request whose sum with the current usage would overflow `usize` is
    /// rejected rather than wrapping around and being accepted.
    pub fn try_allocate(&self, bytes: usize) -> bool {
        self.current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                current
                    .checked_add(bytes)
                    .filter(|&next| next <= self.max_bytes)
            })
            .is_ok()
    }

    /// Returns `bytes` of previously reserved budget to the partition.
    ///
    /// Releasing more than is currently charged clamps usage at zero. The
    /// counter does not wrap to a huge value, which would make the partition
    /// look permanently full.
    pub fn release(&self, bytes: usize) {
        self.uncharge(bytes);
    }

    /// Record a cache hit
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a cache miss
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one eviction and returns its `bytes` to the budget, saturating
    /// at zero like [`CachePartition::release`].
    pub fn record_eviction(&self, bytes: usize) {
        self.evictions.fetch_add(1, Ordering::Relaxed);
        self.uncharge(bytes);
    }

    /// Fraction of recorded lookups that were hits. Returns 1.0 when no hits or
    /// misses have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        Self::compute_hit_rate(
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        )
    }

    /// Fraction of the budget currently charged (0.0 - 1.0).
    ///
    /// A zero-capacity partition reports 0.0 instead of NaN (0/0).
    pub fn utilization(&self) -> f64 {
        Self::compute_utilization(self.current_bytes.load(Ordering::Relaxed), self.max_bytes)
    }

    /// Returns a snapshot of this partition's counters.
    ///
    /// `hit_rate` and `utilization` are derived from the same loaded values
    /// that are reported in the snapshot, so the fields agree with each other
    /// even while other threads are updating the partition.
    pub fn stats(&self) -> PartitionStats {
        let current_bytes = self.current_bytes.load(Ordering::Relaxed);
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        PartitionStats {
            name: self.name.clone(),
            max_bytes: self.max_bytes,
            current_bytes,
            hits,
            misses,
            evictions: self.evictions.load(Ordering::Relaxed),
            hit_rate: Self::compute_hit_rate(hits, misses),
            utilization: Self::compute_utilization(current_bytes, self.max_bytes),
        }
    }

    /// Subtracts `bytes` from the current usage, saturating at zero.
    fn uncharge(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Hit ratio for the given counters; 1.0 when there were no lookups.
    fn compute_hit_rate(hits: u64, misses: u64) -> f64 {
        let total = hits.saturating_add(misses);
        if total == 0 {
            1.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// `current / max`, or 0.0 for a zero-capacity partition.
    fn compute_utilization(current: usize, max: usize) -> f64 {
        if max == 0 {
            0.0
        } else {
            current as f64 / max as f64
        }
    }
}

/// Point-in-time snapshot of a [`CachePartition`]'s counters.
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Partition name
    pub name: String,
    /// Budget in bytes
    pub max_bytes: usize,
    /// Bytes charged when the snapshot was taken
    pub current_bytes: usize,
    /// Recorded hits
    pub hits: u64,
    /// Recorded misses
    pub misses: u64,
    /// Recorded evictions
    pub evictions: u64,
    /// `hits / (hits + misses)`, or 1.0 with no lookups
    pub hit_rate: f64,
    /// `current_bytes / max_bytes`, or 0.0 for a zero-capacity partition
    pub utilization: f64,
}
223
/// Tunables for the I/O isolation manager.
///
/// The three `*_partition_pct` fields are percentages of `total_cache_bytes`
/// for the query, compaction and WAL partitions respectively.
#[derive(Debug, Clone)]
pub struct IoIsolationConfig {
    /// Total cache budget in bytes
    pub total_cache_bytes: usize,
    /// Share of the budget for the query partition, in percent
    pub query_partition_pct: u8,
    /// Share of the budget for the compaction partition, in percent
    pub compaction_partition_pct: u8,
    /// Share of the budget for the WAL partition, in percent
    pub wal_partition_pct: u8,
    /// Enable automatic Direct I/O for large scans
    pub auto_direct_io: bool,
    /// Operation size (bytes) at or above which automatic Direct I/O is considered
    pub direct_io_threshold: usize,
    /// Under memory pressure, prefer eviction over OOM
    pub prefer_eviction: bool,
    /// Memory pressure threshold (0.0 - 1.0)
    pub memory_pressure_threshold: f64,
}

impl Default for IoIsolationConfig {
    /// 1 GiB cache split 70/20/10 (query/compaction/WAL), automatic Direct I/O
    /// from 64 MiB, eviction preferred, pressure threshold 0.85.
    fn default() -> Self {
        const MIB: usize = 1 << 20;

        Self {
            auto_direct_io: true,
            direct_io_threshold: 64 * MIB,
            total_cache_bytes: 1024 * MIB,
            query_partition_pct: 70,
            compaction_partition_pct: 20,
            wal_partition_pct: 10,
            memory_pressure_threshold: 0.85,
            prefer_eviction: true,
        }
    }
}
259
260/// I/O isolation manager
261pub struct IoIsolationManager {
262    config: IoIsolationConfig,
263    /// Query cache partition
264    query_partition: CachePartition,
265    /// Compaction cache partition
266    compaction_partition: CachePartition,
267    /// WAL cache partition
268    wal_partition: CachePartition,
269    /// Total I/O bytes read
270    total_read_bytes: AtomicU64,
271    /// Total I/O bytes written
272    total_write_bytes: AtomicU64,
273    /// Direct I/O bytes
274    direct_io_bytes: AtomicU64,
275    /// Buffered I/O bytes
276    buffered_io_bytes: AtomicU64,
277}
278
279impl IoIsolationManager {
280    /// Create a new I/O isolation manager
281    pub fn new(config: IoIsolationConfig) -> Self {
282        let total = config.total_cache_bytes;
283        let query_bytes = total * config.query_partition_pct as usize / 100;
284        let compaction_bytes = total * config.compaction_partition_pct as usize / 100;
285        let wal_bytes = total * config.wal_partition_pct as usize / 100;
286
287        Self {
288            config,
289            query_partition: CachePartition::new("query", query_bytes),
290            compaction_partition: CachePartition::new("compaction", compaction_bytes),
291            wal_partition: CachePartition::new("wal", wal_bytes),
292            total_read_bytes: AtomicU64::new(0),
293            total_write_bytes: AtomicU64::new(0),
294            direct_io_bytes: AtomicU64::new(0),
295            buffered_io_bytes: AtomicU64::new(0),
296        }
297    }
298
299    /// Get the appropriate cache partition for a workload
300    pub fn partition_for(&self, workload: IoWorkloadType) -> &CachePartition {
301        match workload {
302            IoWorkloadType::Query | IoWorkloadType::Warmup => &self.query_partition,
303            IoWorkloadType::Compaction | IoWorkloadType::Backup => &self.compaction_partition,
304            IoWorkloadType::Wal => &self.wal_partition,
305        }
306    }
307
308    /// Decide whether to use Direct I/O for an operation
309    pub fn should_use_direct_io(
310        &self,
311        workload: IoWorkloadType,
312        pattern: AccessPattern,
313        size_bytes: usize,
314    ) -> bool {
315        // Explicit workload preference
316        if workload.prefers_direct_io() {
317            return true;
318        }
319
320        // Auto-detect based on size and pattern
321        if self.config.auto_direct_io {
322            if size_bytes >= self.config.direct_io_threshold {
323                // Large operation
324                if pattern.cache_benefit_probability() < 0.3 {
325                    return true;
326                }
327            }
328        }
329
330        false
331    }
332
333    /// Record I/O operation
334    pub fn record_io(&self, bytes: usize, is_write: bool, is_direct: bool) {
335        if is_write {
336            self.total_write_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
337        } else {
338            self.total_read_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
339        }
340
341        if is_direct {
342            self.direct_io_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
343        } else {
344            self.buffered_io_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
345        }
346    }
347
348    /// Check if under memory pressure
349    pub fn under_memory_pressure(&self) -> bool {
350        let total_util = (self.query_partition.utilization()
351            + self.compaction_partition.utilization()
352            + self.wal_partition.utilization())
353            / 3.0;
354        total_util > self.config.memory_pressure_threshold
355    }
356
357    /// Trigger emergency eviction if under pressure
358    pub fn maybe_evict(&self, target_bytes: usize) -> usize {
359        if !self.under_memory_pressure() {
360            return 0;
361        }
362
363        if !self.config.prefer_eviction {
364            return 0;
365        }
366
367        // In a real implementation, this would trigger cache eviction
368        // For now, just signal how much should be evicted
369        target_bytes
370    }
371
372    /// Get all partition stats
373    pub fn all_stats(&self) -> Vec<PartitionStats> {
374        vec![
375            self.query_partition.stats(),
376            self.compaction_partition.stats(),
377            self.wal_partition.stats(),
378        ]
379    }
380
381    /// Get I/O stats
382    pub fn io_stats(&self) -> IoStats {
383        IoStats {
384            total_read_bytes: self.total_read_bytes.load(Ordering::Relaxed),
385            total_write_bytes: self.total_write_bytes.load(Ordering::Relaxed),
386            direct_io_bytes: self.direct_io_bytes.load(Ordering::Relaxed),
387            buffered_io_bytes: self.buffered_io_bytes.load(Ordering::Relaxed),
388            direct_io_ratio: {
389                let direct = self.direct_io_bytes.load(Ordering::Relaxed);
390                let buffered = self.buffered_io_bytes.load(Ordering::Relaxed);
391                let total = direct + buffered;
392                if total == 0 { 0.0 } else { direct as f64 / total as f64 }
393            },
394        }
395    }
396}
397
398/// I/O statistics
399#[derive(Debug, Clone)]
400pub struct IoStats {
401    pub total_read_bytes: u64,
402    pub total_write_bytes: u64,
403    pub direct_io_bytes: u64,
404    pub buffered_io_bytes: u64,
405    pub direct_io_ratio: f64,
406}
407
/// Buffer, file-offset and length alignment that Direct I/O requests must satisfy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AlignmentContract {
    /// Required buffer alignment
    pub buffer_alignment: usize,
    /// Required offset alignment
    pub offset_alignment: usize,
    /// Required size alignment
    pub size_alignment: usize,
}

impl AlignmentContract {
    /// Platform-specific default: 512 bytes on Linux.
    ///
    /// NOTE(review): Linux O_DIRECT requires the logical block size of the
    /// underlying device, which is 4096 on 4Kn drives. Confirm 512 is safe for
    /// the target hardware.
    #[cfg(target_os = "linux")]
    pub fn platform_default() -> Self {
        Self::uniform(512)
    }

    /// Platform-specific default: 4096 bytes everywhere except Linux.
    #[cfg(not(target_os = "linux"))]
    pub fn platform_default() -> Self {
        Self::uniform(4096)
    }

    /// Contract using the same alignment for buffer, offset and size.
    const fn uniform(alignment: usize) -> Self {
        Self {
            buffer_alignment: alignment,
            offset_alignment: alignment,
            size_alignment: alignment,
        }
    }

    /// Checks that the buffer address is a multiple of `buffer_alignment`.
    ///
    /// On failure, `actual` is the address modulo the alignment, i.e. how far
    /// past an aligned boundary the buffer starts.
    pub fn validate_buffer(&self, ptr: *const u8) -> Result<(), AlignmentError> {
        let addr = ptr as usize;
        if addr.is_multiple_of(self.buffer_alignment) {
            Ok(())
        } else {
            Err(AlignmentError::BufferMisaligned {
                // `checked_rem` keeps a zero alignment from panicking here
                // (`is_multiple_of(0)` accepts only a null address).
                actual: addr.checked_rem(self.buffer_alignment).unwrap_or(addr),
                required: self.buffer_alignment,
            })
        }
    }

    /// Checks that a file offset is a multiple of `offset_alignment`.
    pub fn validate_offset(&self, offset: u64) -> Result<(), AlignmentError> {
        // Compare in u64: casting `offset` to `usize` would truncate offsets
        // beyond 4 GiB on 32-bit targets.
        if offset.is_multiple_of(self.offset_alignment as u64) {
            Ok(())
        } else {
            Err(AlignmentError::OffsetMisaligned {
                actual: offset,
                required: self.offset_alignment,
            })
        }
    }

    /// Checks that a transfer length is a multiple of `size_alignment`.
    pub fn validate_size(&self, size: usize) -> Result<(), AlignmentError> {
        if size.is_multiple_of(self.size_alignment) {
            Ok(())
        } else {
            Err(AlignmentError::SizeMisaligned {
                actual: size,
                required: self.size_alignment,
            })
        }
    }

    /// Rounds `size` up to the next multiple of `size_alignment`.
    ///
    /// # Panics
    ///
    /// Panics if `size_alignment` is zero or the rounded size does not fit in
    /// `usize`. Previously an overflow silently wrapped in release builds.
    /// Use [`AlignmentContract::checked_align_size`] to handle these cases.
    pub fn align_size(&self, size: usize) -> usize {
        self.checked_align_size(size)
            .expect("aligned size overflows usize or size_alignment is zero")
    }

    /// Rounds `size` up to the next multiple of `size_alignment`.
    ///
    /// Returns `None` if `size_alignment` is zero or the result would overflow
    /// `usize`.
    pub fn checked_align_size(&self, size: usize) -> Option<usize> {
        size.checked_next_multiple_of(self.size_alignment)
    }
}

/// Reason a Direct I/O request violates an [`AlignmentContract`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlignmentError {
    /// Buffer address is `actual` bytes past a `required`-byte boundary
    BufferMisaligned { actual: usize, required: usize },
    /// File offset `actual` is not a multiple of `required`
    OffsetMisaligned { actual: u64, required: usize },
    /// Transfer length `actual` is not a multiple of `required`
    SizeMisaligned { actual: usize, required: usize },
}

impl std::fmt::Display for AlignmentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BufferMisaligned { actual, required } => {
                write!(f, "Buffer misaligned: offset {actual} not multiple of {required}")
            }
            Self::OffsetMisaligned { actual, required } => {
                write!(f, "Offset {actual} not aligned to {required} bytes")
            }
            Self::SizeMisaligned { actual, required } => {
                write!(f, "Size {actual} not aligned to {required} bytes")
            }
        }
    }
}

impl std::error::Error for AlignmentError {}
526
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cache_partition_allocation() {
        let partition = CachePartition::new("test", 1024);

        assert!(partition.try_allocate(512));
        assert_eq!(partition.current_bytes.load(Ordering::Relaxed), 512);

        assert!(partition.try_allocate(512));
        assert_eq!(partition.current_bytes.load(Ordering::Relaxed), 1024);

        // Should fail - over capacity
        assert!(!partition.try_allocate(1));

        // Release and try again
        partition.release(512);
        assert!(partition.try_allocate(512));
    }

    #[test]
    fn test_partition_stats() {
        let partition = CachePartition::new("test", 1000);

        // Assert the allocation so a failure here is not mistaken for a stats bug.
        assert!(partition.try_allocate(500));
        partition.record_hit();
        partition.record_hit();
        partition.record_miss();

        let stats = partition.stats();
        assert_eq!(stats.current_bytes, 500);
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert!((stats.hit_rate - 0.666).abs() < 0.01);
        assert!((stats.utilization - 0.5).abs() < 0.01);
    }

    #[test]
    fn test_workload_routing_and_partition_sizes() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());

        assert_eq!(manager.partition_for(IoWorkloadType::Query).name, "query");
        assert_eq!(manager.partition_for(IoWorkloadType::Warmup).name, "query");
        assert_eq!(manager.partition_for(IoWorkloadType::Compaction).name, "compaction");
        assert_eq!(manager.partition_for(IoWorkloadType::Backup).name, "compaction");
        assert_eq!(manager.partition_for(IoWorkloadType::Wal).name, "wal");

        // 70/20/10 percent of 1 GiB, rounded down.
        assert_eq!(manager.partition_for(IoWorkloadType::Query).max_bytes, 751_619_276);
        assert_eq!(manager.partition_for(IoWorkloadType::Compaction).max_bytes, 214_748_364);
        assert_eq!(manager.partition_for(IoWorkloadType::Wal).max_bytes, 107_374_182);
    }

    #[test]
    fn test_direct_io_decision() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());

        // Compaction always uses direct I/O
        assert!(manager.should_use_direct_io(
            IoWorkloadType::Compaction,
            AccessPattern::SequentialScan,
            1024
        ));

        // Query with small size uses buffered
        assert!(!manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::RandomRead,
            4096
        ));

        // Query with large size and low reuse probability uses direct
        assert!(manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            100 * 1024 * 1024 // 100MB
        ));

        // Large but cache-friendly access stays buffered
        assert!(!manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::RandomRead,
            100 * 1024 * 1024
        ));

        // With auto Direct I/O disabled only workload preference applies
        let manual = IoIsolationManager::new(IoIsolationConfig {
            auto_direct_io: false,
            ..IoIsolationConfig::default()
        });
        assert!(!manual.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            100 * 1024 * 1024
        ));
        assert!(manual.should_use_direct_io(
            IoWorkloadType::Backup,
            AccessPattern::SequentialScan,
            1
        ));
    }

    #[test]
    fn test_io_stats_and_memory_pressure() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());

        assert_eq!(manager.io_stats().direct_io_ratio, 0.0);
        assert!(!manager.under_memory_pressure());
        assert_eq!(manager.maybe_evict(4096), 0);

        manager.record_io(300, false, true);
        manager.record_io(100, true, false);
        let stats = manager.io_stats();
        assert_eq!(stats.total_read_bytes, 300);
        assert_eq!(stats.total_write_bytes, 100);
        assert_eq!(stats.direct_io_bytes, 300);
        assert_eq!(stats.buffered_io_bytes, 100);
        assert!((stats.direct_io_ratio - 0.75).abs() < 1e-12);

        // Fill every partition completely: the cache is then under pressure.
        for workload in [
            IoWorkloadType::Query,
            IoWorkloadType::Compaction,
            IoWorkloadType::Wal,
        ] {
            let partition = manager.partition_for(workload);
            assert!(partition.try_allocate(partition.max_bytes));
        }
        assert!(manager.under_memory_pressure());
        assert_eq!(manager.maybe_evict(4096), 4096);
        assert_eq!(manager.all_stats().len(), 3);
    }

    #[test]
    fn test_alignment_contract() {
        let contract = AlignmentContract::platform_default();

        // Valid alignment
        assert!(contract.validate_offset(4096).is_ok());
        assert!(contract.validate_size(4096).is_ok());

        // Invalid alignment (on most platforms)
        if contract.offset_alignment > 1 {
            assert!(contract.validate_offset(1).is_err());
        }

        // Align size
        let aligned = contract.align_size(5000);
        assert!(aligned >= 5000);
        assert!(aligned.is_multiple_of(contract.size_alignment));
    }
}
610}