#![cfg(target_os = "linux")]
#![allow(dead_code)]
use crate::{Error, Result};
use crossbeam_channel::{bounded, Receiver, Sender};
use std::os::fd::RawFd;
use std::thread::{self, JoinHandle};
/// Handle to a dedicated owner thread that performs io_uring operations.
///
/// Callers never touch the ring directly: each request is packaged as an `Op`,
/// sent over a bounded channel, and answered on a per-request reply channel.
pub(crate) struct IoUringRing {
/// Sending half of the op channel; taken (set to `None`) in `Drop` so the
/// owner thread's receive loop ends.
tx: Option<Sender<Op>>,
/// Join handle of the owner thread; taken and joined in `Drop`.
join: Option<JoinHandle<()>>,
}
/// A single request sent from a caller to the owner thread.
///
/// Buffers are carried as a raw address (`buf_ptr`, the slice pointer cast to
/// `usize`) plus a length; the submitting method blocks on `reply` until the
/// owner thread answers.
enum Op {
/// Positional write of `buf_len` bytes starting at `buf_ptr` to `fd` at `offset`.
Write {
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: u64,
/// Receives the byte count reported by the completion, or the error.
reply: Sender<Result<usize>>,
},
/// Positional read of up to `buf_len` bytes from `fd` at `offset` into `buf_ptr`.
Read {
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: u64,
/// Receives the byte count reported by the completion, or the error.
reply: Sender<Result<usize>>,
},
/// Fsync of `fd` submitted with the `DATASYNC` flag.
Fdatasync {
fd: RawFd,
/// Receives `Ok(())` on success, or the error.
reply: Sender<Result<()>>,
},
}
impl IoUringRing {
pub(crate) fn new(queue_depth: u32) -> Result<Self> {
match io_uring::IoUring::new(queue_depth) {
Ok(_probe) => {}
Err(source) => return Err(Error::IoUringSetupFailed { source }),
}
let cap = (queue_depth as usize).max(1).saturating_mul(2);
let (tx, rx) = bounded::<Op>(cap);
let join = thread::Builder::new()
.name("fsys-iouring".to_string())
.spawn(move || {
owner_loop(queue_depth, rx);
})
.map_err(|source| Error::IoUringSetupFailed { source })?;
Ok(Self {
tx: Some(tx),
join: Some(join),
})
}
pub(crate) fn write_at(&self, fd: RawFd, buf: &[u8], offset: u64) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_ptr() as usize;
let buf_len = buf.len();
self.send(Op::Write {
fd,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn read_at(&self, fd: RawFd, buf: &mut [u8], offset: u64) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_mut_ptr() as usize;
let buf_len = buf.len();
self.send(Op::Read {
fd,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn fdatasync(&self, fd: RawFd) -> Result<()> {
let (rt, rr) = bounded::<Result<()>>(1);
self.send(Op::Fdatasync { fd, reply: rt })?;
rr.recv().map_err(|_| owner_dead())?
}
fn send(&self, op: Op) -> Result<()> {
self.tx
.as_ref()
.ok_or_else(owner_dead)?
.send(op)
.map_err(|_| owner_dead())
}
}
impl Drop for IoUringRing {
    /// Closes the op channel, then waits for the owner thread to exit.
    fn drop(&mut self) {
        // Dropping the sender disconnects the channel, which ends the owner's
        // receive loop; this must happen before joining.
        self.tx = None;
        if let Some(worker) = self.join.take() {
            // A panic in the owner thread is deliberately ignored here.
            let _ = worker.join();
        }
    }
}
fn owner_loop(queue_depth: u32, rx: Receiver<Op>) {
let mut ring = match io_uring::IoUring::new(queue_depth) {
Ok(r) => r,
Err(_) => return,
};
while let Ok(op) = rx.recv() {
match op {
Op::Write {
fd,
buf_ptr,
buf_len,
offset,
reply,
} => {
let entry = io_uring::opcode::Write::new(
io_uring::types::Fd(fd),
buf_ptr as *const u8,
buf_len as u32,
)
.offset(offset)
.build();
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(c) => Ok(c.result() as usize),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::Read {
fd,
buf_ptr,
buf_len,
offset,
reply,
} => {
let entry = io_uring::opcode::Read::new(
io_uring::types::Fd(fd),
buf_ptr as *mut u8,
buf_len as u32,
)
.offset(offset)
.build();
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(c) => Ok(c.result() as usize),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::Fdatasync { fd, reply } => {
let entry = io_uring::opcode::Fsync::new(io_uring::types::Fd(fd))
.flags(io_uring::types::FsyncFlags::DATASYNC)
.build();
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(_) => Ok(()),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
}
}
}
fn io_err(msg: &'static str) -> Error {
Error::Io(std::io::Error::other(msg))
}
fn owner_dead() -> Error {
Error::Io(std::io::Error::other("io_uring owner thread terminated"))
}
/// What is needed to send NVMe passthrough commands for a file's backing
/// device, as produced by `nvme_flush_capable`.
pub(crate) struct NvmeAccess {
/// The NVMe controller character device (`/dev/nvme<N>`), opened read/write.
pub(crate) char_dev: std::fs::File,
/// Namespace id to address; `nvme_flush_capable` falls back to 1 when it
/// cannot be read from sysfs.
pub(crate) nsid: u32,
}
/// Determines whether NVMe passthrough flush can be used for the device
/// backing `fd`, and if so opens the controller character device.
///
/// Returns `None` when:
/// - the `FSYS_DISABLE_NVME_PASSTHROUGH` environment variable is set (any value),
/// - the backing device is not recognised as NVMe,
/// - the controller device cannot be opened read/write.
///
/// The namespace id defaults to 1 when it cannot be determined.
pub(crate) fn nvme_flush_capable(fd: RawFd) -> Option<NvmeAccess> {
    let opted_out = std::env::var_os("FSYS_DISABLE_NVME_PASSTHROUGH").is_some();
    if opted_out {
        return None;
    }

    let controller_path = nvme_char_device_for(fd)?;
    let nsid = match nvme_namespace_id_for(fd) {
        Some(id) => id,
        None => 1,
    };

    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true);
    match options.open(&controller_path) {
        Ok(char_dev) => Some(NvmeAccess { char_dev, nsid }),
        Err(_) => None,
    }
}
pub(crate) fn nvme_flush_ioctl(nvme_fd: RawFd, nsid: u32) -> Result<()> {
#[repr(C)]
#[derive(Default)]
struct NvmePassthruCmd {
opcode: u8,
flags: u8,
rsvd1: u16,
nsid: u32,
cdw2: u32,
cdw3: u32,
metadata: u64,
addr: u64,
metadata_len: u32,
data_len: u32,
cdw10: u32,
cdw11: u32,
cdw12: u32,
cdw13: u32,
cdw14: u32,
cdw15: u32,
timeout_ms: u32,
result: u32,
}
const NVME_IOCTL_IO_CMD: libc::c_ulong = 0xc040_4e43;
let mut cmd = NvmePassthruCmd {
opcode: 0x00, nsid,
..Default::default()
};
let rc = unsafe { libc::ioctl(nvme_fd, NVME_IOCTL_IO_CMD, &mut cmd) };
if rc < 0 {
return Err(Error::Io(std::io::Error::last_os_error()));
}
Ok(())
}
/// Maps the block device backing `fd` to its NVMe controller character device.
///
/// Resolves `/sys/dev/block/<major>:<minor>` (from the file's `st_dev`) to its
/// canonical sysfs path and derives the controller from the final path
/// component, e.g. `nvme0n1` or `nvme0n1p2` -> `/dev/nvme0`.
///
/// Returns `None` if `fstat` fails, the sysfs link cannot be resolved, or the
/// device name is not of the form `nvme<digits>n...`.
///
/// NOTE(review): assumes the `nvme<N>` prefix of the block device name is the
/// controller that owns the namespace — confirm for multipath setups.
fn nvme_char_device_for(fd: RawFd) -> Option<std::path::PathBuf> {
    // SAFETY: `libc::stat` is a plain C struct for which all-zero bytes are a
    // valid value.
    let mut stat: libc::stat = unsafe { std::mem::zeroed() };
    // SAFETY: `stat` is a valid, writable `libc::stat` for the duration of the call.
    let rc = unsafe { libc::fstat(fd, &mut stat) };
    if rc != 0 {
        return None;
    }
    let dev = stat.st_dev;
    let major = libc::major(dev);
    let minor = libc::minor(dev);
    let block_link = format!("/sys/dev/block/{major}:{minor}");
    let resolved = std::fs::canonicalize(&block_link).ok()?;
    let name = resolved.file_name()?.to_str()?;
    let controller = nvme_controller_of(name)?;
    Some(std::path::PathBuf::from(format!("/dev/{controller}")))
}

/// Extracts the `nvme<digits>` controller prefix from an NVMe block device
/// name such as `nvme0n1` or `nvme12n3p1`.
///
/// Splitting on the first `'n'` cannot work here because the name itself
/// starts with `n`; instead the prefix is parsed explicitly and must be
/// followed by the `n` that introduces the namespace number.
fn nvme_controller_of(block_name: &str) -> Option<&str> {
    const PREFIX: &str = "nvme";
    let rest = block_name.strip_prefix(PREFIX)?;
    let digits = rest.bytes().take_while(u8::is_ascii_digit).count();
    if digits == 0 || !rest[digits..].starts_with('n') {
        return None;
    }
    Some(&block_name[..PREFIX.len() + digits])
}
/// Reads the NVMe namespace id of the block device backing `fd` from sysfs.
///
/// Tries `/sys/dev/block/<major>:<minor>/nsid` first. When the file lives on a
/// partition that attribute is absent (it belongs to the whole-disk device),
/// so the parent directory's `nsid` is tried next.
///
/// Returns `None` if `fstat` fails or neither location yields a parsable `u32`.
fn nvme_namespace_id_for(fd: RawFd) -> Option<u32> {
    // SAFETY: `libc::stat` is a plain C struct for which all-zero bytes are a
    // valid value.
    let mut stat: libc::stat = unsafe { std::mem::zeroed() };
    // SAFETY: `stat` is a valid, writable `libc::stat` for the duration of the call.
    let rc = unsafe { libc::fstat(fd, &mut stat) };
    if rc != 0 {
        return None;
    }
    let dev = stat.st_dev;
    let major = libc::major(dev);
    let minor = libc::minor(dev);
    let base = format!("/sys/dev/block/{major}:{minor}");
    // `..` is resolved after following the symlink, so for a partition it
    // names the enclosing disk's sysfs directory.
    let candidates = [format!("{base}/nsid"), format!("{base}/../nsid")];
    candidates.iter().find_map(|nsid_path| {
        let text = std::fs::read_to_string(nsid_path).ok()?;
        text.trim().parse::<u32>().ok()
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::OpenOptions;
    use std::io::Write as _;
    use std::os::fd::AsRawFd;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Monotonic counter that keeps temp-file names unique within one process.
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);

    /// Size of one I/O unit used throughout the tests.
    const SECTOR: usize = 4096;

    /// Builds a unique path in the system temp dir from pid, counter and `tag`.
    fn tmp_path(tag: &str) -> std::path::PathBuf {
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let file_name = format!("fsys_iouring_test_{pid}_{seq}_{tag}");
        std::env::temp_dir().join(file_name)
    }

    /// Returns a ring, or `None` when io_uring setup fails so the calling
    /// test can bail out early. Any other error is a test failure.
    fn ring_or_skip() -> Option<IoUringRing> {
        let err = match IoUringRing::new(8) {
            Ok(ring) => return Some(ring),
            Err(err) => err,
        };
        if matches!(err, Error::IoUringSetupFailed { .. }) {
            None
        } else {
            panic!("unexpected ring construction error: {err:?}")
        }
    }

    /// Removes the wrapped file when dropped (best effort).
    struct Cleanup(std::path::PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Opens `path` read/write, creating it and truncating any old content.
    fn create_rw(path: &std::path::Path) -> std::fs::File {
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .unwrap()
    }

    #[test]
    fn ring_construction_returns_ring_or_setup_failed() {
        if let Err(err) = IoUringRing::new(8) {
            assert!(
                matches!(err, Error::IoUringSetupFailed { .. }),
                "unexpected variant: {err:?}"
            );
        }
    }

    #[test]
    fn write_at_round_trip() {
        let Some(ring) = ring_or_skip() else { return };
        let path = tmp_path("write_rt");
        let _guard = Cleanup(path.clone());
        let file = create_rw(&path);

        let expected = vec![0xA5u8; SECTOR];
        let written = ring
            .write_at(file.as_raw_fd(), &expected, 0)
            .expect("write_at");
        assert_eq!(written, expected.len());
        ring.fdatasync(file.as_raw_fd()).expect("fdatasync");

        let on_disk = std::fs::read(&path).expect("read");
        assert_eq!(on_disk, expected);
    }

    #[test]
    fn read_at_round_trip() {
        let Some(ring) = ring_or_skip() else { return };
        let path = tmp_path("read_rt");
        let _guard = Cleanup(path.clone());

        let expected = vec![0x5Au8; SECTOR];
        std::fs::write(&path, &expected).unwrap();

        let file = OpenOptions::new().read(true).open(&path).unwrap();
        let mut scratch = vec![0u8; SECTOR];
        let got = ring
            .read_at(file.as_raw_fd(), &mut scratch, 0)
            .expect("read_at");
        assert_eq!(got, expected.len());
        assert_eq!(scratch, expected);
    }

    #[test]
    fn concurrent_submitters_serialise_through_owner() {
        const WRITERS: usize = 16;

        let Some(ring) = ring_or_skip() else { return };
        let ring = std::sync::Arc::new(ring);
        let path = tmp_path("concurrent");
        let _guard = Cleanup(path.clone());

        // Pre-size the file with zeroes, then reopen it for the actual test.
        {
            let mut seed = create_rw(&path);
            seed.write_all(&vec![0u8; WRITERS * SECTOR]).unwrap();
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();
        let fd = file.as_raw_fd();

        // Each writer fills its own sector with its index.
        let writers: Vec<_> = (0..WRITERS)
            .map(|i| {
                let ring = std::sync::Arc::clone(&ring);
                let payload = vec![i as u8; SECTOR];
                std::thread::spawn(move || {
                    ring.write_at(fd, &payload, (i * SECTOR) as u64).unwrap()
                })
            })
            .collect();
        for writer in writers {
            assert_eq!(writer.join().unwrap(), SECTOR);
        }

        ring.fdatasync(fd).unwrap();
        drop(file);

        let bytes = std::fs::read(&path).unwrap();
        for i in 0..WRITERS {
            let sector = &bytes[i * SECTOR..(i + 1) * SECTOR];
            let intact = sector.iter().all(|&b| b == i as u8);
            assert!(
                intact,
                "sector {i} content drift — owner-thread serialisation broken",
            );
        }
    }
}