sochdb_storage/
direct_io.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Direct I/O Support for Cache-Bypass Scenarios
16//!
17//! Implements O_DIRECT path for SSTable reads to prevent page cache pollution
18//! during large scans, compaction, and backup operations.
19//!
20//! ## jj.md Task 14: Direct I/O
21//!
22//! Goals:
23//! - Prevent cache pollution from large scans
24//! - Predictable latency (no page cache contention)
25//! - Better control of memory usage
26//!
27//! ## When to Use Direct I/O
28//!
29//! - Full table scans (would evict entire cache)
30//! - Compaction reads (data processed once)
31//! - Backup operations (one-time sequential read)
32//! - User-configured per-query
33//!
34//! ## Platform Support
35//!
36//! - Linux: O_DIRECT flag
37//! - macOS: F_NOCACHE via fcntl
38//! - Windows: FILE_FLAG_NO_BUFFERING (not implemented yet)
39
40use std::alloc::{Layout, alloc, dealloc};
41use std::fs::{File, OpenOptions};
42use std::io::{self, Read, Seek, SeekFrom};
43use std::path::Path;
44
/// Alignment, in bytes, required for Direct I/O.
///
/// Linux builds use 512; macOS and every other target use 4096.
pub const DIRECT_IO_ALIGNMENT: usize = if cfg!(target_os = "linux") {
    512
} else {
    4096
};

/// Page size, in bytes, for Direct I/O buffers.
pub const PAGE_SIZE: usize = 4 * 1024;
57
/// Selects whether file I/O goes through the OS page cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DirectIoMode {
    /// Ordinary buffered I/O; this is the default variant.
    #[default]
    Buffered,
    /// Direct I/O that bypasses the page cache.
    Direct,
    /// Pick the mode automatically based on the access pattern.
    Auto,
}
69
70/// Configuration for Direct I/O operations
71#[derive(Debug, Clone)]
72pub struct DirectIoConfig {
73    /// I/O mode
74    pub mode: DirectIoMode,
75    /// Buffer size for Direct I/O reads (must be aligned)
76    pub buffer_size: usize,
77    /// Threshold for switching to Direct I/O in Auto mode (bytes)
78    pub auto_threshold: usize,
79}
80
81impl Default for DirectIoConfig {
82    fn default() -> Self {
83        Self {
84            mode: DirectIoMode::Buffered,
85            buffer_size: 256 * 1024,          // 256KB
86            auto_threshold: 64 * 1024 * 1024, // 64MB
87        }
88    }
89}
90
91impl DirectIoConfig {
92    /// Create a Direct I/O config
93    pub fn direct() -> Self {
94        Self {
95            mode: DirectIoMode::Direct,
96            ..Default::default()
97        }
98    }
99
100    /// Create an auto-detect config
101    pub fn auto() -> Self {
102        Self {
103            mode: DirectIoMode::Auto,
104            ..Default::default()
105        }
106    }
107}
108
109/// Aligned buffer for Direct I/O operations
110///
111/// Direct I/O requires buffers to be aligned to the filesystem block size
112/// (typically 512 bytes or 4KB).
113pub struct AlignedBuffer {
114    ptr: *mut u8,
115    len: usize,
116    capacity: usize,
117    alignment: usize,
118}
119
120impl AlignedBuffer {
121    /// Create a new aligned buffer
122    pub fn new(capacity: usize) -> Self {
123        Self::with_alignment(capacity, DIRECT_IO_ALIGNMENT)
124    }
125
126    /// Create with specific alignment
127    pub fn with_alignment(capacity: usize, alignment: usize) -> Self {
128        // Round up capacity to alignment boundary
129        let aligned_capacity = capacity.div_ceil(alignment) * alignment;
130
131        let layout =
132            Layout::from_size_align(aligned_capacity, alignment).expect("Invalid alignment");
133
134        let ptr = unsafe { alloc(layout) };
135        if ptr.is_null() {
136            panic!("Failed to allocate aligned buffer");
137        }
138
139        Self {
140            ptr,
141            len: 0,
142            capacity: aligned_capacity,
143            alignment,
144        }
145    }
146
147    /// Get the buffer contents
148    pub fn as_slice(&self) -> &[u8] {
149        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
150    }
151
152    /// Get mutable buffer contents
153    pub fn as_mut_slice(&mut self) -> &mut [u8] {
154        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.capacity) }
155    }
156
157    /// Set the length of valid data
158    pub fn set_len(&mut self, len: usize) {
159        assert!(len <= self.capacity);
160        self.len = len;
161    }
162
163    /// Get capacity
164    pub fn capacity(&self) -> usize {
165        self.capacity
166    }
167
168    /// Get length
169    pub fn len(&self) -> usize {
170        self.len
171    }
172
173    /// Check if empty
174    pub fn is_empty(&self) -> bool {
175        self.len == 0
176    }
177
178    /// Get alignment
179    pub fn alignment(&self) -> usize {
180        self.alignment
181    }
182
183    /// Check if pointer is properly aligned
184    pub fn is_aligned(&self) -> bool {
185        (self.ptr as usize).is_multiple_of(self.alignment)
186    }
187}
188
189impl Drop for AlignedBuffer {
190    fn drop(&mut self) {
191        let layout = Layout::from_size_align(self.capacity, self.alignment)
192            .expect("Invalid alignment in drop");
193        unsafe { dealloc(self.ptr, layout) };
194    }
195}
196
197// AlignedBuffer is Send + Sync because the pointer is exclusively owned
198unsafe impl Send for AlignedBuffer {}
199unsafe impl Sync for AlignedBuffer {}
200
201/// Open a file with Direct I/O enabled (if supported)
202#[cfg(target_os = "linux")]
203pub fn open_direct(path: &Path) -> io::Result<File> {
204    use std::os::unix::fs::OpenOptionsExt;
205
206    OpenOptions::new()
207        .read(true)
208        .custom_flags(libc::O_DIRECT)
209        .open(path)
210}
211
212#[cfg(target_os = "macos")]
213pub fn open_direct(path: &Path) -> io::Result<File> {
214    let file = OpenOptions::new().read(true).open(path)?;
215
216    // On macOS, we use F_NOCACHE to disable caching
217    unsafe {
218        let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
219        if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
220            return Err(io::Error::last_os_error());
221        }
222    }
223
224    Ok(file)
225}
226
227#[cfg(not(any(target_os = "linux", target_os = "macos")))]
228pub fn open_direct(path: &Path) -> io::Result<File> {
229    // Fallback to buffered I/O on unsupported platforms
230    OpenOptions::new().read(true).open(path)
231}
232
233/// Open a file for Direct I/O writing
234#[cfg(target_os = "linux")]
235pub fn open_direct_write(path: &Path) -> io::Result<File> {
236    use std::os::unix::fs::OpenOptionsExt;
237
238    OpenOptions::new()
239        .write(true)
240        .create(true)
241        .truncate(true)
242        .custom_flags(libc::O_DIRECT)
243        .open(path)
244}
245
246#[cfg(target_os = "macos")]
247pub fn open_direct_write(path: &Path) -> io::Result<File> {
248    let file = OpenOptions::new()
249        .write(true)
250        .create(true)
251        .truncate(true)
252        .open(path)?;
253
254    unsafe {
255        let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
256        if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
257            return Err(io::Error::last_os_error());
258        }
259    }
260
261    Ok(file)
262}
263
264#[cfg(not(any(target_os = "linux", target_os = "macos")))]
265pub fn open_direct_write(path: &Path) -> io::Result<File> {
266    OpenOptions::new()
267        .write(true)
268        .create(true)
269        .truncate(true)
270        .open(path)
271}
272
273/// Direct I/O reader with aligned buffers
274pub struct DirectReader {
275    file: File,
276    buffer: AlignedBuffer,
277    file_offset: u64,
278    buffer_offset: usize,
279    buffer_valid: usize,
280    file_size: u64,
281}
282
283impl DirectReader {
284    /// Create a new Direct I/O reader
285    pub fn open(path: &Path, buffer_size: usize) -> io::Result<Self> {
286        let file = open_direct(path)?;
287        let file_size = file.metadata()?.len();
288
289        Ok(Self {
290            file,
291            buffer: AlignedBuffer::new(buffer_size),
292            file_offset: 0,
293            buffer_offset: 0,
294            buffer_valid: 0,
295            file_size,
296        })
297    }
298
299    /// Create from an existing file
300    pub fn from_file(file: File, buffer_size: usize) -> io::Result<Self> {
301        let file_size = file.metadata()?.len();
302
303        Ok(Self {
304            file,
305            buffer: AlignedBuffer::new(buffer_size),
306            file_offset: 0,
307            buffer_offset: 0,
308            buffer_valid: 0,
309            file_size,
310        })
311    }
312
313    /// Get the file size
314    pub fn file_size(&self) -> u64 {
315        self.file_size
316    }
317
318    /// Refill the buffer from the file
319    fn refill_buffer(&mut self) -> io::Result<usize> {
320        // Align the file offset
321        let aligned_offset =
322            (self.file_offset / DIRECT_IO_ALIGNMENT as u64) * DIRECT_IO_ALIGNMENT as u64;
323        let skip = (self.file_offset - aligned_offset) as usize;
324
325        self.file.seek(SeekFrom::Start(aligned_offset))?;
326
327        let buf = self.buffer.as_mut_slice();
328        let bytes_read = self.file.read(buf)?;
329
330        self.buffer.set_len(bytes_read);
331        self.buffer_valid = bytes_read;
332        self.buffer_offset = skip;
333
334        Ok(bytes_read.saturating_sub(skip))
335    }
336}
337
338impl Read for DirectReader {
339    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
340        let mut total_read = 0;
341
342        while total_read < buf.len() {
343            // Check if we have data in buffer
344            let available = self.buffer_valid.saturating_sub(self.buffer_offset);
345
346            if available == 0 {
347                // Buffer exhausted, refill
348                if self.file_offset >= self.file_size {
349                    break; // EOF
350                }
351                let refilled = self.refill_buffer()?;
352                if refilled == 0 {
353                    break; // EOF
354                }
355                continue;
356            }
357
358            // Copy from buffer
359            let to_copy = (buf.len() - total_read).min(available);
360            let src = &self.buffer.as_slice()[self.buffer_offset..self.buffer_offset + to_copy];
361            buf[total_read..total_read + to_copy].copy_from_slice(src);
362
363            self.buffer_offset += to_copy;
364            self.file_offset += to_copy as u64;
365            total_read += to_copy;
366        }
367
368        Ok(total_read)
369    }
370}
371
372impl Seek for DirectReader {
373    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
374        let new_pos = match pos {
375            SeekFrom::Start(p) => p,
376            SeekFrom::End(p) => {
377                if p >= 0 {
378                    self.file_size + p as u64
379                } else {
380                    self.file_size.saturating_sub((-p) as u64)
381                }
382            }
383            SeekFrom::Current(p) => {
384                if p >= 0 {
385                    self.file_offset + p as u64
386                } else {
387                    self.file_offset.saturating_sub((-p) as u64)
388                }
389            }
390        };
391
392        // Invalidate buffer if seeking outside buffered range
393        let buffer_start = self.file_offset - self.buffer_offset as u64;
394        let buffer_end = buffer_start + self.buffer_valid as u64;
395
396        if new_pos < buffer_start || new_pos >= buffer_end {
397            self.buffer_offset = 0;
398            self.buffer_valid = 0;
399        } else {
400            self.buffer_offset = (new_pos - buffer_start) as usize;
401        }
402
403        self.file_offset = new_pos;
404        Ok(new_pos)
405    }
406}
407
/// Counters describing how much I/O went through each path.
#[derive(Clone, Debug, Default)]
pub struct DirectIoStats {
    /// Bytes read using Direct I/O.
    pub direct_bytes_read: u64,
    /// Bytes read using buffered I/O.
    pub buffered_bytes_read: u64,
    /// Bytes written using Direct I/O.
    pub direct_bytes_written: u64,
    /// Bytes written using buffered I/O.
    pub buffered_bytes_written: u64,
    /// Count of Direct I/O read operations.
    pub direct_reads: u64,
    /// Count of buffered read operations.
    pub buffered_reads: u64,
}

impl DirectIoStats {
    /// Adds one Direct I/O read of `bytes` bytes to the counters.
    pub fn record_direct_read(&mut self, bytes: u64) {
        self.direct_bytes_read += bytes;
        self.direct_reads += 1;
    }

    /// Adds one buffered read of `bytes` bytes to the counters.
    pub fn record_buffered_read(&mut self, bytes: u64) {
        self.buffered_bytes_read += bytes;
        self.buffered_reads += 1;
    }

    /// Sum of the bytes read through both paths.
    pub fn total_bytes_read(&self) -> u64 {
        let Self {
            direct_bytes_read,
            buffered_bytes_read,
            ..
        } = self;
        *direct_bytes_read + *buffered_bytes_read
    }

    /// Fraction of all bytes read that used Direct I/O.
    ///
    /// Returns `0.0` when nothing has been read yet.
    pub fn direct_io_ratio(&self) -> f64 {
        match self.total_bytes_read() {
            0 => 0.0,
            total => self.direct_bytes_read as f64 / total as f64,
        }
    }
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Number of bytes written to every fixture file.
    const FIXTURE_LEN: usize = 10000;

    /// Creates a temp file holding `FIXTURE_LEN` bytes that cycle through
    /// 0..=255, returning the file handle together with the expected contents.
    fn patterned_temp_file() -> (NamedTempFile, Vec<u8>) {
        let mut temp = NamedTempFile::new().unwrap();
        let data: Vec<u8> = (0..FIXTURE_LEN).map(|i| (i % 256) as u8).collect();
        temp.write_all(&data).unwrap();
        temp.flush().unwrap();
        (temp, data)
    }

    /// A new buffer is aligned, empty, and at least as large as requested.
    #[test]
    fn test_aligned_buffer() {
        let buf = AlignedBuffer::new(1024);
        assert!(buf.is_aligned());
        assert!(buf.capacity() >= 1024);
        assert_eq!(buf.len(), 0);
    }

    /// Bytes written through the mutable slice are visible after `set_len`.
    #[test]
    fn test_aligned_buffer_write_read() {
        let mut buf = AlignedBuffer::new(100);
        for (i, item) in buf.as_mut_slice()[..50].iter_mut().enumerate() {
            *item = i as u8;
        }
        buf.set_len(50);

        assert_eq!(buf.len(), 50);
        let contents = buf.as_slice();
        assert_eq!(contents[0], 0);
        assert_eq!(contents[49], 49);
    }

    /// Each constructor selects the matching mode.
    #[test]
    fn test_direct_io_config() {
        assert_eq!(DirectIoConfig::default().mode, DirectIoMode::Buffered);
        assert_eq!(DirectIoConfig::direct().mode, DirectIoMode::Direct);
        assert_eq!(DirectIoConfig::auto().mode, DirectIoMode::Auto);
    }

    /// Reading the whole file through the reader yields the original bytes.
    #[test]
    fn test_direct_reader() {
        let (temp, data) = patterned_temp_file();

        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        let mut read_buf = vec![0u8; FIXTURE_LEN];
        let bytes_read = reader.read(&mut read_buf).unwrap();

        assert_eq!(bytes_read, 10000);
        assert_eq!(read_buf, data);
    }

    /// After seeking to the middle, reads continue from that position.
    #[test]
    fn test_direct_reader_seek() {
        let (temp, data) = patterned_temp_file();
        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();

        reader.seek(SeekFrom::Start(5000)).unwrap();

        let mut read_buf = vec![0u8; 100];
        reader.read_exact(&mut read_buf).unwrap();

        assert_eq!(read_buf, data[5000..5100]);
    }

    /// Recorded reads are summed and the Direct I/O share is 1000 / 1500.
    #[test]
    fn test_direct_io_stats() {
        let mut stats = DirectIoStats::default();
        stats.record_direct_read(1000);
        stats.record_buffered_read(500);

        assert_eq!(stats.total_bytes_read(), 1500);
        let ratio = stats.direct_io_ratio();
        assert!((ratio - 0.666).abs() < 0.01);
    }

    /// Opening an existing file for Direct I/O succeeds.
    #[test]
    fn test_open_direct() {
        let temp = NamedTempFile::new().unwrap();
        assert!(open_direct(temp.path()).is_ok());
    }
}