#![cfg(target_os = "linux")]
#![allow(dead_code)]
use crate::{Error, Result};
use crossbeam_channel::{bounded, Receiver, Sender};
use std::os::fd::RawFd;
use std::thread::{self, JoinHandle};
/// Handle to an io_uring instance owned by a dedicated background thread.
///
/// Callers never touch the ring directly: each request is sent as an `Op` over `tx`,
/// and the calling thread blocks on a per-request reply channel. `Drop` closes `tx`
/// and joins the thread.
pub(crate) struct IoUringRing {
/// Request channel to the owner thread; `None` only after `Drop` has taken it.
tx: Option<Sender<Op>>,
/// Owner thread handle; taken and joined in `Drop`.
join: Option<JoinHandle<()>>,
}
/// A request sent from an `IoUringRing` method to the owner thread.
///
/// Buffers travel as raw `(address, length)` pairs (`buf_ptr` / `buf_len`) taken from
/// slices that the calling method borrows. That method blocks on `reply` until the owner
/// thread answers. Every variant carries a one-shot `reply` sender for its result.
enum Op {
/// Positional write of `buf_len` bytes at `buf_ptr` to `fd` at byte `offset`.
/// Replies with the number of bytes written.
Write {
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: u64,
reply: Sender<Result<usize>>,
},
/// Positional read of up to `buf_len` bytes from `fd` at byte `offset` into `buf_ptr`.
/// Replies with the number of bytes read.
Read {
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: u64,
reply: Sender<Result<usize>>,
},
/// Data-only sync of `fd` (io_uring fsync with `FsyncFlags::DATASYNC`).
Fdatasync {
fd: RawFd,
reply: Sender<Result<()>>,
},
/// A write (as in `Write`) followed by a linked data sync of the same `fd`.
/// Replies with the write's byte count.
WriteLinkedFsync {
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: u64,
reply: Sender<Result<usize>>,
},
/// Registers `(address, length)` pairs with the ring as fixed buffers.
RegisterBuffers {
iovs: Vec<(usize, usize)>,
reply: Sender<Result<()>>,
},
/// A write like `Write`, but through the registered buffer at index `buf_idx`.
/// Replies with the number of bytes written.
WriteFixed {
fd: RawFd,
buf_idx: u16,
buf_ptr: usize,
buf_len: usize,
offset: u64,
reply: Sender<Result<usize>>,
},
}
impl IoUringRing {
pub(crate) fn new(queue_depth: u32, sqpoll_idle_ms: Option<u32>) -> Result<Self> {
let mut probe_builder = io_uring::IoUring::builder();
super::iouring_features::apply(&mut probe_builder, super::iouring_features::RingMode::Sync);
if let Some(idle_ms) = sqpoll_idle_ms {
let _ = probe_builder.setup_sqpoll(idle_ms);
}
match probe_builder.build(queue_depth) {
Ok(_probe) => {}
Err(source) => return Err(Error::IoUringSetupFailed { source }),
}
let cap = (queue_depth as usize).max(1).saturating_mul(2);
let (tx, rx) = bounded::<Op>(cap);
let join = thread::Builder::new()
.name("fsys-iouring".to_string())
.spawn(move || {
owner_loop(queue_depth, rx, sqpoll_idle_ms);
})
.map_err(|source| Error::IoUringSetupFailed { source })?;
Ok(Self {
tx: Some(tx),
join: Some(join),
})
}
pub(crate) fn write_at(&self, fd: RawFd, buf: &[u8], offset: u64) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_ptr() as usize;
let buf_len = buf.len();
self.send(Op::Write {
fd,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn read_at(&self, fd: RawFd, buf: &mut [u8], offset: u64) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_mut_ptr() as usize;
let buf_len = buf.len();
self.send(Op::Read {
fd,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn fdatasync(&self, fd: RawFd) -> Result<()> {
let (rt, rr) = bounded::<Result<()>>(1);
self.send(Op::Fdatasync { fd, reply: rt })?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn write_at_linked_fsync(
&self,
fd: RawFd,
buf: &[u8],
offset: u64,
) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_ptr() as usize;
let buf_len = buf.len();
self.send(Op::WriteLinkedFsync {
fd,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn register_buffers(&self, iovs: &[(usize, usize)]) -> Result<()> {
let (rt, rr) = bounded::<Result<()>>(1);
self.send(Op::RegisterBuffers {
iovs: iovs.to_vec(),
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
pub(crate) fn write_at_fixed(
&self,
fd: RawFd,
buf_idx: u16,
buf: &[u8],
offset: u64,
) -> Result<usize> {
let (rt, rr) = bounded::<Result<usize>>(1);
let buf_ptr = buf.as_ptr() as usize;
let buf_len = buf.len();
self.send(Op::WriteFixed {
fd,
buf_idx,
buf_ptr,
buf_len,
offset,
reply: rt,
})?;
rr.recv().map_err(|_| owner_dead())?
}
fn send(&self, op: Op) -> Result<()> {
self.tx
.as_ref()
.ok_or_else(owner_dead)?
.send(op)
.map_err(|_| owner_dead())
}
}
impl Drop for IoUringRing {
    /// Shuts down the owner thread.
    ///
    /// Dropping the request `Sender` makes the thread's `recv` loop end. The thread is
    /// then joined, so its ring is torn down before `drop` returns. A panic inside the
    /// owner thread is deliberately ignored here.
    fn drop(&mut self) {
        self.tx = None;
        if let Some(owner) = self.join.take() {
            let _ = owner.join();
        }
    }
}
fn owner_loop(queue_depth: u32, rx: Receiver<Op>, sqpoll_idle_ms: Option<u32>) {
let mut builder = io_uring::IoUring::builder();
super::iouring_features::apply(&mut builder, super::iouring_features::RingMode::Sync);
if let Some(idle_ms) = sqpoll_idle_ms {
let _ = builder.setup_sqpoll(idle_ms);
}
let mut ring = match builder.build(queue_depth) {
Ok(r) => r,
Err(_) => return,
};
let mut fd_registry = FdRegistry::new();
let _ = fd_registry.initial_register(&ring.submitter());
while let Ok(op) = rx.recv() {
match op {
Op::Write {
fd,
buf_ptr,
buf_len,
offset,
reply,
} => {
let entry =
if let Some(slot) = fd_registry.try_get_or_register(&ring.submitter(), fd) {
io_uring::opcode::Write::new(
io_uring::types::Fixed(slot),
buf_ptr as *const u8,
buf_len as u32,
)
.offset(offset)
.build()
} else {
io_uring::opcode::Write::new(
io_uring::types::Fd(fd),
buf_ptr as *const u8,
buf_len as u32,
)
.offset(offset)
.build()
};
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(c) => Ok(c.result() as usize),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::Read {
fd,
buf_ptr,
buf_len,
offset,
reply,
} => {
let entry =
if let Some(slot) = fd_registry.try_get_or_register(&ring.submitter(), fd) {
io_uring::opcode::Read::new(
io_uring::types::Fixed(slot),
buf_ptr as *mut u8,
buf_len as u32,
)
.offset(offset)
.build()
} else {
io_uring::opcode::Read::new(
io_uring::types::Fd(fd),
buf_ptr as *mut u8,
buf_len as u32,
)
.offset(offset)
.build()
};
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(c) => Ok(c.result() as usize),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::Fdatasync { fd, reply } => {
let entry =
if let Some(slot) = fd_registry.try_get_or_register(&ring.submitter(), fd) {
io_uring::opcode::Fsync::new(io_uring::types::Fixed(slot))
.flags(io_uring::types::FsyncFlags::DATASYNC)
.build()
} else {
io_uring::opcode::Fsync::new(io_uring::types::Fd(fd))
.flags(io_uring::types::FsyncFlags::DATASYNC)
.build()
};
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(_) => Ok(()),
None => Err(io_err("io_uring completion queue empty")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::WriteLinkedFsync {
fd,
buf_ptr,
buf_len,
offset,
reply,
} => {
let (write_entry, fsync_entry) =
if let Some(slot) = fd_registry.try_get_or_register(&ring.submitter(), fd) {
(
io_uring::opcode::Write::new(
io_uring::types::Fixed(slot),
buf_ptr as *const u8,
buf_len as u32,
)
.offset(offset)
.build()
.flags(io_uring::squeue::Flags::IO_LINK),
io_uring::opcode::Fsync::new(io_uring::types::Fixed(slot))
.flags(io_uring::types::FsyncFlags::DATASYNC)
.build(),
)
} else {
(
io_uring::opcode::Write::new(
io_uring::types::Fd(fd),
buf_ptr as *const u8,
buf_len as u32,
)
.offset(offset)
.build()
.flags(io_uring::squeue::Flags::IO_LINK),
io_uring::opcode::Fsync::new(io_uring::types::Fd(fd))
.flags(io_uring::types::FsyncFlags::DATASYNC)
.build(),
)
};
let push_result = unsafe {
let mut sq = ring.submission();
sq.push(&write_entry).and_then(|()| sq.push(&fsync_entry))
};
if push_result.is_err() {
let _ = reply.send(Err(io_err(
"io_uring submission queue full (linked write+fsync)",
)));
continue;
}
let result = match ring.submit_and_wait(2) {
Ok(_) => {
let cqe1 = ring.completion().next();
let cqe2 = ring.completion().next();
match (cqe1, cqe2) {
(Some(w), Some(f)) => {
if w.result() < 0 {
Err(Error::Io(std::io::Error::from_raw_os_error(-w.result())))
} else if f.result() < 0 {
Err(Error::Io(std::io::Error::from_raw_os_error(-f.result())))
} else {
Ok(w.result() as usize)
}
}
_ => Err(io_err(
"io_uring completion queue short on linked write+fsync",
)),
}
}
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
Op::RegisterBuffers { iovs, reply } => {
let iovec_array: Vec<libc::iovec> = iovs
.iter()
.map(|(p, l)| libc::iovec {
iov_base: *p as *mut libc::c_void,
iov_len: *l,
})
.collect();
let result =
unsafe { ring.submitter().register_buffers(&iovec_array) }.map_err(Error::Io);
let _ = reply.send(result);
}
Op::WriteFixed {
fd,
buf_idx,
buf_ptr,
buf_len,
offset,
reply,
} => {
let entry =
if let Some(slot) = fd_registry.try_get_or_register(&ring.submitter(), fd) {
io_uring::opcode::WriteFixed::new(
io_uring::types::Fixed(slot),
buf_ptr as *const u8,
buf_len as u32,
buf_idx,
)
.offset(offset)
.build()
} else {
io_uring::opcode::WriteFixed::new(
io_uring::types::Fd(fd),
buf_ptr as *const u8,
buf_len as u32,
buf_idx,
)
.offset(offset)
.build()
};
let push = unsafe { ring.submission().push(&entry) };
if push.is_err() {
let _ = reply.send(Err(io_err("io_uring submission queue full (WriteFixed)")));
continue;
}
let result = match ring.submit_and_wait(1) {
Ok(_) => match ring.completion().next() {
Some(c) if c.result() < 0 => {
Err(Error::Io(std::io::Error::from_raw_os_error(-c.result())))
}
Some(c) => Ok(c.result() as usize),
None => Err(io_err("io_uring completion queue empty (WriteFixed)")),
},
Err(e) => Err(Error::Io(e)),
};
let _ = reply.send(result);
}
}
}
}
fn io_err(msg: &'static str) -> Error {
Error::Io(std::io::Error::other(msg))
}
fn owner_dead() -> Error {
Error::Io(std::io::Error::other("io_uring owner thread terminated"))
}
/// Bookkeeping for the ring's registered-file ("fixed file") table.
struct FdRegistry {
/// Slot table mirrored to the kernel; `-1` marks an empty slot.
slots: Vec<RawFd>,
/// Reverse map from raw fd number to its slot index. Entries are never removed.
fd_to_slot: std::collections::HashMap<RawFd, u32>,
/// Whether the initial table registration succeeded. While `false`, no slots are handed out.
registered: bool,
}
/// Number of slots in the registered-file table. Once all are used,
/// `try_get_or_register` returns `None`.
const SLOT_TABLE_SIZE: usize = 16;
impl FdRegistry {
    /// Builds an empty registry with `SLOT_TABLE_SIZE` free (`-1`) slots.
    ///
    /// Nothing is registered with the kernel yet.
    fn new() -> Self {
        let slots = vec![-1; SLOT_TABLE_SIZE];
        Self {
            slots,
            fd_to_slot: std::collections::HashMap::new(),
            registered: false,
        }
    }

    /// Registers the current (all-`-1`, i.e. empty) slot table with the ring.
    ///
    /// Only on success does the registry start handing out slots. On error, `registered`
    /// stays `false` and the kernel error is returned.
    fn initial_register(&mut self, submitter: &io_uring::Submitter<'_>) -> std::io::Result<()> {
        submitter
            .register_files(&self.slots)
            .map(|()| self.registered = true)
    }

    /// Returns the fixed-file slot for `fd`, registering it in the first free slot if needed.
    ///
    /// `None` means "use the plain fd". This happens when the initial registration never
    /// succeeded, the table is full, or the kernel rejected the update. A successful
    /// mapping is cached for the registry's lifetime and never evicted.
    ///
    /// NOTE(review): the cache is keyed by fd *number*. The kernel's table keeps the file
    /// that was open at registration time. If a caller closes `fd` and the number is reused
    /// for another file, the cached slot still addresses the old file. Confirm fds are never
    /// reused while a ring is alive before relying on this.
    fn try_get_or_register(
        &mut self,
        submitter: &io_uring::Submitter<'_>,
        fd: RawFd,
    ) -> Option<u32> {
        if !self.registered {
            return None;
        }
        if let Some(&cached) = self.fd_to_slot.get(&fd) {
            return Some(cached);
        }
        let free_idx = self.slots.iter().position(|&occupant| occupant == -1)?;
        let slot = free_idx as u32;
        match submitter.register_files_update(slot, &[fd]) {
            Ok(count) if count > 0 => {}
            _ => return None,
        }
        self.slots[free_idx] = fd;
        let _ = self.fd_to_slot.insert(fd, slot);
        Some(slot)
    }
}
/// An opened NVMe controller character device, plus the namespace id to address through it.
pub(crate) struct NvmeAccess {
/// Controller character device (a `/dev/nvme…` node), opened read-write.
pub(crate) char_dev: std::fs::File,
/// Namespace id used in passthrough commands.
pub(crate) nsid: u32,
}
/// Opens the NVMe controller character device backing the file open on `fd`, so that
/// passthrough flushes can be issued.
///
/// Returns `None` in any of these cases:
/// - passthrough is disabled via the `FSYS_DISABLE_NVME_PASSTHROUGH` environment variable;
/// - `fd` does not live on an NVMe namespace;
/// - the namespace id cannot be determined;
/// - the character device cannot be opened read-write.
pub(crate) fn nvme_flush_capable(fd: RawFd) -> Option<NvmeAccess> {
    if std::env::var_os("FSYS_DISABLE_NVME_PASSTHROUGH").is_some() {
        return None;
    }
    let nvme_dev = nvme_char_device_for(fd)?;
    // Refuse rather than guess. Defaulting to NSID 1 could flush a different namespace on
    // a multi-namespace controller and silently skip the barrier for the one holding `fd`.
    let nsid = nvme_namespace_id_for(fd)?;
    let char_dev = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(&nvme_dev)
        .ok()?;
    Some(NvmeAccess { char_dev, nsid })
}
/// Issues an NVMe Identify Namespace (CNS 00h) admin command for namespace `nsid`.
///
/// Returns the raw 4096-byte identify data. `nvme_fd` must be an NVMe device node that
/// accepts `NVME_IOCTL_ADMIN_CMD`.
///
/// # Errors
/// - `Error::Io` carrying the OS error when the ioctl itself fails (negative return).
/// - `Error::Io` (`ErrorKind::Other`) when the device completes the command with a non-zero
///   NVMe status, which the ioctl reports as a positive return value.
pub(crate) fn nvme_identify_namespace(nvme_fd: RawFd, nsid: u32) -> Result<[u8; 4096]> {
    // Mirror of the kernel's `struct nvme_passthru_cmd` (uapi `linux/nvme_ioctl.h`).
    #[repr(C)]
    #[derive(Default)]
    struct NvmePassthruCmd {
        opcode: u8,
        flags: u8,
        rsvd1: u16,
        nsid: u32,
        cdw2: u32,
        cdw3: u32,
        metadata: u64,
        addr: u64,
        metadata_len: u32,
        data_len: u32,
        cdw10: u32,
        cdw11: u32,
        cdw12: u32,
        cdw13: u32,
        cdw14: u32,
        cdw15: u32,
        timeout_ms: u32,
        result: u32,
    }
    // The ioctl number encodes the struct size, so pin the layout to the kernel ABI (72 bytes).
    const _: () = assert!(std::mem::size_of::<NvmePassthruCmd>() == 72);
    // _IOWR('N', 0x41, struct nvme_passthru_cmd) == 0xC048_4E41. The previous literal
    // 0xC040_4E41 encoded a 64-byte size, which matches no kernel ioctl (ENOTTY).
    const NVME_IOCTL_ADMIN_CMD: libc::c_ulong = (3 << 30)
        | ((std::mem::size_of::<NvmePassthruCmd>() as libc::c_ulong) << 16)
        | ((b'N' as libc::c_ulong) << 8)
        | 0x41;
    const OPC_IDENTIFY: u8 = 0x06;
    const CNS_NAMESPACE: u32 = 0x0000_0000;
    const ID_BUF_LEN: usize = 4096;

    let mut buf = [0u8; ID_BUF_LEN];
    let mut cmd = NvmePassthruCmd {
        opcode: OPC_IDENTIFY,
        nsid,
        addr: buf.as_mut_ptr() as u64,
        data_len: ID_BUF_LEN as u32,
        cdw10: CNS_NAMESPACE,
        ..Default::default()
    };
    // SAFETY: `cmd` is a fully initialised `nvme_passthru_cmd` that lives across the call.
    // `addr` points at `buf`, a live, writable buffer of exactly `data_len` bytes.
    let rc = unsafe {
        libc::ioctl(
            nvme_fd,
            NVME_IOCTL_ADMIN_CMD,
            &mut cmd as *mut NvmePassthruCmd,
        )
    };
    if rc < 0 {
        return Err(Error::Io(std::io::Error::last_os_error()));
    }
    // A positive return is the NVMe completion status. `cmd.result` only holds the
    // command-specific completion dword 0, so it is not a failure indicator.
    if rc > 0 {
        return Err(Error::Io(std::io::Error::other(format!(
            "NVMe Identify Namespace returned status 0x{rc:x}"
        ))));
    }
    Ok(buf)
}
pub(crate) fn parse_nawun_nawupf(id_buf: &[u8; 4096]) -> (Option<u32>, Option<u32>) {
let nawun = u16::from_le_bytes([id_buf[74], id_buf[75]]);
let nawupf = u16::from_le_bytes([id_buf[76], id_buf[77]]);
let cvt = |v: u16| -> Option<u32> {
if v == u16::MAX {
None
} else {
Some(v as u32)
}
};
(cvt(nawun), cvt(nawupf))
}
pub(crate) fn nvme_flush_ioctl(nvme_fd: RawFd, nsid: u32) -> Result<()> {
#[repr(C)]
#[derive(Default)]
struct NvmePassthruCmd {
opcode: u8,
flags: u8,
rsvd1: u16,
nsid: u32,
cdw2: u32,
cdw3: u32,
metadata: u64,
addr: u64,
metadata_len: u32,
data_len: u32,
cdw10: u32,
cdw11: u32,
cdw12: u32,
cdw13: u32,
cdw14: u32,
cdw15: u32,
timeout_ms: u32,
result: u32,
}
const NVME_IOCTL_IO_CMD: libc::c_ulong = 0xc040_4e43;
let mut cmd = NvmePassthruCmd {
opcode: 0x00, nsid,
..Default::default()
};
let rc = unsafe { libc::ioctl(nvme_fd, NVME_IOCTL_IO_CMD, &mut cmd) };
if rc < 0 {
return Err(Error::Io(std::io::Error::last_os_error()));
}
Ok(())
}
fn nvme_char_device_for(fd: RawFd) -> Option<std::path::PathBuf> {
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
let rc = unsafe { libc::fstat(fd, &mut stat) };
if rc != 0 {
return None;
}
let dev = stat.st_dev;
let major = libc::major(dev);
let minor = libc::minor(dev);
let block_link = format!("/sys/dev/block/{major}:{minor}");
let resolved = std::fs::canonicalize(&block_link).ok()?;
let name = resolved.file_name()?.to_str()?;
if !name.starts_with("nvme") {
return None;
}
let controller = name.split('n').next()?;
if controller.is_empty() || !controller.starts_with("nvme") {
return None;
}
Some(std::path::PathBuf::from(format!("/dev/{controller}")))
}
fn nvme_namespace_id_for(fd: RawFd) -> Option<u32> {
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
let rc = unsafe { libc::fstat(fd, &mut stat) };
if rc != 0 {
return None;
}
let dev = stat.st_dev;
let major = libc::major(dev);
let minor = libc::minor(dev);
let nsid_path = format!("/sys/dev/block/{major}:{minor}/nsid");
let s = std::fs::read_to_string(&nsid_path).ok()?;
s.trim().parse::<u32>().ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::OpenOptions;
use std::io::Write as _;
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicU32, Ordering};
static C: AtomicU32 = AtomicU32::new(0);
fn tmp_path(tag: &str) -> std::path::PathBuf {
let n = C.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"fsys_iouring_test_{}_{}_{}",
std::process::id(),
n,
tag
))
}
fn ring_or_skip() -> Option<IoUringRing> {
match IoUringRing::new(8, None) {
Ok(r) => Some(r),
Err(Error::IoUringSetupFailed { .. }) => None,
Err(e) => panic!("unexpected ring construction error: {e:?}"),
}
}
struct Cleanup(std::path::PathBuf);
impl Drop for Cleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
#[test]
fn ring_construction_returns_ring_or_setup_failed() {
match IoUringRing::new(8, None) {
Ok(_) => {}
Err(Error::IoUringSetupFailed { .. }) => {}
Err(e) => panic!("unexpected variant: {e:?}"),
}
}
#[test]
fn write_at_round_trip() {
let Some(ring) = ring_or_skip() else { return };
let path = tmp_path("write_rt");
let _g = Cleanup(path.clone());
let f = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.unwrap();
let data = vec![0xA5u8; 4096];
let n = ring.write_at(f.as_raw_fd(), &data, 0).expect("write_at");
assert_eq!(n, data.len());
ring.fdatasync(f.as_raw_fd()).expect("fdatasync");
let read_back = std::fs::read(&path).expect("read");
assert_eq!(read_back, data);
}
#[test]
fn read_at_round_trip() {
let Some(ring) = ring_or_skip() else { return };
let path = tmp_path("read_rt");
let _g = Cleanup(path.clone());
let data = vec![0x5Au8; 4096];
std::fs::write(&path, &data).unwrap();
let f = OpenOptions::new().read(true).open(&path).unwrap();
let mut buf = vec![0u8; 4096];
let n = ring.read_at(f.as_raw_fd(), &mut buf, 0).expect("read_at");
assert_eq!(n, data.len());
assert_eq!(buf, data);
}
#[test]
fn concurrent_submitters_serialise_through_owner() {
let Some(ring) = ring_or_skip() else { return };
let ring = std::sync::Arc::new(ring);
let path = tmp_path("concurrent");
let _g = Cleanup(path.clone());
let mut f = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.unwrap();
f.write_all(&vec![0u8; 16 * 4096]).unwrap();
drop(f);
let f = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.unwrap();
let fd = f.as_raw_fd();
let mut handles = Vec::new();
for i in 0..16usize {
let ring = ring.clone();
let payload = vec![i as u8; 4096];
handles.push(std::thread::spawn(move || {
ring.write_at(fd, &payload, (i * 4096) as u64).unwrap()
}));
}
for h in handles {
assert_eq!(h.join().unwrap(), 4096);
}
ring.fdatasync(fd).unwrap();
drop(f);
let bytes = std::fs::read(&path).unwrap();
for i in 0..16 {
let slice = &bytes[i * 4096..(i + 1) * 4096];
assert!(
slice.iter().all(|&b| b == i as u8),
"sector {i} content drift — owner-thread serialisation broken",
);
}
}
#[test]
fn writes_across_many_distinct_fds_complete_correctly() {
let Some(ring) = ring_or_skip() else { return };
const N_FDS: usize = 20;
const PAYLOAD_LEN: usize = 256;
let mut paths = Vec::with_capacity(N_FDS);
let mut guards = Vec::with_capacity(N_FDS);
let mut files = Vec::with_capacity(N_FDS);
for i in 0..N_FDS {
let path = tmp_path(&format!("manyfds_{i:02}"));
guards.push(Cleanup(path.clone()));
let f = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.unwrap();
files.push(f);
paths.push(path);
}
for (i, f) in files.iter().enumerate() {
let payload = vec![i as u8; PAYLOAD_LEN];
let n = ring.write_at(f.as_raw_fd(), &payload, 0).expect("write_at");
assert_eq!(n, PAYLOAD_LEN, "fd {i}: short write");
ring.fdatasync(f.as_raw_fd()).expect("fdatasync");
}
drop(files);
for (i, path) in paths.iter().enumerate() {
let bytes = std::fs::read(path).expect("read");
assert_eq!(
bytes.len(),
PAYLOAD_LEN,
"fd {i}: wrong file size on read-back"
);
assert!(
bytes.iter().all(|&b| b == i as u8),
"fd {i}: content drift — slot/fd mapping bug"
);
}
}
#[test]
fn repeated_writes_on_same_fd_round_trip() {
let Some(ring) = ring_or_skip() else { return };
const N_WRITES: usize = 32;
const PAYLOAD_LEN: usize = 64;
let path = tmp_path("slot_cache");
let _g = Cleanup(path.clone());
let f = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.unwrap();
std::fs::write(&path, vec![0u8; N_WRITES * PAYLOAD_LEN]).unwrap();
let fd = f.as_raw_fd();
for i in 0..N_WRITES {
let payload = vec![(i & 0xFF) as u8; PAYLOAD_LEN];
let n = ring
.write_at(fd, &payload, (i * PAYLOAD_LEN) as u64)
.expect("write_at");
assert_eq!(n, PAYLOAD_LEN, "iter {i}: short write");
}
ring.fdatasync(fd).expect("fdatasync");
drop(f);
let bytes = std::fs::read(&path).unwrap();
assert_eq!(bytes.len(), N_WRITES * PAYLOAD_LEN);
for i in 0..N_WRITES {
let slice = &bytes[i * PAYLOAD_LEN..(i + 1) * PAYLOAD_LEN];
let expected = (i & 0xFF) as u8;
assert!(
slice.iter().all(|&b| b == expected),
"iter {i}: content drift (expected {expected}, got {:?}...)",
&slice[..4]
);
}
}
#[test]
fn write_at_linked_fsync_round_trips_under_owner_thread() {
let Some(ring) = ring_or_skip() else { return };
let path = tmp_path("linked_write_fsync");
let _g = Cleanup(path.clone());
let f = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&path)
.unwrap();
let fd = f.as_raw_fd();
let payload = b"linked write + fsync";
let n = ring.write_at_linked_fsync(fd, payload, 0).unwrap();
assert_eq!(n, payload.len());
drop(f);
let bytes = std::fs::read(&path).unwrap();
assert_eq!(bytes, payload);
}
#[test]
fn parse_nawun_nawupf_extracts_le_u16_at_offset_74_76() {
let mut id = [0u8; 4096];
id[74] = 0x07;
id[75] = 0x00;
id[76] = 0x0F;
id[77] = 0x00;
let (nawun, nawupf) = parse_nawun_nawupf(&id);
assert_eq!(nawun, Some(7));
assert_eq!(nawupf, Some(15));
}
#[test]
fn parse_nawun_nawupf_sentinel_0xffff_reads_as_none() {
let mut id = [0u8; 4096];
id[74] = 0xFF;
id[75] = 0xFF;
id[76] = 0xFF;
id[77] = 0xFF;
let (nawun, nawupf) = parse_nawun_nawupf(&id);
assert_eq!(nawun, None);
assert_eq!(nawupf, None);
}
#[test]
fn parse_nawun_nawupf_zero_means_one_block_guarantee() {
let id = [0u8; 4096];
let (nawun, nawupf) = parse_nawun_nawupf(&id);
assert_eq!(nawun, Some(0));
assert_eq!(nawupf, Some(0));
}
}