1use std::alloc::{Layout, alloc, dealloc};
41use std::fs::{File, OpenOptions};
42use std::io::{self, Read, Seek, SeekFrom};
43use std::path::Path;
44
/// Alignment, in bytes, used by this module for direct-I/O buffers and for
/// the file offsets it reads from.
///
/// Linux builds use 512.
#[cfg(target_os = "linux")]
pub const DIRECT_IO_ALIGNMENT: usize = 512;

/// Alignment, in bytes, used by this module for direct-I/O buffers and for
/// the file offsets it reads from.
///
/// Every non-Linux target (macOS included) uses 4096.
#[cfg(not(target_os = "linux"))]
pub const DIRECT_IO_ALIGNMENT: usize = 4096;

/// Page size assumed by this module: a fixed 4 KiB.
///
/// NOTE(review): this is a compile-time constant, not a value queried from
/// the operating system — confirm it matches the targets you care about.
pub const PAGE_SIZE: usize = 4 * 1024;
57
/// Selects how file I/O should be performed.
///
/// The default is [`DirectIoMode::Buffered`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DirectIoMode {
    /// Ordinary buffered I/O. This is the default variant.
    #[default]
    Buffered,
    /// Direct I/O.
    Direct,
    /// Let the implementation choose.
    ///
    /// NOTE(review): presumably direct I/O is picked for files at or above
    /// `DirectIoConfig::auto_threshold`; the selection logic is not in this
    /// part of the file — confirm against the caller.
    Auto,
}
69
70#[derive(Debug, Clone)]
72pub struct DirectIoConfig {
73 pub mode: DirectIoMode,
75 pub buffer_size: usize,
77 pub auto_threshold: usize,
79}
80
81impl Default for DirectIoConfig {
82 fn default() -> Self {
83 Self {
84 mode: DirectIoMode::Buffered,
85 buffer_size: 256 * 1024, auto_threshold: 64 * 1024 * 1024, }
88 }
89}
90
91impl DirectIoConfig {
92 pub fn direct() -> Self {
94 Self {
95 mode: DirectIoMode::Direct,
96 ..Default::default()
97 }
98 }
99
100 pub fn auto() -> Self {
102 Self {
103 mode: DirectIoMode::Auto,
104 ..Default::default()
105 }
106 }
107}
108
109pub struct AlignedBuffer {
114 ptr: *mut u8,
115 len: usize,
116 capacity: usize,
117 alignment: usize,
118}
119
120impl AlignedBuffer {
121 pub fn new(capacity: usize) -> Self {
123 Self::with_alignment(capacity, DIRECT_IO_ALIGNMENT)
124 }
125
126 pub fn with_alignment(capacity: usize, alignment: usize) -> Self {
128 let aligned_capacity = capacity.div_ceil(alignment) * alignment;
130
131 let layout =
132 Layout::from_size_align(aligned_capacity, alignment).expect("Invalid alignment");
133
134 let ptr = unsafe { alloc(layout) };
135 if ptr.is_null() {
136 panic!("Failed to allocate aligned buffer");
137 }
138
139 Self {
140 ptr,
141 len: 0,
142 capacity: aligned_capacity,
143 alignment,
144 }
145 }
146
147 pub fn as_slice(&self) -> &[u8] {
149 unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
150 }
151
152 pub fn as_mut_slice(&mut self) -> &mut [u8] {
154 unsafe { std::slice::from_raw_parts_mut(self.ptr, self.capacity) }
155 }
156
157 pub fn set_len(&mut self, len: usize) {
159 assert!(len <= self.capacity);
160 self.len = len;
161 }
162
163 pub fn capacity(&self) -> usize {
165 self.capacity
166 }
167
168 pub fn len(&self) -> usize {
170 self.len
171 }
172
173 pub fn is_empty(&self) -> bool {
175 self.len == 0
176 }
177
178 pub fn alignment(&self) -> usize {
180 self.alignment
181 }
182
183 pub fn is_aligned(&self) -> bool {
185 (self.ptr as usize).is_multiple_of(self.alignment)
186 }
187}
188
189impl Drop for AlignedBuffer {
190 fn drop(&mut self) {
191 let layout = Layout::from_size_align(self.capacity, self.alignment)
192 .expect("Invalid alignment in drop");
193 unsafe { dealloc(self.ptr, layout) };
194 }
195}
196
197unsafe impl Send for AlignedBuffer {}
199unsafe impl Sync for AlignedBuffer {}
200
201#[cfg(target_os = "linux")]
203pub fn open_direct(path: &Path) -> io::Result<File> {
204 use std::os::unix::fs::OpenOptionsExt;
205
206 OpenOptions::new()
207 .read(true)
208 .custom_flags(libc::O_DIRECT)
209 .open(path)
210}
211
/// Opens `path` read-only, then sets `F_NOCACHE` on the descriptor (macOS).
///
/// # Errors
///
/// Returns the open error, or the OS error reported when `fcntl` fails.
#[cfg(target_os = "macos")]
pub fn open_direct(path: &Path) -> io::Result<File> {
    use std::os::unix::io::AsRawFd;

    let file = OpenOptions::new().read(true).open(path)?;

    // SAFETY: the descriptor is open and owned by `file`, which stays alive
    // for the duration of the call.
    let status = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1) };

    match status {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(file),
    }
}
226
/// Opens `path` read-only (targets other than Linux and macOS).
///
/// No direct-I/O flag is applied here; the file is opened with plain
/// read access.
///
/// # Errors
///
/// Returns whatever error the underlying `open` call reports.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn open_direct(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.read(true);
    options.open(path)
}
232
233#[cfg(target_os = "linux")]
235pub fn open_direct_write(path: &Path) -> io::Result<File> {
236 use std::os::unix::fs::OpenOptionsExt;
237
238 OpenOptions::new()
239 .write(true)
240 .create(true)
241 .truncate(true)
242 .custom_flags(libc::O_DIRECT)
243 .open(path)
244}
245
/// Opens `path` for writing, then sets `F_NOCACHE` on the descriptor (macOS).
///
/// The file is created if missing and truncated if it already exists.
///
/// # Errors
///
/// Returns the open error, or the OS error reported when `fcntl` fails.
#[cfg(target_os = "macos")]
pub fn open_direct_write(path: &Path) -> io::Result<File> {
    use std::os::unix::io::AsRawFd;

    let mut options = OpenOptions::new();
    options.write(true).create(true).truncate(true);
    let file = options.open(path)?;

    // SAFETY: the descriptor is open and owned by `file`, which stays alive
    // for the duration of the call.
    let status = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1) };

    match status {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(file),
    }
}
263
/// Opens `path` for writing (targets other than Linux and macOS).
///
/// The file is created if missing and truncated if it already exists. No
/// direct-I/O flag is applied here.
///
/// # Errors
///
/// Returns whatever error the underlying `open` call reports.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn open_direct_write(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create(true).truncate(true);
    options.open(path)
}
272
273pub struct DirectReader {
275 file: File,
276 buffer: AlignedBuffer,
277 file_offset: u64,
278 buffer_offset: usize,
279 buffer_valid: usize,
280 file_size: u64,
281}
282
283impl DirectReader {
284 pub fn open(path: &Path, buffer_size: usize) -> io::Result<Self> {
286 let file = open_direct(path)?;
287 let file_size = file.metadata()?.len();
288
289 Ok(Self {
290 file,
291 buffer: AlignedBuffer::new(buffer_size),
292 file_offset: 0,
293 buffer_offset: 0,
294 buffer_valid: 0,
295 file_size,
296 })
297 }
298
299 pub fn from_file(file: File, buffer_size: usize) -> io::Result<Self> {
301 let file_size = file.metadata()?.len();
302
303 Ok(Self {
304 file,
305 buffer: AlignedBuffer::new(buffer_size),
306 file_offset: 0,
307 buffer_offset: 0,
308 buffer_valid: 0,
309 file_size,
310 })
311 }
312
313 pub fn file_size(&self) -> u64 {
315 self.file_size
316 }
317
318 fn refill_buffer(&mut self) -> io::Result<usize> {
320 let aligned_offset =
322 (self.file_offset / DIRECT_IO_ALIGNMENT as u64) * DIRECT_IO_ALIGNMENT as u64;
323 let skip = (self.file_offset - aligned_offset) as usize;
324
325 self.file.seek(SeekFrom::Start(aligned_offset))?;
326
327 let buf = self.buffer.as_mut_slice();
328 let bytes_read = self.file.read(buf)?;
329
330 self.buffer.set_len(bytes_read);
331 self.buffer_valid = bytes_read;
332 self.buffer_offset = skip;
333
334 Ok(bytes_read.saturating_sub(skip))
335 }
336}
337
338impl Read for DirectReader {
339 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
340 let mut total_read = 0;
341
342 while total_read < buf.len() {
343 let available = self.buffer_valid.saturating_sub(self.buffer_offset);
345
346 if available == 0 {
347 if self.file_offset >= self.file_size {
349 break; }
351 let refilled = self.refill_buffer()?;
352 if refilled == 0 {
353 break; }
355 continue;
356 }
357
358 let to_copy = (buf.len() - total_read).min(available);
360 let src = &self.buffer.as_slice()[self.buffer_offset..self.buffer_offset + to_copy];
361 buf[total_read..total_read + to_copy].copy_from_slice(src);
362
363 self.buffer_offset += to_copy;
364 self.file_offset += to_copy as u64;
365 total_read += to_copy;
366 }
367
368 Ok(total_read)
369 }
370}
371
372impl Seek for DirectReader {
373 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
374 let new_pos = match pos {
375 SeekFrom::Start(p) => p,
376 SeekFrom::End(p) => {
377 if p >= 0 {
378 self.file_size + p as u64
379 } else {
380 self.file_size.saturating_sub((-p) as u64)
381 }
382 }
383 SeekFrom::Current(p) => {
384 if p >= 0 {
385 self.file_offset + p as u64
386 } else {
387 self.file_offset.saturating_sub((-p) as u64)
388 }
389 }
390 };
391
392 let buffer_start = self.file_offset - self.buffer_offset as u64;
394 let buffer_end = buffer_start + self.buffer_valid as u64;
395
396 if new_pos < buffer_start || new_pos >= buffer_end {
397 self.buffer_offset = 0;
398 self.buffer_valid = 0;
399 } else {
400 self.buffer_offset = (new_pos - buffer_start) as usize;
401 }
402
403 self.file_offset = new_pos;
404 Ok(new_pos)
405 }
406}
407
/// Running counters for direct versus buffered I/O.
#[derive(Clone, Debug, Default)]
pub struct DirectIoStats {
    /// Bytes read through direct I/O.
    pub direct_bytes_read: u64,
    /// Bytes read through buffered I/O.
    pub buffered_bytes_read: u64,
    /// Bytes written through direct I/O.
    pub direct_bytes_written: u64,
    /// Bytes written through buffered I/O.
    pub buffered_bytes_written: u64,
    /// Number of direct reads recorded.
    pub direct_reads: u64,
    /// Number of buffered reads recorded.
    pub buffered_reads: u64,
}

impl DirectIoStats {
    /// Records one direct read of `bytes` bytes.
    pub fn record_direct_read(&mut self, bytes: u64) {
        self.direct_reads += 1;
        self.direct_bytes_read += bytes;
    }

    /// Records one buffered read of `bytes` bytes.
    pub fn record_buffered_read(&mut self, bytes: u64) {
        self.buffered_reads += 1;
        self.buffered_bytes_read += bytes;
    }

    /// Sum of the direct and buffered byte counts for reads.
    pub fn total_bytes_read(&self) -> u64 {
        self.direct_bytes_read + self.buffered_bytes_read
    }

    /// Fraction of read bytes that went through direct I/O, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when nothing has been read yet.
    pub fn direct_io_ratio(&self) -> f64 {
        match self.total_bytes_read() {
            0 => 0.0,
            total => self.direct_bytes_read as f64 / total as f64,
        }
    }
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Length of the byte pattern used by the reader tests.
    const SAMPLE_LEN: usize = 10000;

    /// Repeating 0..=255 byte pattern of `SAMPLE_LEN` bytes.
    fn sample_bytes() -> Vec<u8> {
        (0..SAMPLE_LEN).map(|i| (i % 256) as u8).collect()
    }

    /// Temporary file containing `contents`, flushed before it is returned.
    fn sample_file(contents: &[u8]) -> NamedTempFile {
        let mut temp = NamedTempFile::new().unwrap();
        temp.write_all(contents).unwrap();
        temp.flush().unwrap();
        temp
    }

    // A fresh buffer is aligned, empty and at least as large as requested.
    #[test]
    fn test_aligned_buffer() {
        let buffer = AlignedBuffer::new(1024);
        assert!(buffer.is_aligned());
        assert!(buffer.capacity() >= 1024);
        assert_eq!(buffer.len(), 0);
    }

    // Bytes written through `as_mut_slice` are visible after `set_len`.
    #[test]
    fn test_aligned_buffer_write_read() {
        let mut buffer = AlignedBuffer::new(100);
        for (index, byte) in buffer.as_mut_slice().iter_mut().enumerate().take(50) {
            *byte = index as u8;
        }
        buffer.set_len(50);

        assert_eq!(buffer.len(), 50);
        assert_eq!(buffer.as_slice()[0], 0);
        assert_eq!(buffer.as_slice()[49], 49);
    }

    // Each constructor selects the mode it is named after.
    #[test]
    fn test_direct_io_config() {
        assert_eq!(DirectIoConfig::default().mode, DirectIoMode::Buffered);
        assert_eq!(DirectIoConfig::direct().mode, DirectIoMode::Direct);
        assert_eq!(DirectIoConfig::auto().mode, DirectIoMode::Auto);
    }

    // A single `read` call returns the whole file across several refills.
    #[test]
    fn test_direct_reader() {
        let data = sample_bytes();
        let temp = sample_file(&data);

        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        let mut read_buf = vec![0u8; 10000];
        let bytes_read = reader.read(&mut read_buf).unwrap();

        assert_eq!(bytes_read, 10000);
        assert_eq!(read_buf, data);
    }

    // Reading after a seek to an unaligned offset returns the right bytes.
    #[test]
    fn test_direct_reader_seek() {
        let data = sample_bytes();
        let temp = sample_file(&data);

        let mut reader = DirectReader::open(temp.path(), 4096).unwrap();
        reader.seek(SeekFrom::Start(5000)).unwrap();

        let mut read_buf = vec![0u8; 100];
        reader.read_exact(&mut read_buf).unwrap();

        assert_eq!(read_buf, data[5000..5100]);
    }

    // Byte totals and the direct/total ratio follow the recorded reads.
    #[test]
    fn test_direct_io_stats() {
        let mut stats = DirectIoStats::default();

        stats.record_direct_read(1000);
        stats.record_buffered_read(500);

        assert_eq!(stats.total_bytes_read(), 1500);
        assert!((stats.direct_io_ratio() - 0.666).abs() < 0.01);
    }

    // Opening an existing temporary file through `open_direct` succeeds.
    #[test]
    fn test_open_direct() {
        let temp = NamedTempFile::new().unwrap();
        assert!(open_direct(temp.path()).is_ok());
    }
}