Skip to main content

sochdb_storage/
direct_io.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Direct I/O Support for Cache-Bypass Scenarios
19//!
20//! Implements O_DIRECT path for SSTable reads to prevent page cache pollution
21//! during large scans, compaction, and backup operations.
22//!
23//! ## jj.md Task 14: Direct I/O
24//!
25//! Goals:
26//! - Prevent cache pollution from large scans
27//! - Predictable latency (no page cache contention)
28//! - Better control of memory usage
29//!
30//! ## When to Use Direct I/O
31//!
32//! - Full table scans (would evict entire cache)
33//! - Compaction reads (data processed once)
34//! - Backup operations (one-time sequential read)
35//! - User-configured per-query
36//!
37//! ## Platform Support
38//!
39//! - Linux: O_DIRECT flag
40//! - macOS: F_NOCACHE via fcntl
//! - Windows: FILE_FLAG_NO_BUFFERING (not implemented yet; falls back to buffered I/O)
42
43use std::alloc::{Layout, alloc, dealloc};
44use std::fs::{File, OpenOptions};
45use std::io::{self, Read, Seek, SeekFrom};
46use std::path::Path;
47
/// Alignment, in bytes, required for Direct I/O buffers and file offsets.
///
/// Linux uses 512 bytes.
#[cfg(target_os = "linux")]
pub const DIRECT_IO_ALIGNMENT: usize = 512;

/// Alignment, in bytes, required for Direct I/O buffers and file offsets.
///
/// macOS and every other non-Linux target use 4096 bytes.
#[cfg(not(target_os = "linux"))]
pub const DIRECT_IO_ALIGNMENT: usize = 4096;

/// Page size, in bytes, for Direct I/O buffers.
pub const PAGE_SIZE: usize = 4 * 1024;
60
/// Selects which I/O path a file operation should take.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DirectIoMode {
    /// Ordinary buffered I/O; this is the default variant.
    #[default]
    Buffered,
    /// Direct I/O that bypasses the page cache.
    Direct,
    /// Choose between the two automatically based on the access pattern.
    // NOTE(review): the selection logic is not visible in this file — confirm
    // where `Auto` is resolved.
    Auto,
}
72
73/// Configuration for Direct I/O operations
74#[derive(Debug, Clone)]
75pub struct DirectIoConfig {
76    /// I/O mode
77    pub mode: DirectIoMode,
78    /// Buffer size for Direct I/O reads (must be aligned)
79    pub buffer_size: usize,
80    /// Threshold for switching to Direct I/O in Auto mode (bytes)
81    pub auto_threshold: usize,
82}
83
84impl Default for DirectIoConfig {
85    fn default() -> Self {
86        Self {
87            mode: DirectIoMode::Buffered,
88            buffer_size: 256 * 1024,          // 256KB
89            auto_threshold: 64 * 1024 * 1024, // 64MB
90        }
91    }
92}
93
94impl DirectIoConfig {
95    /// Create a Direct I/O config
96    pub fn direct() -> Self {
97        Self {
98            mode: DirectIoMode::Direct,
99            ..Default::default()
100        }
101    }
102
103    /// Create an auto-detect config
104    pub fn auto() -> Self {
105        Self {
106            mode: DirectIoMode::Auto,
107            ..Default::default()
108        }
109    }
110}
111
112/// Aligned buffer for Direct I/O operations
113///
114/// Direct I/O requires buffers to be aligned to the filesystem block size
115/// (typically 512 bytes or 4KB).
116pub struct AlignedBuffer {
117    ptr: *mut u8,
118    len: usize,
119    capacity: usize,
120    alignment: usize,
121}
122
123impl AlignedBuffer {
124    /// Create a new aligned buffer
125    pub fn new(capacity: usize) -> Self {
126        Self::with_alignment(capacity, DIRECT_IO_ALIGNMENT)
127    }
128
129    /// Create with specific alignment
130    pub fn with_alignment(capacity: usize, alignment: usize) -> Self {
131        // Round up capacity to alignment boundary
132        let aligned_capacity = capacity.div_ceil(alignment) * alignment;
133
134        let layout =
135            Layout::from_size_align(aligned_capacity, alignment).expect("Invalid alignment");
136
137        let ptr = unsafe { alloc(layout) };
138        if ptr.is_null() {
139            panic!("Failed to allocate aligned buffer");
140        }
141
142        Self {
143            ptr,
144            len: 0,
145            capacity: aligned_capacity,
146            alignment,
147        }
148    }
149
150    /// Get the buffer contents
151    pub fn as_slice(&self) -> &[u8] {
152        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
153    }
154
155    /// Get mutable buffer contents
156    pub fn as_mut_slice(&mut self) -> &mut [u8] {
157        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.capacity) }
158    }
159
160    /// Set the length of valid data
161    pub fn set_len(&mut self, len: usize) {
162        assert!(len <= self.capacity);
163        self.len = len;
164    }
165
166    /// Get capacity
167    pub fn capacity(&self) -> usize {
168        self.capacity
169    }
170
171    /// Get length
172    pub fn len(&self) -> usize {
173        self.len
174    }
175
176    /// Check if empty
177    pub fn is_empty(&self) -> bool {
178        self.len == 0
179    }
180
181    /// Get alignment
182    pub fn alignment(&self) -> usize {
183        self.alignment
184    }
185
186    /// Check if pointer is properly aligned
187    pub fn is_aligned(&self) -> bool {
188        (self.ptr as usize).is_multiple_of(self.alignment)
189    }
190}
191
192impl Drop for AlignedBuffer {
193    fn drop(&mut self) {
194        let layout = Layout::from_size_align(self.capacity, self.alignment)
195            .expect("Invalid alignment in drop");
196        unsafe { dealloc(self.ptr, layout) };
197    }
198}
199
200// AlignedBuffer is Send + Sync because the pointer is exclusively owned
201unsafe impl Send for AlignedBuffer {}
202unsafe impl Sync for AlignedBuffer {}
203
204/// Open a file with Direct I/O enabled (if supported)
205#[cfg(target_os = "linux")]
206pub fn open_direct(path: &Path) -> io::Result<File> {
207    use std::os::unix::fs::OpenOptionsExt;
208
209    OpenOptions::new()
210        .read(true)
211        .custom_flags(libc::O_DIRECT)
212        .open(path)
213}
214
215#[cfg(target_os = "macos")]
216pub fn open_direct(path: &Path) -> io::Result<File> {
217    let file = OpenOptions::new().read(true).open(path)?;
218
219    // On macOS, we use F_NOCACHE to disable caching
220    unsafe {
221        let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
222        if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
223            return Err(io::Error::last_os_error());
224        }
225    }
226
227    Ok(file)
228}
229
230#[cfg(not(any(target_os = "linux", target_os = "macos")))]
231pub fn open_direct(path: &Path) -> io::Result<File> {
232    // Fallback to buffered I/O on unsupported platforms
233    OpenOptions::new().read(true).open(path)
234}
235
236/// Open a file for Direct I/O writing
237#[cfg(target_os = "linux")]
238pub fn open_direct_write(path: &Path) -> io::Result<File> {
239    use std::os::unix::fs::OpenOptionsExt;
240
241    OpenOptions::new()
242        .write(true)
243        .create(true)
244        .truncate(true)
245        .custom_flags(libc::O_DIRECT)
246        .open(path)
247}
248
249#[cfg(target_os = "macos")]
250pub fn open_direct_write(path: &Path) -> io::Result<File> {
251    let file = OpenOptions::new()
252        .write(true)
253        .create(true)
254        .truncate(true)
255        .open(path)?;
256
257    unsafe {
258        let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
259        if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
260            return Err(io::Error::last_os_error());
261        }
262    }
263
264    Ok(file)
265}
266
267#[cfg(not(any(target_os = "linux", target_os = "macos")))]
268pub fn open_direct_write(path: &Path) -> io::Result<File> {
269    OpenOptions::new()
270        .write(true)
271        .create(true)
272        .truncate(true)
273        .open(path)
274}
275
276/// Direct I/O reader with aligned buffers
277pub struct DirectReader {
278    file: File,
279    buffer: AlignedBuffer,
280    file_offset: u64,
281    buffer_offset: usize,
282    buffer_valid: usize,
283    file_size: u64,
284}
285
286impl DirectReader {
287    /// Create a new Direct I/O reader
288    pub fn open(path: &Path, buffer_size: usize) -> io::Result<Self> {
289        let file = open_direct(path)?;
290        let file_size = file.metadata()?.len();
291
292        Ok(Self {
293            file,
294            buffer: AlignedBuffer::new(buffer_size),
295            file_offset: 0,
296            buffer_offset: 0,
297            buffer_valid: 0,
298            file_size,
299        })
300    }
301
302    /// Create from an existing file
303    pub fn from_file(file: File, buffer_size: usize) -> io::Result<Self> {
304        let file_size = file.metadata()?.len();
305
306        Ok(Self {
307            file,
308            buffer: AlignedBuffer::new(buffer_size),
309            file_offset: 0,
310            buffer_offset: 0,
311            buffer_valid: 0,
312            file_size,
313        })
314    }
315
316    /// Get the file size
317    pub fn file_size(&self) -> u64 {
318        self.file_size
319    }
320
321    /// Refill the buffer from the file
322    fn refill_buffer(&mut self) -> io::Result<usize> {
323        // Align the file offset
324        let aligned_offset =
325            (self.file_offset / DIRECT_IO_ALIGNMENT as u64) * DIRECT_IO_ALIGNMENT as u64;
326        let skip = (self.file_offset - aligned_offset) as usize;
327
328        self.file.seek(SeekFrom::Start(aligned_offset))?;
329
330        let buf = self.buffer.as_mut_slice();
331        let bytes_read = self.file.read(buf)?;
332
333        self.buffer.set_len(bytes_read);
334        self.buffer_valid = bytes_read;
335        self.buffer_offset = skip;
336
337        Ok(bytes_read.saturating_sub(skip))
338    }
339}
340
341impl Read for DirectReader {
342    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
343        let mut total_read = 0;
344
345        while total_read < buf.len() {
346            // Check if we have data in buffer
347            let available = self.buffer_valid.saturating_sub(self.buffer_offset);
348
349            if available == 0 {
350                // Buffer exhausted, refill
351                if self.file_offset >= self.file_size {
352                    break; // EOF
353                }
354                let refilled = self.refill_buffer()?;
355                if refilled == 0 {
356                    break; // EOF
357                }
358                continue;
359            }
360
361            // Copy from buffer
362            let to_copy = (buf.len() - total_read).min(available);
363            let src = &self.buffer.as_slice()[self.buffer_offset..self.buffer_offset + to_copy];
364            buf[total_read..total_read + to_copy].copy_from_slice(src);
365
366            self.buffer_offset += to_copy;
367            self.file_offset += to_copy as u64;
368            total_read += to_copy;
369        }
370
371        Ok(total_read)
372    }
373}
374
375impl Seek for DirectReader {
376    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
377        let new_pos = match pos {
378            SeekFrom::Start(p) => p,
379            SeekFrom::End(p) => {
380                if p >= 0 {
381                    self.file_size + p as u64
382                } else {
383                    self.file_size.saturating_sub((-p) as u64)
384                }
385            }
386            SeekFrom::Current(p) => {
387                if p >= 0 {
388                    self.file_offset + p as u64
389                } else {
390                    self.file_offset.saturating_sub((-p) as u64)
391                }
392            }
393        };
394
395        // Invalidate buffer if seeking outside buffered range
396        let buffer_start = self.file_offset - self.buffer_offset as u64;
397        let buffer_end = buffer_start + self.buffer_valid as u64;
398
399        if new_pos < buffer_start || new_pos >= buffer_end {
400            self.buffer_offset = 0;
401            self.buffer_valid = 0;
402        } else {
403            self.buffer_offset = (new_pos - buffer_start) as usize;
404        }
405
406        self.file_offset = new_pos;
407        Ok(new_pos)
408    }
409}
410
/// Counters describing how much I/O went through each path.
#[derive(Debug, Default, Clone)]
pub struct DirectIoStats {
    /// Bytes read through Direct I/O.
    pub direct_bytes_read: u64,
    /// Bytes read through buffered I/O.
    pub buffered_bytes_read: u64,
    /// Bytes written through Direct I/O.
    pub direct_bytes_written: u64,
    /// Bytes written through buffered I/O.
    pub buffered_bytes_written: u64,
    /// How many Direct I/O reads were recorded.
    pub direct_reads: u64,
    /// How many buffered reads were recorded.
    pub buffered_reads: u64,
}

impl DirectIoStats {
    /// Adds `bytes` to the Direct I/O byte total and counts one more read.
    pub fn record_direct_read(&mut self, bytes: u64) {
        let Self {
            direct_bytes_read,
            direct_reads,
            ..
        } = self;
        *direct_bytes_read += bytes;
        *direct_reads += 1;
    }

    /// Adds `bytes` to the buffered byte total and counts one more read.
    pub fn record_buffered_read(&mut self, bytes: u64) {
        let Self {
            buffered_bytes_read,
            buffered_reads,
            ..
        } = self;
        *buffered_bytes_read += bytes;
        *buffered_reads += 1;
    }

    /// Returns the bytes read through both paths combined.
    pub fn total_bytes_read(&self) -> u64 {
        let Self {
            direct_bytes_read,
            buffered_bytes_read,
            ..
        } = self;
        direct_bytes_read + buffered_bytes_read
    }

    /// Returns the fraction of read bytes that used Direct I/O.
    ///
    /// The result is `0.0` when nothing has been read yet.
    pub fn direct_io_ratio(&self) -> f64 {
        match self.total_bytes_read() {
            0 => 0.0,
            total => self.direct_bytes_read as f64 / total as f64,
        }
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes 10000 bytes of the repeating pattern `0..=255` to a fresh
    /// temporary file and returns the file together with the bytes written.
    fn patterned_temp_file() -> (NamedTempFile, Vec<u8>) {
        let mut temp = NamedTempFile::new().unwrap();
        let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
        temp.write_all(&data).unwrap();
        temp.flush().unwrap();
        (temp, data)
    }

    /// A new buffer is aligned, large enough, and starts empty.
    #[test]
    fn test_aligned_buffer() {
        let buf = AlignedBuffer::new(1024);
        assert!(buf.is_aligned());
        assert!(buf.capacity() >= 1024);
        assert_eq!(buf.len(), 0);
    }

    /// Bytes written through the mutable slice are visible after `set_len`.
    #[test]
    fn test_aligned_buffer_write_read() {
        let mut buf = AlignedBuffer::new(100);
        for (i, item) in buf.as_mut_slice()[..50].iter_mut().enumerate() {
            *item = i as u8;
        }
        buf.set_len(50);

        assert_eq!(buf.len(), 50);
        assert_eq!(buf.as_slice()[0], 0);
        assert_eq!(buf.as_slice()[49], 49);
    }

    /// Each config constructor selects the matching mode.
    #[test]
    fn test_direct_io_config() {
        let default = DirectIoConfig::default();
        assert_eq!(default.mode, DirectIoMode::Buffered);

        let direct = DirectIoConfig::direct();
        assert_eq!(direct.mode, DirectIoMode::Direct);

        let auto = DirectIoConfig::auto();
        assert_eq!(auto.mode, DirectIoMode::Auto);
    }

    /// A single `read` call returns the whole file contents.
    #[test]
    fn test_direct_reader() {
        let (temp, data) = patterned_temp_file();

        // Read with Direct I/O
        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        let mut read_buf = vec![0u8; 10000];
        let bytes_read = reader.read(&mut read_buf).unwrap();

        assert_eq!(bytes_read, 10000);
        assert_eq!(read_buf, data);
    }

    /// Reading after a seek returns the bytes at the seek target.
    #[test]
    fn test_direct_reader_seek() {
        let (temp, data) = patterned_temp_file();
        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();

        // Seek to middle
        reader.seek(SeekFrom::Start(5000)).unwrap();

        let mut read_buf = vec![0u8; 100];
        reader.read_exact(&mut read_buf).unwrap();

        // Verify we read the right data
        assert_eq!(read_buf, data[5000..5100]);
    }

    /// Recorded reads add up and yield the expected Direct I/O ratio.
    #[test]
    fn test_direct_io_stats() {
        let mut stats = DirectIoStats::default();

        stats.record_direct_read(1000);
        stats.record_buffered_read(500);

        assert_eq!(stats.total_bytes_read(), 1500);
        assert!((stats.direct_io_ratio() - 0.666).abs() < 0.01);
    }

    /// Opening a temporary file through `open_direct` succeeds.
    #[test]
    fn test_open_direct() {
        let temp = NamedTempFile::new().unwrap();
        assert!(open_direct(temp.path()).is_ok());
    }
}