1use std::alloc::{Layout, alloc, dealloc};
44use std::fs::{File, OpenOptions};
45use std::io::{self, Read, Seek, SeekFrom};
46use std::path::Path;
47
/// Alignment, in bytes, applied to direct-I/O buffers and file offsets on Linux.
#[cfg(target_os = "linux")]
pub const DIRECT_IO_ALIGNMENT: usize = 512;

/// Alignment, in bytes, applied to direct-I/O buffers and file offsets on
/// every non-Linux target (macOS and all remaining platforms share this value).
#[cfg(not(target_os = "linux"))]
pub const DIRECT_IO_ALIGNMENT: usize = 4096;

/// Page size, in bytes, as a fixed compile-time constant (it is not queried
/// from the operating system).
pub const PAGE_SIZE: usize = 4096;
60
/// Selects how file I/O should be performed.
///
/// The default variant is [`DirectIoMode::Buffered`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DirectIoMode {
    /// Regular buffered I/O. This is the default.
    #[default]
    Buffered,
    /// Direct I/O.
    Direct,
    /// Automatic selection.
    // NOTE(review): presumably decided by comparing a size against
    // `DirectIoConfig::auto_threshold`; the selection logic is not in this
    // file, so confirm against the caller.
    Auto,
}
72
73#[derive(Debug, Clone)]
75pub struct DirectIoConfig {
76 pub mode: DirectIoMode,
78 pub buffer_size: usize,
80 pub auto_threshold: usize,
82}
83
84impl Default for DirectIoConfig {
85 fn default() -> Self {
86 Self {
87 mode: DirectIoMode::Buffered,
88 buffer_size: 256 * 1024, auto_threshold: 64 * 1024 * 1024, }
91 }
92}
93
94impl DirectIoConfig {
95 pub fn direct() -> Self {
97 Self {
98 mode: DirectIoMode::Direct,
99 ..Default::default()
100 }
101 }
102
103 pub fn auto() -> Self {
105 Self {
106 mode: DirectIoMode::Auto,
107 ..Default::default()
108 }
109 }
110}
111
112pub struct AlignedBuffer {
117 ptr: *mut u8,
118 len: usize,
119 capacity: usize,
120 alignment: usize,
121}
122
123impl AlignedBuffer {
124 pub fn new(capacity: usize) -> Self {
126 Self::with_alignment(capacity, DIRECT_IO_ALIGNMENT)
127 }
128
129 pub fn with_alignment(capacity: usize, alignment: usize) -> Self {
131 let aligned_capacity = capacity.div_ceil(alignment) * alignment;
133
134 let layout =
135 Layout::from_size_align(aligned_capacity, alignment).expect("Invalid alignment");
136
137 let ptr = unsafe { alloc(layout) };
138 if ptr.is_null() {
139 panic!("Failed to allocate aligned buffer");
140 }
141
142 Self {
143 ptr,
144 len: 0,
145 capacity: aligned_capacity,
146 alignment,
147 }
148 }
149
150 pub fn as_slice(&self) -> &[u8] {
152 unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
153 }
154
155 pub fn as_mut_slice(&mut self) -> &mut [u8] {
157 unsafe { std::slice::from_raw_parts_mut(self.ptr, self.capacity) }
158 }
159
160 pub fn set_len(&mut self, len: usize) {
162 assert!(len <= self.capacity);
163 self.len = len;
164 }
165
166 pub fn capacity(&self) -> usize {
168 self.capacity
169 }
170
171 pub fn len(&self) -> usize {
173 self.len
174 }
175
176 pub fn is_empty(&self) -> bool {
178 self.len == 0
179 }
180
181 pub fn alignment(&self) -> usize {
183 self.alignment
184 }
185
186 pub fn is_aligned(&self) -> bool {
188 (self.ptr as usize).is_multiple_of(self.alignment)
189 }
190}
191
192impl Drop for AlignedBuffer {
193 fn drop(&mut self) {
194 let layout = Layout::from_size_align(self.capacity, self.alignment)
195 .expect("Invalid alignment in drop");
196 unsafe { dealloc(self.ptr, layout) };
197 }
198}
199
200unsafe impl Send for AlignedBuffer {}
202unsafe impl Sync for AlignedBuffer {}
203
204#[cfg(target_os = "linux")]
206pub fn open_direct(path: &Path) -> io::Result<File> {
207 use std::os::unix::fs::OpenOptionsExt;
208
209 OpenOptions::new()
210 .read(true)
211 .custom_flags(libc::O_DIRECT)
212 .open(path)
213}
214
215#[cfg(target_os = "macos")]
216pub fn open_direct(path: &Path) -> io::Result<File> {
217 let file = OpenOptions::new().read(true).open(path)?;
218
219 unsafe {
221 let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
222 if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
223 return Err(io::Error::last_os_error());
224 }
225 }
226
227 Ok(file)
228}
229
230#[cfg(not(any(target_os = "linux", target_os = "macos")))]
231pub fn open_direct(path: &Path) -> io::Result<File> {
232 OpenOptions::new().read(true).open(path)
234}
235
236#[cfg(target_os = "linux")]
238pub fn open_direct_write(path: &Path) -> io::Result<File> {
239 use std::os::unix::fs::OpenOptionsExt;
240
241 OpenOptions::new()
242 .write(true)
243 .create(true)
244 .truncate(true)
245 .custom_flags(libc::O_DIRECT)
246 .open(path)
247}
248
249#[cfg(target_os = "macos")]
250pub fn open_direct_write(path: &Path) -> io::Result<File> {
251 let file = OpenOptions::new()
252 .write(true)
253 .create(true)
254 .truncate(true)
255 .open(path)?;
256
257 unsafe {
258 let fd = std::os::unix::io::AsRawFd::as_raw_fd(&file);
259 if libc::fcntl(fd, libc::F_NOCACHE, 1) == -1 {
260 return Err(io::Error::last_os_error());
261 }
262 }
263
264 Ok(file)
265}
266
267#[cfg(not(any(target_os = "linux", target_os = "macos")))]
268pub fn open_direct_write(path: &Path) -> io::Result<File> {
269 OpenOptions::new()
270 .write(true)
271 .create(true)
272 .truncate(true)
273 .open(path)
274}
275
276pub struct DirectReader {
278 file: File,
279 buffer: AlignedBuffer,
280 file_offset: u64,
281 buffer_offset: usize,
282 buffer_valid: usize,
283 file_size: u64,
284}
285
286impl DirectReader {
287 pub fn open(path: &Path, buffer_size: usize) -> io::Result<Self> {
289 let file = open_direct(path)?;
290 let file_size = file.metadata()?.len();
291
292 Ok(Self {
293 file,
294 buffer: AlignedBuffer::new(buffer_size),
295 file_offset: 0,
296 buffer_offset: 0,
297 buffer_valid: 0,
298 file_size,
299 })
300 }
301
302 pub fn from_file(file: File, buffer_size: usize) -> io::Result<Self> {
304 let file_size = file.metadata()?.len();
305
306 Ok(Self {
307 file,
308 buffer: AlignedBuffer::new(buffer_size),
309 file_offset: 0,
310 buffer_offset: 0,
311 buffer_valid: 0,
312 file_size,
313 })
314 }
315
316 pub fn file_size(&self) -> u64 {
318 self.file_size
319 }
320
321 fn refill_buffer(&mut self) -> io::Result<usize> {
323 let aligned_offset =
325 (self.file_offset / DIRECT_IO_ALIGNMENT as u64) * DIRECT_IO_ALIGNMENT as u64;
326 let skip = (self.file_offset - aligned_offset) as usize;
327
328 self.file.seek(SeekFrom::Start(aligned_offset))?;
329
330 let buf = self.buffer.as_mut_slice();
331 let bytes_read = self.file.read(buf)?;
332
333 self.buffer.set_len(bytes_read);
334 self.buffer_valid = bytes_read;
335 self.buffer_offset = skip;
336
337 Ok(bytes_read.saturating_sub(skip))
338 }
339}
340
341impl Read for DirectReader {
342 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
343 let mut total_read = 0;
344
345 while total_read < buf.len() {
346 let available = self.buffer_valid.saturating_sub(self.buffer_offset);
348
349 if available == 0 {
350 if self.file_offset >= self.file_size {
352 break; }
354 let refilled = self.refill_buffer()?;
355 if refilled == 0 {
356 break; }
358 continue;
359 }
360
361 let to_copy = (buf.len() - total_read).min(available);
363 let src = &self.buffer.as_slice()[self.buffer_offset..self.buffer_offset + to_copy];
364 buf[total_read..total_read + to_copy].copy_from_slice(src);
365
366 self.buffer_offset += to_copy;
367 self.file_offset += to_copy as u64;
368 total_read += to_copy;
369 }
370
371 Ok(total_read)
372 }
373}
374
375impl Seek for DirectReader {
376 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
377 let new_pos = match pos {
378 SeekFrom::Start(p) => p,
379 SeekFrom::End(p) => {
380 if p >= 0 {
381 self.file_size + p as u64
382 } else {
383 self.file_size.saturating_sub((-p) as u64)
384 }
385 }
386 SeekFrom::Current(p) => {
387 if p >= 0 {
388 self.file_offset + p as u64
389 } else {
390 self.file_offset.saturating_sub((-p) as u64)
391 }
392 }
393 };
394
395 let buffer_start = self.file_offset - self.buffer_offset as u64;
397 let buffer_end = buffer_start + self.buffer_valid as u64;
398
399 if new_pos < buffer_start || new_pos >= buffer_end {
400 self.buffer_offset = 0;
401 self.buffer_valid = 0;
402 } else {
403 self.buffer_offset = (new_pos - buffer_start) as usize;
404 }
405
406 self.file_offset = new_pos;
407 Ok(new_pos)
408 }
409}
410
/// Running counters for direct versus buffered I/O.
///
/// All counters saturate at `u64::MAX` instead of overflowing, so recording
/// statistics can never panic or wrap.
#[derive(Debug, Default, Clone)]
pub struct DirectIoStats {
    /// Bytes read through direct I/O.
    pub direct_bytes_read: u64,
    /// Bytes read through buffered I/O.
    pub buffered_bytes_read: u64,
    /// Bytes written through direct I/O.
    pub direct_bytes_written: u64,
    /// Bytes written through buffered I/O.
    pub buffered_bytes_written: u64,
    /// Number of direct read operations recorded.
    pub direct_reads: u64,
    /// Number of buffered read operations recorded.
    pub buffered_reads: u64,
}

impl DirectIoStats {
    /// Records one direct read of `bytes` bytes.
    pub fn record_direct_read(&mut self, bytes: u64) {
        self.direct_bytes_read = self.direct_bytes_read.saturating_add(bytes);
        self.direct_reads = self.direct_reads.saturating_add(1);
    }

    /// Records one buffered read of `bytes` bytes.
    pub fn record_buffered_read(&mut self, bytes: u64) {
        self.buffered_bytes_read = self.buffered_bytes_read.saturating_add(bytes);
        self.buffered_reads = self.buffered_reads.saturating_add(1);
    }

    /// Adds `bytes` to the direct-write byte counter.
    pub fn record_direct_write(&mut self, bytes: u64) {
        self.direct_bytes_written = self.direct_bytes_written.saturating_add(bytes);
    }

    /// Adds `bytes` to the buffered-write byte counter.
    pub fn record_buffered_write(&mut self, bytes: u64) {
        self.buffered_bytes_written = self.buffered_bytes_written.saturating_add(bytes);
    }

    /// Total bytes read across both paths (saturating).
    pub fn total_bytes_read(&self) -> u64 {
        self.direct_bytes_read
            .saturating_add(self.buffered_bytes_read)
    }

    /// Total bytes written across both paths (saturating).
    pub fn total_bytes_written(&self) -> u64 {
        self.direct_bytes_written
            .saturating_add(self.buffered_bytes_written)
    }

    /// Fraction of all bytes read that went through direct I/O, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when nothing has been read, avoiding a division by zero.
    pub fn direct_io_ratio(&self) -> f64 {
        match self.total_bytes_read() {
            0 => 0.0,
            total => self.direct_bytes_read as f64 / total as f64,
        }
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Builds the 10 000-byte repeating `0..=255` pattern used by the reader tests.
    fn sample_data() -> Vec<u8> {
        (0..10000).map(|i| (i % 256) as u8).collect()
    }

    /// Creates a temporary file that already contains `contents`.
    fn temp_file_with(contents: &[u8]) -> NamedTempFile {
        let mut temp = NamedTempFile::new().unwrap();
        temp.write_all(contents).unwrap();
        temp.flush().unwrap();
        temp
    }

    #[test]
    fn test_aligned_buffer() {
        let buffer = AlignedBuffer::new(1024);

        assert!(buffer.is_aligned());
        assert!(buffer.capacity() >= 1024);
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_aligned_buffer_write_read() {
        let mut buffer = AlignedBuffer::new(100);

        // Fill the first 50 bytes with their own index.
        for (index, byte) in buffer.as_mut_slice().iter_mut().enumerate().take(50) {
            *byte = index as u8;
        }
        buffer.set_len(50);

        assert_eq!(buffer.len(), 50);
        let filled = buffer.as_slice();
        assert_eq!(filled[0], 0);
        assert_eq!(filled[49], 49);
    }

    #[test]
    fn test_direct_io_config() {
        assert_eq!(DirectIoConfig::default().mode, DirectIoMode::Buffered);
        assert_eq!(DirectIoConfig::direct().mode, DirectIoMode::Direct);
        assert_eq!(DirectIoConfig::auto().mode, DirectIoMode::Auto);
    }

    #[test]
    fn test_direct_reader() {
        let expected = sample_data();
        let temp = temp_file_with(&expected);

        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        let mut actual = vec![0u8; 10000];
        let count = reader.read(&mut actual).unwrap();

        assert_eq!(count, 10000);
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_direct_reader_seek() {
        let expected = sample_data();
        let temp = temp_file_with(&expected);

        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        reader.seek(SeekFrom::Start(5000)).unwrap();

        let mut window = vec![0u8; 100];
        reader.read_exact(&mut window).unwrap();

        assert_eq!(window, expected[5000..5100]);
    }

    #[test]
    fn test_direct_io_stats() {
        let mut stats = DirectIoStats::default();
        stats.record_direct_read(1000);
        stats.record_buffered_read(500);

        assert_eq!(stats.total_bytes_read(), 1500);
        assert!((stats.direct_io_ratio() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_open_direct() {
        let temp = NamedTempFile::new().unwrap();

        assert!(open_direct(temp.path()).is_ok());
    }
}