use std::{
cell::RefCell,
ffi::CString,
fs::File,
io::Write,
os::{
fd::{AsRawFd, FromRawFd},
unix::ffi::OsStrExt,
},
path::{Path, PathBuf},
};
use crate::bf_tree::error::IoErrorKind;
use crate::bf_tree::utils;
#[allow(unused_imports)]
use crate::{check_parent, counter, histogram, info};
use io_uring::{IoUring, opcode};
use super::{OffsetAlloc, VfsImpl};
/// A pool of `io_uring` instances that callers index by a value derived from
/// the current thread's id (see `get_current_ring`).
///
/// Each ring sits in a `RefCell` so it can be mutably borrowed through a
/// shared reference to the pool.
struct IoUringInstance {
// The rings themselves; the pool size is fixed when `new` builds the vector.
ring: Vec<RefCell<IoUring>>,
}
impl IoUringInstance {
    /// Builds the ring pool.
    ///
    /// The pool holds `max(32, 4 * available_parallelism)` rings, each built
    /// with 8 entries. When `poll` is true every ring is configured with
    /// SQPOLL (idle value 50000) and IOPOLL, and every ring after the first
    /// is attached to the work queue of the ring created just before it.
    ///
    /// # Errors
    ///
    /// Returns `IoErrorKind::VfsRead { offset: 0 }` when the available
    /// parallelism cannot be queried or when building any ring fails.
    fn new(poll: bool) -> Result<Self, IoErrorKind> {
        let cores = std::thread::available_parallelism()
            .map_err(|_| IoErrorKind::VfsRead { offset: 0 })?
            .get();
        let ring_count = std::cmp::max(32, cores * 4);

        let mut ring: Vec<RefCell<IoUring>> = Vec::with_capacity(ring_count);
        for _ in 0..ring_count {
            let mut builder = IoUring::builder();
            if poll {
                builder.setup_sqpoll(50000);
                builder.setup_iopoll();
                // Only the very first ring has no predecessor to attach to.
                if let Some(previous) = ring.last() {
                    builder.setup_attach_wq(previous.borrow().as_raw_fd());
                }
            }
            let built = builder
                .build(8)
                .map_err(|_| IoErrorKind::VfsRead { offset: 0 })?;
            ring.push(RefCell::new(built));
        }

        Ok(Self { ring })
    }

    /// Returns the ring assigned to the calling thread: the thread id,
    /// converted to a `u64`, modulo the number of rings in the pool.
    fn get_current_ring(&self) -> &RefCell<IoUring> {
        let thread_key = utils::thread_id_to_u64(std::thread::current().id());
        let slot_count = self.ring.len() as u64;
        self.get_ring(thread_key % slot_count)
    }

    /// Returns the ring stored at index `thread_id`.
    ///
    /// # Panics
    ///
    /// Panics if `thread_id` is not a valid index into the pool.
    fn get_ring(&self, thread_id: u64) -> &RefCell<IoUring> {
        let slot = thread_id as usize;
        &self.ring[slot]
    }
}
/// A VFS backend that reads and writes a single file through `io_uring`.
pub(crate) struct IoUringVfs {
// The backing file; `new_inner` opens it with `O_DIRECT | O_RDWR | O_CREAT`.
pub(crate) file: File,
// Offset allocator, seeded in `new_inner` with the file's length at open time.
offset_alloc: OffsetAlloc,
// Pool of rings; each request uses the ring selected for the calling thread.
rings: IoUringInstance,
// The path the file was opened from; stored but not read in this file.
_path: PathBuf,
// Whether the rings were built in polling mode; `wait_cnt` returns 0 when set, 1 otherwise.
polling: bool,
}
// NOTE(review): `IoUringVfs` holds `RefCell<IoUring>` values, so these impls
// are manual assertions rather than compiler-derived. Rings are selected by
// thread id modulo the pool size; soundness presumably relies on no two
// threads borrowing the same slot at the same time — TODO confirm.
unsafe impl Send for IoUringVfs {}
unsafe impl Sync for IoUringVfs {}
impl IoUringVfs {
    /// Opens (creating if necessary) the file at `path` with rings that are
    /// NOT in polling mode; each request waits for one completion on submit.
    ///
    /// # Errors
    ///
    /// See [`IoUringVfs::new_inner`].
    pub(crate) fn new_blocking(path: impl AsRef<Path>) -> Result<Self, IoErrorKind> {
        Self::new_inner(path, false)
    }

    /// Opens (creating if necessary) the file at `path` with rings in polling
    /// mode (SQPOLL + IOPOLL).
    ///
    /// # Errors
    ///
    /// See [`IoUringVfs::new_inner`].
    pub(crate) fn open(path: impl AsRef<Path>) -> Result<Self, IoErrorKind> {
        Self::new_inner(path, true)
    }

    /// Number of completions `submit_and_wait` should block for: none in
    /// polling mode, one otherwise.
    fn wait_cnt(&self) -> usize {
        if self.polling { 0 } else { 1 }
    }

    /// Shared constructor behind `new_blocking` and `open`.
    ///
    /// Creates the parent directory on a best-effort basis, opens the file
    /// with `O_DIRECT | O_RDWR | O_CREAT | O_CLOEXEC` and owner read/write
    /// permissions, seeds the offset allocator with the current file length,
    /// and builds the ring pool (polling when `use_poll` is true).
    ///
    /// # Errors
    ///
    /// * `IoErrorKind::VfsRead { offset: 0 }` when the path has no parent,
    ///   contains an interior NUL byte, cannot be opened, or its metadata
    ///   cannot be read.
    /// * `IoErrorKind::VfsFlush` when the initial flush fails.
    /// * Whatever `IoUringInstance::new` returns when ring setup fails.
    fn new_inner(path: impl AsRef<Path>, use_poll: bool) -> Result<Self, IoErrorKind> {
        // Every setup failure is reported with the same variant the original
        // code used: a read error at offset 0.
        let setup_err = || IoErrorKind::VfsRead { offset: 0 };

        let path = path.as_ref();
        let parent = path.parent().ok_or_else(setup_err)?;
        // Deliberately best-effort: the result is discarded here and any
        // resulting problem is left for the `open` call below to report.
        _ = std::fs::create_dir_all(parent);

        let path_cstr = CString::new(path.as_os_str().as_bytes()).map_err(|_| setup_err())?;
        // SAFETY: `path_cstr` is a valid NUL-terminated string that stays
        // alive for the duration of the call; the remaining arguments are
        // plain integers.
        let raw_fd = unsafe {
            libc::open(
                path_cstr.as_ptr(),
                // O_CLOEXEC keeps the descriptor from leaking into child
                // processes across `exec`, matching what std does for files
                // it opens itself.
                libc::O_DIRECT | libc::O_RDWR | libc::O_CREAT | libc::O_CLOEXEC,
                libc::S_IRUSR | libc::S_IWUSR,
            )
        };
        if raw_fd < 0 {
            return Err(setup_err());
        }

        // SAFETY: `raw_fd` was just returned by a successful `open` and has
        // not been handed to any other owner, so `File` may take ownership.
        let mut file = unsafe { File::from_raw_fd(raw_fd) };
        // NOTE(review): `File` does no user-space buffering, so this flush is
        // presumably a no-op; kept to preserve the existing error path.
        file.flush().map_err(|_| IoErrorKind::VfsFlush)?;

        let offset = file.metadata().map_err(|_| setup_err())?.len();

        Ok(IoUringVfs {
            rings: IoUringInstance::new(use_poll)?,
            file,
            _path: path.to_path_buf(),
            offset_alloc: OffsetAlloc::new_with(offset as usize),
            polling: use_poll,
        })
    }
}
impl VfsImpl for IoUringVfs {
fn alloc_offset(&self, size: usize) -> usize {
self.offset_alloc.alloc(size)
}
fn dealloc_offset(&self, offset: usize) {
self.offset_alloc.dealloc_offset(offset)
}
fn flush(&self) -> Result<(), IoErrorKind> {
self.file.sync_all().map_err(|_| IoErrorKind::VfsFlush)
}
fn read(&self, offset: usize, buf: &mut [u8]) -> Result<(), IoErrorKind> {
counter!(IOReadRequest);
let read_e = opcode::Read::new(
io_uring::types::Fd(self.file.as_raw_fd()),
buf.as_mut_ptr(),
buf.len() as _,
)
.offset(offset as u64)
.build()
.user_data(0x42);
let ring = self.rings.get_current_ring();
let mut ring_mut = ring.borrow_mut();
unsafe {
let mut sq = ring_mut.submission();
sq.push(&read_e)
.map_err(|_| IoErrorKind::VfsRead { offset })?;
sq.sync();
}
ring_mut
.submit_and_wait(self.wait_cnt())
.map_err(|_| IoErrorKind::VfsRead { offset })?;
let mut cq = ring_mut.completion();
loop {
cq.sync();
match cq.next() {
Some(cqe) => {
if cqe.result() != buf.len() as i32 {
return Err(IoErrorKind::VfsRead { offset });
}
break;
}
None => {
continue;
}
};
}
Ok(())
}
fn write(&self, offset: usize, buf: &[u8]) -> Result<(), IoErrorKind> {
counter!(IOWriteRequest);
let write_e = opcode::Write::new(
io_uring::types::Fd(self.file.as_raw_fd()),
buf.as_ptr(),
buf.len() as _,
)
.offset(offset as u64)
.build()
.user_data(0x42);
let ring = self.rings.get_current_ring();
let mut ring_mut = ring.borrow_mut();
unsafe {
let mut sq = ring_mut.submission();
sq.push(&write_e)
.map_err(|_| IoErrorKind::VfsWrite { offset })?;
sq.sync();
}
ring_mut
.submit_and_wait(self.wait_cnt())
.map_err(|_| IoErrorKind::VfsWrite { offset })?;
let mut cq = ring_mut.completion();
loop {
cq.sync();
match cq.next() {
Some(cqe) => {
if cqe.result() != buf.len() as i32 {
return Err(IoErrorKind::VfsWrite { offset });
}
break;
}
None => {
continue;
}
};
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::Layout;

    /// Path of a scratch database file under `target/`.
    fn scratch_path(file_name: &str) -> PathBuf {
        PathBuf::from("target").join(file_name)
    }

    /// A zero-initialised heap buffer whose alignment equals its size, as
    /// needed for I/O on a file opened with `O_DIRECT`. Freed on drop, so a
    /// failing assertion does not leak it.
    struct AlignedBuf {
        ptr: *mut u8,
        layout: Layout,
    }

    impl AlignedBuf {
        /// Allocates `size` zeroed bytes aligned to `size`.
        ///
        /// Panics if `size` is zero or not a valid alignment; aborts through
        /// `handle_alloc_error` if the allocation itself fails.
        fn zeroed(size: usize) -> Self {
            assert!(size > 0, "buffer size must be non-zero");
            let layout = Layout::from_size_align(size, size).unwrap();
            // SAFETY: `layout` has a non-zero size (asserted above).
            let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
            if ptr.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            Self { ptr, layout }
        }

        fn as_slice(&self) -> &[u8] {
            // SAFETY: `ptr` points to `layout.size()` initialised bytes owned
            // by `self`.
            unsafe { std::slice::from_raw_parts(self.ptr, self.layout.size()) }
        }

        fn as_mut_slice(&mut self) -> &mut [u8] {
            // SAFETY: as in `as_slice`, and `&mut self` guarantees uniqueness.
            unsafe { std::slice::from_raw_parts_mut(self.ptr, self.layout.size()) }
        }
    }

    impl Drop for AlignedBuf {
        fn drop(&mut self) {
            // SAFETY: `ptr` was allocated with exactly this `layout` and is
            // freed only here.
            unsafe { std::alloc::dealloc(self.ptr, self.layout) };
        }
    }

    #[test]
    fn test_ioring_vfs_open() {
        let file_path = scratch_path("test_io_uring_vfs_open.db");
        let vfs = IoUringVfs::open(&file_path).unwrap();
        assert!(vfs.file.metadata().is_ok());
        std::fs::remove_file(&file_path).expect("Failed to remove test file");
    }

    #[test]
    #[ignore]
    fn test_read_write_operations() {
        let file_path = scratch_path("test_read_write_operations.db");
        std::fs::write(&file_path, "This is a test file for IoUringVfs.").unwrap();
        let vfs = IoUringVfs::open(&file_path).unwrap();

        let buf_size = 512;
        let mut write_buf = AlignedBuf::zeroed(buf_size);
        // Fill with a repeating 0..=255 pattern so any misplaced byte shows up.
        for (i, byte) in write_buf.as_mut_slice().iter_mut().enumerate() {
            *byte = i as u8;
        }

        let offset = 0;
        vfs.write(offset, write_buf.as_slice()).unwrap();

        let mut read_buf = AlignedBuf::zeroed(buf_size);
        vfs.read(offset, read_buf.as_mut_slice()).unwrap();

        assert_eq!(
            read_buf.as_slice(),
            write_buf.as_slice(),
            "Read data does not match written data"
        );
        std::fs::remove_file(file_path).expect("Failed to remove test file");
    }
}