use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
pub const DIRECT_IO_ALIGNMENT: usize = 4096;
#[derive(Debug, Clone, Copy)]
pub struct DirectIoConfig {
pub wal_direct_io: bool,
pub sstable_direct_io: bool,
pub alignment: usize,
}
impl Default for DirectIoConfig {
fn default() -> Self {
Self {
wal_direct_io: false,
sstable_direct_io: false,
alignment: DIRECT_IO_ALIGNMENT,
}
}
}
impl DirectIoConfig {
pub fn enabled() -> Self {
Self {
wal_direct_io: true,
sstable_direct_io: true,
alignment: DIRECT_IO_ALIGNMENT,
}
}
}
#[cfg(target_os = "linux")]
pub fn open_with_direct_io<P: AsRef<Path>>(
path: P,
create: bool,
write: bool,
direct: bool,
) -> io::Result<File> {
use std::os::unix::fs::OpenOptionsExt;
let mut opts = OpenOptions::new();
opts.read(true);
if write {
opts.write(true);
}
if create {
opts.create(true);
}
if direct {
opts.custom_flags(libc::O_DIRECT);
}
opts.open(path)
}
#[cfg(target_os = "macos")]
pub fn open_with_direct_io<P: AsRef<Path>>(
path: P,
create: bool,
write: bool,
direct: bool,
) -> io::Result<File> {
use std::os::unix::io::AsRawFd;
let mut opts = OpenOptions::new();
opts.read(true);
if write {
opts.write(true);
}
if create {
opts.create(true);
}
let file = opts.open(path)?;
if direct {
unsafe {
libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1);
}
}
Ok(file)
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn open_with_direct_io<P: AsRef<Path>>(
path: P,
create: bool,
write: bool,
_direct: bool, ) -> io::Result<File> {
let mut opts = OpenOptions::new();
opts.read(true);
if write {
opts.write(true);
}
if create {
opts.create(true);
}
opts.open(path)
}
pub struct AlignedBuffer {
ptr: *mut u8,
len: usize,
capacity: usize,
layout: std::alloc::Layout,
alignment: usize,
}
unsafe impl Send for AlignedBuffer {}
unsafe impl Sync for AlignedBuffer {}
impl AlignedBuffer {
pub fn new(capacity: usize, alignment: usize) -> Self {
let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
let layout = std::alloc::Layout::from_size_align(aligned_capacity, alignment).unwrap();
let ptr = unsafe {
let p = std::alloc::alloc_zeroed(layout);
if p.is_null() {
std::alloc::handle_alloc_error(layout);
}
p
};
Self {
ptr,
len: 0,
capacity: aligned_capacity,
layout,
alignment,
}
}
pub fn alignment(&self) -> usize {
self.alignment
}
pub fn is_aligned(&self) -> bool {
self.ptr as usize % self.alignment == 0
}
pub fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}
pub fn clear(&mut self) {
self.len = 0;
}
pub fn extend_aligned(&mut self, data: &[u8]) {
let new_len = self.len + data.len();
if new_len > self.capacity {
self.grow(new_len);
}
unsafe {
std::ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.add(self.len), data.len());
}
self.len = new_len;
}
pub fn pad_to_alignment(&mut self) {
let remainder = self.len % self.alignment;
if remainder != 0 {
let padding = self.alignment - remainder;
let new_len = self.len + padding;
if new_len > self.capacity {
self.grow(new_len);
}
unsafe {
std::ptr::write_bytes(self.ptr.add(self.len), 0, padding);
}
self.len = new_len;
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
fn grow(&mut self, min_capacity: usize) {
let new_capacity = std::cmp::max(self.capacity * 2, min_capacity);
let new_capacity = (new_capacity + self.alignment - 1) & !(self.alignment - 1);
let new_layout =
std::alloc::Layout::from_size_align(new_capacity, self.alignment).unwrap();
let new_ptr = unsafe {
let p = std::alloc::alloc_zeroed(new_layout);
if p.is_null() {
std::alloc::handle_alloc_error(new_layout);
}
std::ptr::copy_nonoverlapping(self.ptr, p, self.len);
std::alloc::dealloc(self.ptr, self.layout);
p
};
self.ptr = new_ptr;
self.capacity = new_capacity;
self.layout = new_layout;
}
}
impl Drop for AlignedBuffer {
fn drop(&mut self) {
unsafe {
std::alloc::dealloc(self.ptr, self.layout);
}
}
}
pub struct DirectIoWriter {
file: File,
buffer: AlignedBuffer,
alignment: usize,
}
impl DirectIoWriter {
pub fn new(file: File, buffer_size: usize, alignment: usize) -> Self {
Self {
file,
buffer: AlignedBuffer::new(buffer_size, alignment),
alignment,
}
}
pub fn sync(&mut self) -> io::Result<()> {
if !self.buffer.is_empty() {
self.buffer.pad_to_alignment();
self.file.write_all(self.buffer.as_slice())?;
self.buffer.clear();
}
self.file.sync_all()
}
}
impl Write for DirectIoWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend_aligned(buf);
if self.buffer.len() >= self.alignment * 16 {
self.buffer.pad_to_alignment();
self.file.write_all(self.buffer.as_slice())?;
self.buffer.clear();
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
self.sync()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_aligned_buffer() {
let mut buf = AlignedBuffer::new(4096, DIRECT_IO_ALIGNMENT);
assert!(buf.is_aligned());
buf.extend_aligned(b"hello world");
assert_eq!(buf.len(), 11);
buf.pad_to_alignment();
assert_eq!(buf.len() % DIRECT_IO_ALIGNMENT, 0);
}
#[test]
fn test_open_direct_io() {
let temp = TempDir::new().unwrap();
let path = temp.path().join("test.dat");
let file = open_with_direct_io(&path, true, true, false);
assert!(file.is_ok());
}
}