use super::{Fs, FsDirEntry, FsFile, FsMetadata, FsOpenOptions};
use crate::HashMap;
use io_uring::{IoUring, opcode, types};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
/// Ring size used by `IoUringFs::new`.
///
/// The value is forwarded to `RingThread::spawn`, where it sizes both the
/// io_uring instance and the bounded channel that feeds the ring thread.
const DEFAULT_SQ_ENTRIES: u32 = 256;
/// Reports whether an io_uring instance can be created on this system.
///
/// A throw-away two-entry ring is constructed and dropped again; only the
/// success of the construction is reported.
#[must_use]
pub fn is_io_uring_available() -> bool {
    let probe = IoUring::new(2);
    probe.is_ok()
}
/// `Fs` implementation whose files submit reads, writes and fsyncs through a
/// shared io_uring ring thread.
///
/// Clones share the same ring thread through the inner `Arc`.
pub struct IoUringFs {
/// Ring thread handle; `open` hands a clone of this `Arc` to every file.
inner: Arc<RingThread>,
}
impl IoUringFs {
    /// Builds a filesystem whose ring is sized by `DEFAULT_SQ_ENTRIES`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`IoUringFs::with_ring_size`] returns.
    pub fn new() -> io::Result<Self> {
        Self::with_ring_size(DEFAULT_SQ_ENTRIES)
    }

    /// Builds a filesystem whose ring is created with `sq_entries` entries.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `RingThread::spawn` (ring creation or
    /// thread spawn failure).
    pub fn with_ring_size(sq_entries: u32) -> io::Result<Self> {
        RingThread::spawn(sq_entries).map(|ring_thread| Self {
            inner: Arc::new(ring_thread),
        })
    }
}
impl Clone for IoUringFs {
    /// Returns a new handle that shares this filesystem's ring thread.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl std::fmt::Debug for IoUringFs {
    /// Formats as a non-exhaustive `IoUringFs` struct; no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("IoUringFs");
        repr.finish_non_exhaustive()
    }
}
impl Fs for IoUringFs {
    /// Opens `path` with the flags from `opts` and wraps the handle in an
    /// [`IoUringFile`] bound to this filesystem's ring thread.
    ///
    /// The file's software cursor starts at the current file length when
    /// `opts.append` is set and at 0 otherwise.
    fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result<Box<dyn FsFile>> {
        let mut std_opts = OpenOptions::new();
        std_opts
            .read(opts.read)
            .write(opts.write)
            .create(opts.create)
            .create_new(opts.create_new)
            .truncate(opts.truncate)
            .append(opts.append);
        let file = std_opts.open(path)?;

        let append = opts.append;
        let start_offset = if append { file.metadata()?.len() } else { 0 };

        let handle = IoUringFile {
            file,
            cursor: AtomicU64::new(start_offset),
            is_append: append,
            ring: Arc::clone(&self.inner),
        };
        Ok(Box::new(handle))
    }

    /// Creates `path` and all missing parents via `std::fs`.
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        std::fs::create_dir_all(path)
    }

    /// Lists the entries of `path`.
    ///
    /// Stops at the first failing entry. A file name that is not valid UTF-8
    /// is reported as `InvalidData`.
    fn read_dir(&self, path: &Path) -> io::Result<Vec<FsDirEntry>> {
        let mut listing = Vec::new();
        for dirent in std::fs::read_dir(path)? {
            let dirent = dirent?;
            let kind = dirent.file_type()?;
            let file_name = match dirent.file_name().into_string() {
                Ok(name) => name,
                Err(os) => {
                    #[expect(
                        clippy::unnecessary_debug_formatting,
                        reason = "OsString has no Display impl — Debug is required"
                    )]
                    let msg =
                        format!("non-UTF-8 filename in directory {}: {os:?}", path.display());
                    return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
                }
            };
            listing.push(FsDirEntry {
                path: dirent.path(),
                file_name,
                is_dir: kind.is_dir(),
            });
        }
        Ok(listing)
    }

    /// Removes the file at `path` via `std::fs`.
    fn remove_file(&self, path: &Path) -> io::Result<()> {
        std::fs::remove_file(path)
    }

    /// Recursively removes the directory at `path` via `std::fs`.
    fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
        std::fs::remove_dir_all(path)
    }

    /// Renames `from` to `to` via `std::fs`.
    fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        std::fs::rename(from, to)
    }

    /// Returns length and file/directory flags for `path`.
    fn metadata(&self, path: &Path) -> io::Result<FsMetadata> {
        std::fs::metadata(path).map(|std_meta| FsMetadata {
            len: std_meta.len(),
            is_dir: std_meta.is_dir(),
            is_file: std_meta.is_file(),
        })
    }

    /// Fsyncs the directory at `path` through the ring.
    ///
    /// Returns `InvalidInput` when `path` opens successfully but is not a
    /// directory.
    fn sync_directory(&self, path: &Path) -> io::Result<()> {
        let dir = File::open(path)?;
        let is_dir = dir.metadata()?.is_dir();
        if is_dir {
            // `dir` stays open until the fsync has completed because
            // `submit_fsync` blocks until the ring thread reports a result.
            self.inner
                .submit_fsync(dir.as_raw_fd(), false)
                .map(|_synced| ())
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "sync_directory: path is not a directory",
            ))
        }
    }

    /// Reports whether `path` exists, propagating errors from `try_exists`.
    fn exists(&self, path: &Path) -> io::Result<bool> {
        path.try_exists()
    }
}
/// File handle whose reads, writes and fsyncs are submitted to the ring thread.
pub struct IoUringFile {
/// Underlying std file; its raw fd is what gets passed to the ring.
file: File,
/// Software cursor used by the `Read`/`Write`/`Seek` impls; `read_at` does not touch it.
cursor: AtomicU64,
/// When set, every `write` first moves the cursor to the current file length.
is_append: bool,
/// Ring thread shared with the `IoUringFs` that opened this file.
ring: Arc<RingThread>,
}
impl FsFile for IoUringFile {
    /// Submits an fsync (without the datasync flag) for this file and waits
    /// for it to complete.
    fn sync_all(&self) -> io::Result<()> {
        self.ring.submit_fsync(self.file.as_raw_fd(), false)?;
        Ok(())
    }

    /// Submits an fsync with the datasync flag for this file and waits for it
    /// to complete.
    fn sync_data(&self) -> io::Result<()> {
        self.ring.submit_fsync(self.file.as_raw_fd(), true)?;
        Ok(())
    }

    /// Returns length and file/directory flags of the open file.
    fn metadata(&self) -> io::Result<FsMetadata> {
        let m = self.file.metadata()?;
        Ok(FsMetadata {
            len: m.len(),
            is_dir: m.is_dir(),
            is_file: m.is_file(),
        })
    }

    /// Truncates or extends the file to `size` bytes (plain std call, not
    /// routed through the ring).
    fn set_len(&self, size: u64) -> io::Result<()> {
        self.file.set_len(size)
    }

    /// Reads into `buf` starting at absolute file position `offset`, without
    /// touching the software cursor.
    ///
    /// Keeps issuing reads until `buf` is full or a read returns 0 bytes, so
    /// the returned count is smaller than `buf.len()` only when the end of the
    /// file was reached. Interrupted reads are retried.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `offset` plus the bytes read so far overflows
    /// `u64`, and any other error reported by the ring.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // `submit_read` rejects buffers longer than `i32::MAX`; cap every
        // request so that oversized buffers are filled in several reads
        // instead of failing outright.
        let max_chunk = usize::try_from(i32::MAX).unwrap_or(usize::MAX);
        let fd = self.file.as_raw_fd();
        let mut total_read: usize = 0;
        while total_read < buf.len() {
            let chunk_end = buf.len().min(total_read.saturating_add(max_chunk));
            let chunk = buf.get_mut(total_read..chunk_end).ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidInput, "read_at offset out of bounds")
            })?;
            let current_offset = offset.checked_add(total_read as u64).ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidInput, "read_at offset overflow")
            })?;
            let n = loop {
                match self.ring.submit_read(fd, chunk, current_offset) {
                    Ok(n) => break n,
                    // Interrupted: resubmit the very same request.
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
                    Err(e) => return Err(e),
                }
            };
            if n == 0 {
                // End of file.
                break;
            }
            total_read += n as usize;
        }
        Ok(total_read)
    }

    /// Takes an exclusive lock by delegating to the `FsFile` implementation
    /// of the underlying std file.
    fn lock_exclusive(&self) -> io::Result<()> {
        FsFile::lock_exclusive(&self.file)
    }
}
impl Read for IoUringFile {
    /// Issues one ring read at the software cursor and advances the cursor by
    /// the number of bytes returned.
    ///
    /// An empty `buf` returns 0 without touching the ring. Errors from the
    /// ring are returned unchanged and leave the cursor where it was.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let fd = self.file.as_raw_fd();
        let position = self.cursor.get_mut();
        self.ring.submit_read(fd, buf, *position).map(|bytes| {
            *position += u64::from(bytes);
            bytes as usize
        })
    }
}
impl Write for IoUringFile {
    /// Issues one ring write at the software cursor and advances the cursor
    /// by the number of bytes written.
    ///
    /// In append mode the cursor is first reset to the current file length.
    /// An empty `buf` returns 0 without touching the file or the ring.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let fd = self.file.as_raw_fd();
        let position = self.cursor.get_mut();
        if self.is_append {
            let end_of_file = self.file.metadata()?.len();
            *position = end_of_file;
        }
        let written = self.ring.submit_write(fd, buf, *position)?;
        *position += u64::from(written);
        Ok(written as usize)
    }

    /// Always succeeds; this type keeps no write buffer of its own.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl Seek for IoUringFile {
    /// Moves the software cursor and returns its new value.
    ///
    /// `Start` sets the cursor directly. `Current` and `End` apply the signed
    /// delta to the cursor or to the current file length respectively.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the resulting position would fall below 0
    /// or above `u64::MAX`; the cursor is left unchanged in that case. For
    /// `End`, metadata errors are propagated.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        // Resolve the request into a base position plus a signed delta.
        let (base, delta) = match pos {
            SeekFrom::Start(absolute) => {
                *self.cursor.get_mut() = absolute;
                return Ok(absolute);
            }
            SeekFrom::Current(delta) => (*self.cursor.get_mut(), delta),
            SeekFrom::End(delta) => (self.file.metadata()?.len(), delta),
        };

        let magnitude = delta.unsigned_abs();
        let shifted = if delta < 0 {
            base.checked_sub(magnitude)
        } else {
            base.checked_add(magnitude)
        };

        match shifted {
            Some(target) => {
                *self.cursor.get_mut() = target;
                Ok(target)
            }
            None => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek position out of range",
            )),
        }
    }
}
/// Raw mutable byte pointer wrapper so a read destination can be sent to the ring thread.
struct UnsafeSendMutPtr(*mut u8);
/// Raw const byte pointer wrapper so a write source can be sent to the ring thread.
struct UnsafeSendConstPtr(*const u8);
#[expect(unsafe_code, reason = "marking raw-pointer wrapper as Send")]
// SAFETY: the pointer is created in `submit_read` from a live `&mut [u8]`, and that
// function then blocks in `send_and_wait` until the ring thread reports a result.
// NOTE(review): soundness relies on the submitter never returning while the
// operation is still in flight — confirm this for every error path.
unsafe impl Send for UnsafeSendMutPtr {}
#[expect(unsafe_code, reason = "marking raw-pointer wrapper as Send")]
// SAFETY: the pointer is created in `submit_write` from a live `&[u8]`, and that
// function then blocks in `send_and_wait` until the ring thread reports a result.
// NOTE(review): same in-flight caveat as for `UnsafeSendMutPtr`.
unsafe impl Send for UnsafeSendConstPtr {}
/// Request payload sent to the ring thread; `enqueue` turns each variant into one SQE.
enum OpKind {
/// Read up to `len` bytes from `fd` at file position `offset` into `buf`.
Read {
fd: i32,
buf: UnsafeSendMutPtr,
len: u32,
offset: u64,
},
/// Write up to `len` bytes from `buf` to `fd` at file position `offset`.
Write {
fd: i32,
buf: UnsafeSendConstPtr,
len: u32,
offset: u64,
},
/// Fsync `fd`; `datasync` selects the `DATASYNC` fsync flag.
Fsync {
fd: i32,
datasync: bool,
},
}
/// A queued request together with the channel its completion result is sent back on.
struct Op {
/// What to submit.
kind: OpKind,
/// Receives the raw completion result; `send_and_wait` treats values `>= 0` as
/// success and negative values as a negated OS error code.
result_tx: mpsc::SyncSender<i32>,
}
/// Owner of the background thread that drives the io_uring instance.
struct RingThread {
/// Sending half of the request channel; set to `None` in `Drop` so the event
/// loop sees the channel disconnect.
tx: Mutex<Option<mpsc::SyncSender<Op>>>,
/// Join handle of the ring thread; taken and joined in `Drop`.
handle: Mutex<Option<thread::JoinHandle<()>>>,
}
impl RingThread {
fn spawn(sq_entries: u32) -> io::Result<Self> {
let ring = IoUring::new(sq_entries)?;
let (tx, rx) = mpsc::sync_channel(sq_entries as usize);
let handle = thread::Builder::new()
.name("lsm-io-uring".into())
.spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
Self::event_loop(ring, rx);
}));
if result.is_err() {
log::error!("io_uring ring thread panicked; aborting to avoid UB");
std::process::abort();
}
})?;
Ok(Self {
tx: Mutex::new(Some(tx)),
handle: Mutex::new(Some(handle)),
})
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[expect(
clippy::needless_pass_by_value,
reason = "rx is moved into the spawned thread — must be owned"
)]
fn event_loop(mut ring: IoUring, rx: mpsc::Receiver<Op>) {
let mut pending =
std::mem::ManuallyDrop::new(HashMap::<u64, mpsc::SyncSender<i32>>::default());
let mut next_id: u64 = 0;
loop {
let first = if pending.is_empty() {
match rx.recv() {
Ok(op) => Some(op),
Err(mpsc::RecvError) => break,
}
} else {
match rx.try_recv() {
Ok(op) => Some(op),
Err(mpsc::TryRecvError::Empty) => None,
Err(mpsc::TryRecvError::Disconnected) => {
if pending.is_empty() {
break;
}
None
}
}
};
if let Some(op) = first {
Self::enqueue(&mut ring, &mut pending, &mut next_id, op);
while let Ok(op) = rx.try_recv() {
Self::enqueue(&mut ring, &mut pending, &mut next_id, op);
}
}
if pending.is_empty() {
continue;
}
loop {
match ring.submit_and_wait(1) {
Ok(_) => break,
Err(ref e) if e.raw_os_error() == Some(4 ) => {}
Err(e) => {
log::error!(
"io_uring submit_and_wait failed: {e}; aborting process to avoid UB"
);
std::process::abort();
}
}
}
for cqe in ring.completion() {
let id = cqe.user_data();
if let Some(tx) = pending.remove(&id) {
let _ = tx.send(cqe.result());
}
}
}
#[expect(unsafe_code, reason = "ManuallyDrop cleanup on normal exit path")]
unsafe {
std::mem::ManuallyDrop::drop(&mut pending);
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
fn enqueue(
ring: &mut IoUring,
pending: &mut HashMap<u64, mpsc::SyncSender<i32>>,
next_id: &mut u64,
op: Op,
) {
let id = *next_id;
*next_id = next_id.wrapping_add(1);
let sqe = match op.kind {
OpKind::Read {
fd,
buf,
len,
offset,
} => opcode::Read::new(types::Fd(fd), buf.0, len)
.offset(offset)
.build()
.user_data(id),
OpKind::Write {
fd,
buf,
len,
offset,
} => opcode::Write::new(types::Fd(fd), buf.0, len)
.offset(offset)
.build()
.user_data(id),
OpKind::Fsync { fd, datasync } => {
let mut entry = opcode::Fsync::new(types::Fd(fd));
if datasync {
entry = entry.flags(types::FsyncFlags::DATASYNC);
}
entry.build().user_data(id)
}
};
#[expect(unsafe_code, reason = "io_uring SQE push")]
unsafe {
while ring.submission().push(&sqe).is_err() {
loop {
match ring.submit_and_wait(1) {
Ok(_) => break,
Err(ref e) if e.raw_os_error() == Some(4 ) => {}
Err(e) => {
log::error!(
"io_uring submit_and_wait failed in SQ retry: {e}; aborting"
);
std::process::abort();
}
}
}
for cqe in ring.completion() {
let cid = cqe.user_data();
if let Some(tx) = pending.remove(&cid) {
let _ = tx.send(cqe.result());
}
}
}
}
pending.insert(id, op.result_tx);
}
fn submit_read(&self, fd: i32, buf: &mut [u8], offset: u64) -> io::Result<u32> {
let len: u32 = i32::try_from(buf.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "buffer exceeds i32::MAX"))?
.unsigned_abs();
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Read {
fd,
buf: UnsafeSendMutPtr(buf.as_mut_ptr()),
len,
offset,
},
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn submit_write(&self, fd: i32, buf: &[u8], offset: u64) -> io::Result<u32> {
let len: u32 = i32::try_from(buf.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "buffer exceeds i32::MAX"))?
.unsigned_abs();
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Write {
fd,
buf: UnsafeSendConstPtr(buf.as_ptr()),
len,
offset,
},
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn submit_fsync(&self, fd: i32, datasync: bool) -> io::Result<u32> {
let (tx, rx) = mpsc::sync_channel(1);
let op = Op {
kind: OpKind::Fsync { fd, datasync },
result_tx: tx,
};
self.send_and_wait(op, &rx)
}
fn send_and_wait(&self, op: Op, rx: &mpsc::Receiver<i32>) -> io::Result<u32> {
self.tx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread shut down"))?
.send(op)
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread exited"))?;
let result = rx
.recv()
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "io_uring thread exited"))?;
if result >= 0 {
#[expect(clippy::cast_sign_loss, reason = "guarded by result >= 0 check above")]
Ok(result as u32)
} else {
Err(io::Error::from_raw_os_error(-result))
}
}
}
impl Drop for RingThread {
    /// Shuts the ring thread down: drops the request sender so the channel
    /// disconnects, then joins the thread and logs if it had panicked.
    ///
    /// Poisoned mutexes are recovered rather than treated as errors.
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn drop(&mut self) {
        let sender_slot = self
            .tx
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        drop(sender_slot.take());

        let worker = self
            .handle
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        let Some(worker) = worker else {
            return;
        };
        if worker.join().is_err() {
            log::error!("io_uring ring thread panicked during shutdown");
        }
    }
}
// Unit tests for the io_uring filesystem. Every test returns early (passing)
// when `is_io_uring_available()` reports that no ring can be created.
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Read, Write};
use std::sync::Arc;
use test_log::test;
/// Returns an `IoUringFs`, or `None` (after printing a skip notice) when
/// io_uring is unavailable.
fn try_io_uring() -> Option<IoUringFs> {
if !is_io_uring_available() {
eprintln!("skipping: io_uring not supported by kernel");
return None;
}
Some(IoUringFs::new().expect("io_uring available but IoUringFs::new() failed"))
}
// The availability probe must not panic; its result is only printed.
#[test]
fn probe_availability() {
let available = is_io_uring_available();
eprintln!("io_uring available: {available}");
}
// Data written through one handle is read back through a fresh handle.
#[test]
fn create_read_write() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.txt");
let opts = FsOpenOptions::new().write(true).create(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hello world")?;
file.sync_all()?;
drop(file);
let opts = FsOpenOptions::new().read(true);
let mut file = fs.open(&path, &opts)?;
let mut buf = String::new();
file.read_to_string(&mut buf)?;
assert_eq!(buf, "hello world");
Ok(())
}
// `read_at` reads at explicit offsets, in any order.
#[test]
fn read_at_pread_semantics() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("pread.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hello world")?;
file.sync_data()?;
let mut buf = [0u8; 5];
let n = file.read_at(&mut buf, 6)?;
assert_eq!(n, 5);
assert_eq!(&buf, b"world");
let n = file.read_at(&mut buf, 0)?;
assert_eq!(n, 5);
assert_eq!(&buf, b"hello");
Ok(())
}
// create_dir_all / read_dir / metadata / remove_file / remove_dir_all round trip.
#[test]
fn directory_operations() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let nested = dir.path().join("a").join("b").join("c");
fs.create_dir_all(&nested)?;
assert!(fs.exists(&nested)?);
let file_path = nested.join("data.bin");
let opts = FsOpenOptions::new().write(true).create_new(true);
let mut file = fs.open(&file_path, &opts)?;
file.write_all(b"data")?;
drop(file);
let entries: Vec<_> = fs.read_dir(&nested)?;
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].file_name, "data.bin");
let meta = fs.metadata(&file_path)?;
assert!(meta.is_file);
assert_eq!(meta.len, 4);
fs.remove_file(&file_path)?;
assert!(!fs.exists(&file_path)?);
let top = dir.path().join("a");
fs.remove_dir_all(&top)?;
assert!(!fs.exists(&top)?);
Ok(())
}
// After a rename the source is gone and the destination exists.
#[test]
fn rename() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let src = dir.path().join("src.txt");
let dst = dir.path().join("dst.txt");
let opts = FsOpenOptions::new().write(true).create(true);
let mut file = fs.open(&src, &opts)?;
file.write_all(b"content")?;
drop(file);
fs.rename(&src, &dst)?;
assert!(!fs.exists(&src)?);
assert!(fs.exists(&dst)?);
Ok(())
}
// Syncing an existing directory succeeds.
#[test]
fn sync_directory() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
fs.sync_directory(dir.path())?;
Ok(())
}
// File-handle metadata reflects the bytes written so far.
#[test]
fn file_metadata() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("meta.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"12345")?;
let meta = file.metadata()?;
assert!(meta.is_file);
assert_eq!(meta.len, 5);
Ok(())
}
// `set_len` truncates the file to the requested size.
#[test]
fn file_set_len() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("truncate.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hello world")?;
file.set_len(5)?;
let meta = file.metadata()?;
assert_eq!(meta.len, 5);
Ok(())
}
// Taking an exclusive lock on a freshly created file succeeds.
#[test]
fn lock_exclusive() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("lockfile");
let opts = FsOpenOptions::new().write(true).create(true);
let file = fs.open(&path, &opts)?;
file.lock_exclusive()?;
Ok(())
}
// Truncate discards old content; an append-mode write lands at the end of
// the file even after seeking to the start.
#[test]
fn truncate_and_append() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("trunc.txt");
let opts = FsOpenOptions::new().write(true).create(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hello world")?;
drop(file);
let opts = FsOpenOptions::new().write(true).truncate(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hi")?;
drop(file);
let meta = fs.metadata(&path)?;
assert_eq!(meta.len, 2);
let opts = FsOpenOptions::new().write(true).append(true);
let mut file = fs.open(&path, &opts)?;
file.seek(SeekFrom::Start(0))?;
file.write_all(b"!")?;
drop(file);
let mut file = fs.open(&path, &FsOpenOptions::new().read(true))?;
let mut buf = String::new();
file.read_to_string(&mut buf)?;
assert_eq!(buf, "hi!");
assert_eq!(fs.metadata(&path)?.len, 3);
Ok(())
}
// Start / Current / End seeks move the cursor used by subsequent reads.
#[test]
fn seek_operations() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("seek.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"hello world")?;
file.seek(SeekFrom::Start(0))?;
let mut buf = [0u8; 5];
file.read_exact(&mut buf)?;
assert_eq!(&buf, b"hello");
file.seek(SeekFrom::Current(1))?;
file.read_exact(&mut buf)?;
assert_eq!(&buf, b"world");
let pos = file.seek(SeekFrom::End(-5))?;
assert_eq!(pos, 6);
Ok(())
}
// Ten threads read disjoint 100-byte chunks of one shared file via `read_at`.
#[test]
fn concurrent_read_at() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("concurrent.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
#[expect(clippy::cast_possible_truncation, reason = "% 256 guarantees 0..=255")]
let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
file.write_all(&data)?;
file.sync_all()?;
let file = Arc::new(file);
let mut handles = Vec::new();
for chunk_start in (0..1000).step_by(100) {
let file = Arc::clone(&file);
handles.push(thread::spawn(move || -> io::Result<()> {
let mut buf = [0u8; 100];
let n = file.read_at(&mut buf, chunk_start as u64)?;
assert_eq!(n, 100);
for (i, &byte) in buf.iter().enumerate() {
#[expect(clippy::cast_possible_truncation, reason = "% 256 guarantees 0..=255")]
let expected = ((chunk_start + i) % 256) as u8;
assert_eq!(byte, expected);
}
Ok(())
}));
}
for h in handles {
match h.join() {
Ok(result) => result?,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "thread panicked")),
}
}
Ok(())
}
// Path metadata of a directory reports `is_dir` and not `is_file`.
#[test]
fn metadata_directory() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let meta = fs.metadata(dir.path())?;
assert!(meta.is_dir);
assert!(!meta.is_file);
Ok(())
}
// `IoUringFs` can be used behind `Arc<dyn Fs>`.
#[test]
fn object_safety() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let fs: Arc<dyn Fs> = Arc::new(fs);
let dir = tempfile::tempdir()?;
let bogus = dir.path().join("nonexistent");
assert!(!fs.exists(&bogus)?);
Ok(())
}
// Zero-length buffers short-circuit to 0 for read_at, read and write.
#[test]
fn empty_buffer_returns_zero() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("empty_buf.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"data")?;
let n = file.read_at(&mut [], 0)?;
assert_eq!(n, 0);
let n = file.read(&mut [])?;
assert_eq!(n, 0);
let n = file.write(&[])?;
assert_eq!(n, 0);
file.flush()?;
Ok(())
}
// `sync_directory` on a regular file fails with `InvalidInput`.
#[test]
fn sync_directory_rejects_file() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("not_a_dir.txt");
let opts = FsOpenOptions::new().write(true).create(true);
fs.open(&path, &opts)?;
match fs.sync_directory(&path) {
Ok(()) => panic!("sync_directory on a file should fail"),
Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
}
Ok(())
}
// Seeks that would overflow u64 or go below zero fail with `InvalidInput`.
#[test]
fn seek_overflow_returns_error() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("seek_overflow.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"data")?;
file.seek(SeekFrom::Start(u64::MAX - 1))?;
match file.seek(SeekFrom::Current(2)) {
Ok(_) => panic!("seek past u64::MAX should fail"),
Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
}
file.seek(SeekFrom::Start(0))?;
match file.seek(SeekFrom::Current(-1)) {
Ok(_) => panic!("seek before zero should fail"),
Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
}
match file.seek(SeekFrom::End(-100)) {
Ok(_) => panic!("seek before zero should fail"),
Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
}
Ok(())
}
// The Debug output names the type.
#[test]
fn debug_impl() {
let Some(fs) = try_io_uring() else {
return;
};
let debug = format!("{fs:?}");
assert!(debug.contains("IoUringFs"));
}
// A filesystem built with a non-default ring size (64) can write and sync.
#[test]
fn with_ring_size() -> io::Result<()> {
if !is_io_uring_available() {
eprintln!("skipping: io_uring not supported by kernel");
return Ok(());
}
let fs = IoUringFs::with_ring_size(64)
.expect("io_uring available but with_ring_size(64) failed");
let dir = tempfile::tempdir()?;
let path = dir.path().join("ring64.bin");
let opts = FsOpenOptions::new().write(true).create(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"ok")?;
file.sync_all()?;
assert_eq!(fs.metadata(&path)?.len, 2);
Ok(())
}
// A negative `SeekFrom::Current` moves the cursor backwards.
#[test]
fn seek_negative_from_current() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let dir = tempfile::tempdir()?;
let path = dir.path().join("seek_neg.bin");
let opts = FsOpenOptions::new().write(true).create(true).read(true);
let mut file = fs.open(&path, &opts)?;
file.write_all(b"abcdefghij")?;
file.seek(SeekFrom::Start(8))?;
let pos = file.seek(SeekFrom::Current(-3))?;
assert_eq!(pos, 5);
let mut buf = [0u8; 5];
file.read_exact(&mut buf)?;
assert_eq!(&buf, b"fghij");
Ok(())
}
// Files opened through a filesystem and through its clone both write and sync.
#[test]
fn clone_shares_ring() -> io::Result<()> {
let Some(fs) = try_io_uring() else {
return Ok(());
};
let fs2 = fs.clone();
let dir = tempfile::tempdir()?;
let p1 = dir.path().join("a.txt");
let p2 = dir.path().join("b.txt");
let opts = FsOpenOptions::new().write(true).create(true);
let mut f1 = fs.open(&p1, &opts)?;
let mut f2 = fs2.open(&p2, &opts)?;
f1.write_all(b"one")?;
f2.write_all(b"two")?;
f1.sync_all()?;
f2.sync_all()?;
assert_eq!(fs.metadata(&p1)?.len, 3);
assert_eq!(fs2.metadata(&p2)?.len, 3);
Ok(())
}
}