use super::{Fs, FsDirEntry, FsFile, FsMetadata, FsOpenOptions};
use crate::HashMap;
use core::sync::atomic::AtomicU64;
use io_uring::{IoUring, opcode, types};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
/// Submission-queue depth used by `IoUringFs::new`.
const DEFAULT_SQ_ENTRIES: u32 = 256;
/// Reports whether an io_uring instance can be created on this system.
///
/// Support is probed by building a minimal ring, which is dropped again before
/// returning. Any error while creating it yields `false`.
#[must_use]
pub fn is_io_uring_available() -> bool {
    // Smallest useful ring; it only exists to test kernel support.
    const PROBE_ENTRIES: u32 = 2;
    let probe = IoUring::new(PROBE_ENTRIES);
    probe.is_ok()
}
/// [`Fs`] implementation whose file reads, writes and fsyncs are submitted to an
/// io_uring owned by a dedicated background thread.
///
/// Directory and metadata operations go straight to `std::fs`. Cloning is cheap:
/// every clone shares the same ring thread through an [`Arc`].
#[derive(Clone)]
pub struct IoUringFs {
    inner: Arc<RingThread>,
}
impl IoUringFs {
    /// Creates a backend using the default submission-queue depth.
    ///
    /// # Errors
    ///
    /// Returns an error if the io_uring instance cannot be created or the ring
    /// thread cannot be spawned.
    pub fn new() -> io::Result<Self> {
        Self::with_ring_size(DEFAULT_SQ_ENTRIES)
    }
    /// Creates a backend whose ring has `sq_entries` submission-queue slots.
    ///
    /// # Errors
    ///
    /// Returns an error if `IoUring::new(sq_entries)` fails or the ring thread
    /// cannot be spawned.
    pub fn with_ring_size(sq_entries: u32) -> io::Result<Self> {
        let inner = RingThread::spawn(sq_entries)?;
        Ok(Self {
            inner: Arc::new(inner),
        })
    }
}
impl std::fmt::Debug for IoUringFs {
    /// Opaque debug output: the ring thread has no meaningful printable state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IoUringFs").finish_non_exhaustive()
    }
}
impl Fs for IoUringFs {
    /// Opens `path` according to `opts` and returns an io_uring-backed handle.
    ///
    /// In append mode the handle's cursor starts at the current file length;
    /// otherwise it starts at 0.
    fn open(&self, path: &Path, opts: &FsOpenOptions) -> crate::io::Result<Box<dyn FsFile>> {
        let mut open_options = OpenOptions::new();
        open_options
            .read(opts.read)
            .write(opts.write)
            .create(opts.create)
            .create_new(opts.create_new)
            .truncate(opts.truncate)
            .append(opts.append);
        #[cfg(feature = "std")]
        super::direct_io::apply_direct_io_flag(&mut open_options, opts.direct_io);
        let file = open_options.open(path)?;
        let start_offset = if opts.append { file.metadata()?.len() } else { 0 };
        let handle = IoUringFile {
            file,
            cursor: AtomicU64::new(start_offset),
            is_append: opts.append,
            ring: Arc::clone(&self.inner),
        };
        Ok(Box::new(handle))
    }
    /// Recursively creates `path` and any missing parents.
    fn create_dir_all(&self, path: &Path) -> crate::io::Result<()> {
        std::fs::create_dir_all(path).map_err(Into::into)
    }
    /// Creates the single directory `path`.
    fn create_dir(&self, path: &Path) -> crate::io::Result<()> {
        std::fs::create_dir(path).map_err(Into::into)
    }
    /// Lists the entries of `path`.
    ///
    /// Fails with `InvalidData` on the first entry whose name is not valid UTF-8,
    /// and propagates the first I/O error otherwise.
    fn read_dir(&self, path: &Path) -> crate::io::Result<Vec<FsDirEntry>> {
        let mut entries = Vec::new();
        for dir_entry in std::fs::read_dir(path)? {
            let dir_entry = dir_entry?;
            let kind = dir_entry.file_type()?;
            let file_name = dir_entry.file_name().into_string().map_err(|raw_name| {
                #[expect(
                    clippy::unnecessary_debug_formatting,
                    reason = "OsString has no Display impl — Debug is required"
                )]
                let msg =
                    format!("non-UTF-8 filename in directory {}: {raw_name:?}", path.display());
                io::Error::new(io::ErrorKind::InvalidData, msg)
            })?;
            entries.push(FsDirEntry {
                path: dir_entry.path(),
                file_name,
                is_dir: kind.is_dir(),
            });
        }
        Ok(entries)
    }
    /// Deletes the file at `path`.
    fn remove_file(&self, path: &Path) -> crate::io::Result<()> {
        std::fs::remove_file(path).map_err(Into::into)
    }
    /// Deletes `path` and everything beneath it.
    fn remove_dir_all(&self, path: &Path) -> crate::io::Result<()> {
        std::fs::remove_dir_all(path).map_err(Into::into)
    }
    /// Renames `from` to `to`.
    fn rename(&self, from: &Path, to: &Path) -> crate::io::Result<()> {
        std::fs::rename(from, to).map_err(Into::into)
    }
    /// Returns length and type flags for `path` (following symlinks, as `std::fs::metadata` does).
    fn metadata(&self, path: &Path) -> crate::io::Result<FsMetadata> {
        let meta = std::fs::metadata(path)?;
        let (len, is_dir, is_file) = (meta.len(), meta.is_dir(), meta.is_file());
        Ok(FsMetadata {
            len,
            is_dir,
            is_file,
        })
    }
    /// Free space available on the volume containing `path`.
    fn available_space(&self, path: &Path) -> crate::io::Result<u64> {
        super::statvfs_available_space(path).map_err(Into::into)
    }
    /// Opens the directory `path` and fsyncs it through the ring.
    ///
    /// Returns `InvalidInput` if `path` opens but is not a directory.
    fn sync_directory(&self, path: &Path) -> crate::io::Result<()> {
        let dir = File::open(path)?;
        let is_directory = dir.metadata()?.is_dir();
        if is_directory {
            // `dir` stays open until after the fsync completes.
            self.inner.submit_fsync(dir.as_raw_fd(), false)?;
            Ok(())
        } else {
            Err(crate::io::Error::new(
                crate::io::ErrorKind::InvalidInput,
                "sync_directory: path is not a directory",
            ))
        }
    }
    /// Whether `path` exists; errors (e.g. permission problems) are propagated.
    fn exists(&self, path: &Path) -> crate::io::Result<bool> {
        path.try_exists().map_err(Into::into)
    }
    /// Delegates to the std backend.
    fn hard_link(&self, src: &Path, dst: &Path) -> crate::io::Result<()> {
        super::StdFs.hard_link(src, dst)
    }
    /// Delegates to the std backend.
    fn backend_id(&self) -> Option<u64> {
        super::StdFs.backend_id()
    }
    /// Delegates to the std backend.
    fn volume_id(&self, path: &Path) -> Option<u64> {
        super::StdFs.volume_id(path)
    }
}
/// File handle produced by `IoUringFs`'s `Fs::open`; its reads, writes and fsyncs
/// are submitted to the shared ring thread.
pub struct IoUringFile {
    // Underlying OS file; its raw fd is what gets submitted to the ring.
    file: File,
    // Stream position used by the `Read`/`Write`/`Seek` impls (which access it
    // through `get_mut`, i.e. with `&mut self`). `read_at` ignores it.
    cursor: AtomicU64,
    // True when opened in append mode; `write` then first moves the cursor to the
    // current file length.
    is_append: bool,
    // Shared ring thread; holding the `Arc` keeps it alive for this handle's lifetime.
    ring: Arc<RingThread>,
}
impl FsFile for IoUringFile {
fn sync_all(&self) -> crate::io::Result<()> {
self.ring.submit_fsync(self.file.as_raw_fd(), false)?;
Ok(())
}
fn sync_data(&self) -> crate::io::Result<()> {
self.ring.submit_fsync(self.file.as_raw_fd(), true)?;
Ok(())
}
fn metadata(&self) -> crate::io::Result<FsMetadata> {
let m = self.file.metadata()?;
Ok(FsMetadata {
len: m.len(),
is_dir: m.is_dir(),
is_file: m.is_file(),
})
}
fn set_len(&self, size: u64) -> crate::io::Result<()> {
self.file.set_len(size).map_err(crate::io::Error::from)
}
fn read_at(&self, buf: &mut [u8], offset: u64) -> crate::io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let fd = self.file.as_raw_fd();
let mut total_read: usize = 0;
while total_read < buf.len() {
let remaining = buf.get_mut(total_read..).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "read_at offset out of bounds")
})?;
let current_offset = offset.checked_add(total_read as u64).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "read_at offset overflow")
})?;
let n = loop {
match self.ring.submit_read(fd, remaining, current_offset) {
Ok(n) => break n,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e.into()),
}
};
if n == 0 {
break; }
total_read += n as usize;
}
Ok(total_read)
}
fn lock_exclusive(&self) -> crate::io::Result<()> {
FsFile::lock_exclusive(&self.file)
}
fn try_lock_exclusive(&self) -> crate::io::Result<bool> {
FsFile::try_lock_exclusive(&self.file)
}
}
impl Read for IoUringFile {
    /// Reads at the current cursor with a single ring request and advances the
    /// cursor by the number of bytes read (which may be short).
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Zero-length reads never reach the ring.
        if buf.is_empty() {
            return Ok(0);
        }
        let at = *self.cursor.get_mut();
        let count = self.ring.submit_read(self.file.as_raw_fd(), buf, at)?;
        *self.cursor.get_mut() = at + u64::from(count);
        Ok(count as usize)
    }
}
impl Write for IoUringFile {
    /// Writes at the cursor with a single ring request and advances the cursor by
    /// the number of bytes written.
    ///
    /// In append mode the cursor is first reset to the current file length. That
    /// reset persists even if the write itself then fails.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        if self.is_append {
            *self.cursor.get_mut() = self.file.metadata()?.len();
        }
        let at = *self.cursor.get_mut();
        let written = self.ring.submit_write(self.file.as_raw_fd(), buf, at)?;
        *self.cursor.get_mut() = at + u64::from(written);
        Ok(written as usize)
    }
    /// No-op: this handle keeps no user-space write buffer.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl Seek for IoUringFile {
    /// Moves the in-memory cursor; no I/O is issued except `metadata` for
    /// `SeekFrom::End`.
    ///
    /// Positions that would fall below 0 or above `u64::MAX` yield `InvalidInput`.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let out_of_range =
            || io::Error::new(io::ErrorKind::InvalidInput, "seek position out of range");
        let target = match pos {
            SeekFrom::Start(absolute) => absolute,
            SeekFrom::Current(delta) => self
                .cursor
                .get_mut()
                .checked_add_signed(delta)
                .ok_or_else(out_of_range)?,
            SeekFrom::End(delta) => {
                let file_len = self.file.metadata()?.len();
                file_len.checked_add_signed(delta).ok_or_else(out_of_range)?
            }
        };
        *self.cursor.get_mut() = target;
        Ok(target)
    }
}
/// Mutable buffer pointer carried to the ring thread inside an `OpKind::Read`.
struct UnsafeSendMutPtr(*mut u8);
/// Const buffer pointer carried to the ring thread inside an `OpKind::Write`.
struct UnsafeSendConstPtr(*const u8);
// SAFETY: the ring thread never dereferences this pointer; it only copies it into
// an SQE for the kernel. The submitting thread keeps the underlying slice borrowed
// while it blocks in `send_and_wait` waiting for the op's result.
#[expect(unsafe_code, reason = "marking raw-pointer wrapper as Send")]
unsafe impl Send for UnsafeSendMutPtr {}
// SAFETY: this pointer, too, is only forwarded to the kernel inside an SQE. The
// source slice stays borrowed by the blocked submitting thread until the result
// is delivered.
#[expect(unsafe_code, reason = "marking raw-pointer wrapper as Send")]
unsafe impl Send for UnsafeSendConstPtr {}
/// A single I/O request for the ring thread.
///
/// Buffers travel as raw pointers because the ring thread only hands them to the
/// kernel. The submitting thread keeps the underlying slice borrowed until it has
/// received the result.
enum OpKind {
    /// Flush `fd`; `datasync` requests the data-only variant.
    Fsync { datasync: bool, fd: i32 },
    /// Positional read of up to `len` bytes from `fd` at `offset` into `buf`.
    Read {
        fd: i32,
        offset: u64,
        len: u32,
        buf: UnsafeSendMutPtr,
    },
    /// Positional write of `len` bytes from `buf` to `fd` at `offset`.
    Write {
        fd: i32,
        offset: u64,
        len: u32,
        buf: UnsafeSendConstPtr,
    },
}
/// A request plus the channel on which the ring thread reports the raw CQE result:
/// a non-negative byte count, or a negated errno.
struct Op {
    kind: OpKind,
    result_tx: mpsc::SyncSender<i32>,
}
/// Owner of the background thread that drives the io_uring instance.
struct RingThread {
    // Request channel into the ring thread. `Drop` sets it to `None`, which
    // disconnects the channel so the event loop can exit.
    tx: Mutex<Option<mpsc::SyncSender<Op>>>,
    // Join handle of the ring thread; taken and joined in `Drop`.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
}
impl RingThread {
fn spawn(sq_entries: u32) -> io::Result<Self> {
let ring = IoUring::new(sq_entries)?;
let (tx, rx) = mpsc::sync_channel(sq_entries as usize);
let handle = thread::Builder::new()
.name("lsm-io-uring".into())
.spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
Self::event_loop(ring, rx);
}));
if result.is_err() {
log::error!("io_uring ring thread panicked; aborting to avoid UB");
std::process::abort();
}
})?;
Ok(Self {
tx: Mutex::new(Some(tx)),
handle: Mutex::new(Some(handle)),
})
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[expect(
clippy::needless_pass_by_value,
reason = "rx is moved into the spawned thread — must be owned"
)]
fn event_loop(mut ring: IoUring, rx: mpsc::Receiver<Op>) {
let mut pending =
std::mem::ManuallyDrop::new(HashMap::<u64, mpsc::SyncSender<i32>>::default());
let mut next_id: u64 = 0;
loop {
let first = if pending.is_empty() {
match rx.recv() {
Ok(op) => Some(op),
Err(mpsc::RecvError) => break,
}
} else {
match rx.try_recv() {
Ok(op) => Some(op),
Err(mpsc::TryRecvError::Empty) => None,
Err(mpsc::TryRecvError::Disconnected) => {
if pending.is_empty() {
break;
}
None
}
}
};
if let Some(op) = first {
Self::enqueue(&mut ring, &mut pending, &mut next_id, op);
while let Ok(op) = rx.try_recv() {
Self::enqueue(&mut ring, &mut pending, &mut next_id, op);
}
}
if pending.is_empty() {
continue;
}
loop {
match ring.submit_and_wait(1) {
Ok(_) => break,
Err(ref e) if e.raw_os_error() == Some(4 ) => {}
Err(e) => {
log::error!(
"io_uring submit_and_wait failed: {e}; aborting process to avoid UB"
);
std::process::abort();
}
}
}
for cqe in ring.completion() {
let id = cqe.user_data();
if let Some(tx) = pending.remove(&id) {
let _ = tx.send(cqe.result());
}
}
}
#[expect(unsafe_code, reason = "ManuallyDrop cleanup on normal exit path")]
unsafe {
std::mem::ManuallyDrop::drop(&mut pending);
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
fn enqueue(
ring: &mut IoUring,
pending: &mut HashMap<u64, mpsc::SyncSender<i32>>,
next_id: &mut u64,
op: Op,
) {
let id = *next_id;
*next_id = next_id.wrapping_add(1);
let sqe = match op.kind {
OpKind::Read {
fd,
buf,
len,
offset,
} => opcode::Read::new(types::Fd(fd), buf.0, len)
.offset(offset)
.build()
.user_data(id),
OpKind::Write {
fd,
buf,
len,
offset,
} => opcode::Write::new(types::Fd(fd), buf.0, len)
.offset(offset)
.build()
.user_data(id),
OpKind::Fsync { fd, datasync } => {
let mut entry = opcode::Fsync::new(types::Fd(fd));
if datasync {
entry = entry.flags(types::FsyncFlags::DATASYNC);
}
entry.build().user_data(id)
}
};
#[expect(unsafe_code, reason = "io_uring SQE push")]
unsafe {
while ring.submission().push(&sqe).is_err() {
loop {
match ring.submit_and_wait(1) {
Ok(_) => break,
Err(ref e) if e.raw_os_error() == Some(4 ) => {}
Err(e) => {
log::error!(
"io_uring submit_and_wait failed in SQ retry: {e}; aborting"
);
std::process::abort();
}
}
}
for cqe in ring.completion() {
let cid = cqe.user_data();
if let Some(tx) = pending.remove(&cid) {
let _ = tx.send(cqe.result());
}
}
}
}
pending.insert(id, op.result_tx);
}
fn submit_read(&self, fd: i32, buf: &mut [u8], offset: u64) -> io::Result<u32> {
let len: u32 = i32::try_from(buf.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "buffer exceeds i32::MAX"))?
.unsigned_abs();
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Read {
fd,
buf: UnsafeSendMutPtr(buf.as_mut_ptr()),
len,
offset,
},
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn submit_write(&self, fd: i32, buf: &[u8], offset: u64) -> io::Result<u32> {
let len: u32 = i32::try_from(buf.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "buffer exceeds i32::MAX"))?
.unsigned_abs();
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Write {
fd,
buf: UnsafeSendConstPtr(buf.as_ptr()),
len,
offset,
},
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn submit_fsync(&self, fd: i32, datasync: bool) -> io::Result<u32> {
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Fsync { fd, datasync },
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn send_and_wait(&self, op: Op, rx: &mpsc::Receiver<i32>) -> io::Result<u32> {
self.tx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread shut down"))?
.send(op)
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread exited"))?;
let result = rx
.recv()
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread exited"))?;
if result >= 0 {
#[expect(clippy::cast_sign_loss, reason = "guarded by result >= 0 check above")]
Ok(result as u32)
} else {
Err(io::Error::from_raw_os_error(-result))
}
}
}
impl Drop for RingThread {
#[cfg_attr(coverage_nightly, coverage(off))]
fn drop(&mut self) {
let tx = match self.tx.get_mut() {
Ok(tx) => tx,
Err(poisoned) => poisoned.into_inner(),
};
*tx = None;
let handle_slot = match self.handle.get_mut() {
Ok(h) => h,
Err(poisoned) => poisoned.into_inner(),
};
if let Some(handle) = handle_slot.take()
&& handle.join().is_err()
{
log::error!("io_uring ring thread panicked during shutdown");
}
}
}
#[cfg(test)]
mod tests;