use std::io;
use std::os::unix::io::AsRawFd;
use io_uring::{opcode, squeue, types, IoUring};
use crate::store::backend::{AlignedBlobBuf, BlobBufPool};
const RING_DEPTH: u32 = 256;
const RING_DEPTH_USIZE: usize = RING_DEPTH as usize;
const CQ_BITMAP_WORDS: usize = RING_DEPTH_USIZE.div_ceil(64);
pub(super) struct UringContext {
ring: IoUring,
raw_fd: i32,
fixed_fd: types::Fixed,
fixed_buffers: bool,
}
#[derive(Clone, Copy)]
struct OrderedWrite<'a> {
offset: u64,
buf: &'a AlignedBlobBuf,
order: usize,
}
impl std::fmt::Debug for UringContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UringContext")
.field("fd", &self.raw_fd)
.finish_non_exhaustive()
}
}
impl UringContext {
pub(super) fn new(file: &std::fs::File, buffers: Option<&BlobBufPool>) -> io::Result<Self> {
let ring = build_ring()?;
let raw_fd = file.as_raw_fd();
ring.submitter().register_files(&[raw_fd])?;
let fixed_buffers = if let Some(buffers) = buffers {
let iovecs = buffers.iovecs();
unsafe {
ring.submitter().register_buffers(&iovecs)?;
}
true
} else {
false
};
Ok(Self {
ring,
raw_fd,
fixed_fd: types::Fixed(0),
fixed_buffers,
})
}
pub(super) fn pwrite_at(&mut self, offset: u64, buf: &AlignedBlobBuf) -> io::Result<()> {
let write = [OrderedWrite {
offset,
buf,
order: 0,
}];
self.submit_write_batch(&write)
}
pub(super) fn pwrite_many_at(&mut self, writes: &[(u64, &AlignedBlobBuf)]) -> io::Result<()> {
let ordered = ordered_writes(writes);
if ordered.is_empty() {
return Ok(());
}
if ordered.len() <= RING_DEPTH_USIZE {
return self.submit_write_batch(&ordered);
}
self.submit_write_stream(&ordered)
}
pub(super) fn pwrite_many_and_sync_at(
&mut self,
writes: &[(u64, &AlignedBlobBuf)],
) -> io::Result<()> {
if writes.is_empty() {
return Ok(());
}
let ordered = ordered_writes(writes);
if ordered.is_empty() {
return Ok(());
}
if ordered.len() <= RING_DEPTH_USIZE {
self.submit_write_batch(&ordered)?;
} else {
self.submit_write_stream(&ordered)?;
}
self.sync_data()
}
pub(super) fn pread_at(&mut self, offset: u64, buf: &mut AlignedBlobBuf) -> io::Result<()> {
let entry = if self.fixed_buffers {
if let Some(buffer_index) = buf.fixed_buffer_index() {
opcode::ReadFixed::new(
self.fixed_fd,
buf.as_mut_ptr(),
buf.len() as u32,
buffer_index,
)
.offset(offset)
.build()
} else {
opcode::Read::new(self.fixed_fd, buf.as_mut_ptr(), buf.len() as u32)
.offset(offset)
.build()
}
} else {
opcode::Read::new(self.fixed_fd, buf.as_mut_ptr(), buf.len() as u32)
.offset(offset)
.build()
}
.user_data(0);
unsafe {
self.ring
.submission()
.push(&entry)
.map_err(|_| io::Error::other("uring SQ full"))?;
}
self.ring.submit_and_wait(1)?;
let cqe = self
.ring
.completion()
.next()
.ok_or_else(|| io::Error::other("uring CQE missing"))?;
let n = cqe.result();
if n < 0 {
return Err(io::Error::from_raw_os_error(-n));
}
if (n as usize) != buf.len() {
return Err(io::Error::other(format!(
"short uring read: read {} of {}",
n,
buf.len()
)));
}
Ok(())
}
pub(super) fn sync_data(&mut self) -> io::Result<()> {
let entry = self.fdatasync_entry();
unsafe {
self.ring
.submission()
.push(&entry)
.map_err(|_| io::Error::other("uring SQ full"))?;
}
self.ring.submit_and_wait(1)?;
let cqe = self
.ring
.completion()
.next()
.ok_or_else(|| io::Error::other("uring CQE missing"))?;
let n = cqe.result();
check_fsync_result(n)
}
fn submit_write_batch(&mut self, chunk: &[OrderedWrite<'_>]) -> io::Result<()> {
debug_assert!(!chunk.is_empty());
debug_assert!(chunk.len() <= RING_DEPTH_USIZE);
for (idx, write) in chunk.iter().enumerate() {
let entry = self
.write_entry(write.offset, write.buf)
.user_data(idx as u64);
unsafe {
self.ring
.submission()
.push(&entry)
.map_err(|_| io::Error::other("uring SQ full"))?;
}
}
self.ring.submit_and_wait(chunk.len())?;
self.drain_write_batch(chunk)
}
fn submit_write_stream(&mut self, writes: &[OrderedWrite<'_>]) -> io::Result<()> {
debug_assert!(writes.len() > RING_DEPTH_USIZE);
let mut next = 0usize;
let mut in_flight = 0usize;
let mut completed = 0usize;
let mut seen = vec![0u64; writes.len().div_ceil(64)];
let mut first_err: Option<io::Error> = None;
while completed < writes.len() {
let mut pushed = 0usize;
if first_err.is_none() {
while next < writes.len() && in_flight < RING_DEPTH_USIZE {
let write = writes[next];
let entry = self
.write_entry(write.offset, write.buf)
.user_data(next as u64);
let pushed_ok = unsafe { self.ring.submission().push(&entry).is_ok() };
if !pushed_ok {
break;
}
next += 1;
in_flight += 1;
pushed += 1;
}
}
if pushed > 0 {
self.ring.submit()?;
}
if in_flight == 0 {
if let Some(err) = first_err {
return Err(err);
}
return Err(io::Error::other("uring SQ full with no writes in flight"));
}
let drained = self.drain_write_stream_available(
writes,
&mut seen,
&mut first_err,
&mut in_flight,
&mut completed,
)?;
if drained == 0 {
self.ring.submit_and_wait(1)?;
self.drain_write_stream_available(
writes,
&mut seen,
&mut first_err,
&mut in_flight,
&mut completed,
)?;
}
if first_err.is_some() && in_flight == 0 {
break;
}
}
if let Some(err) = first_err {
return Err(err);
}
if completed != writes.len() {
return Err(io::Error::other(format!(
"missing uring write completions: completed {completed} of {}",
writes.len()
)));
}
Ok(())
}
fn write_entry(&self, offset: u64, buf: &AlignedBlobBuf) -> squeue::Entry {
if self.fixed_buffers {
if let Some(buffer_index) = buf.fixed_buffer_index() {
return opcode::WriteFixed::new(
self.fixed_fd,
buf.as_ptr(),
buf.len() as u32,
buffer_index,
)
.offset(offset)
.build();
}
}
opcode::Write::new(self.fixed_fd, buf.as_ptr(), buf.len() as u32)
.offset(offset)
.build()
}
fn fdatasync_entry(&self) -> squeue::Entry {
opcode::Fsync::new(self.fixed_fd)
.flags(types::FsyncFlags::DATASYNC)
.build()
}
fn drain_write_batch(&mut self, chunk: &[OrderedWrite<'_>]) -> io::Result<()> {
let mut seen = [0u64; CQ_BITMAP_WORDS];
let mut first_err: Option<io::Error> = None;
for _ in 0..chunk.len() {
let cqe = self
.ring
.completion()
.next()
.ok_or_else(|| io::Error::other("uring CQE missing"))?;
let user_data = cqe.user_data();
let n = cqe.result();
let Ok(idx) = usize::try_from(user_data) else {
record_err(
&mut first_err,
io::Error::other("uring CQE user_data overflow"),
);
continue;
};
if idx >= chunk.len() {
record_err(
&mut first_err,
io::Error::other("uring CQE user_data out of batch"),
);
continue;
}
if let Err(e) = mark_seen(&mut seen, idx) {
record_err(&mut first_err, e);
continue;
}
if n < 0 {
record_err(&mut first_err, io::Error::from_raw_os_error(-n));
continue;
}
let expected = chunk[idx].buf.len();
if (n as usize) != expected {
record_err(
&mut first_err,
io::Error::other(format!("short uring write: wrote {n} of {expected}")),
);
}
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
fn drain_write_stream_available(
&mut self,
writes: &[OrderedWrite<'_>],
seen: &mut [u64],
first_err: &mut Option<io::Error>,
in_flight: &mut usize,
completed: &mut usize,
) -> io::Result<usize> {
let mut drained = 0usize;
while let Some(cqe) = self.ring.completion().next() {
drained += 1;
*completed += 1;
*in_flight = in_flight
.checked_sub(1)
.ok_or_else(|| io::Error::other("uring CQE without matching in-flight write"))?;
let user_data = cqe.user_data();
let n = cqe.result();
let Ok(idx) = usize::try_from(user_data) else {
record_err(first_err, io::Error::other("uring CQE user_data overflow"));
continue;
};
if idx >= writes.len() {
record_err(
first_err,
io::Error::other("uring CQE user_data out of stream"),
);
continue;
}
if let Err(e) = mark_seen_dynamic(seen, idx) {
record_err(first_err, e);
continue;
}
if n < 0 {
record_err(first_err, io::Error::from_raw_os_error(-n));
continue;
}
let expected = writes[idx].buf.len();
if (n as usize) != expected {
record_err(
first_err,
io::Error::other(format!("short uring write: wrote {n} of {expected}")),
);
}
}
Ok(drained)
}
}
impl Drop for UringContext {
fn drop(&mut self) {
if self.fixed_buffers {
let _ = self.ring.submitter().unregister_buffers();
}
let _ = self.ring.submitter().unregister_files();
}
}
fn build_ring() -> io::Result<IoUring> {
let mut builder = IoUring::builder();
builder.setup_cqsize(RING_DEPTH * 2).setup_clamp();
builder.build(RING_DEPTH)
}
fn ordered_writes<'a>(writes: &'a [(u64, &'a AlignedBlobBuf)]) -> Vec<OrderedWrite<'a>> {
let mut ordered: Vec<_> = writes
.iter()
.enumerate()
.map(|(order, (offset, buf))| OrderedWrite {
offset: *offset,
buf: *buf,
order,
})
.collect();
ordered.sort_by(|a, b| a.offset.cmp(&b.offset).then(a.order.cmp(&b.order)));
let mut deduped = Vec::with_capacity(ordered.len());
let mut idx = 0usize;
while idx < ordered.len() {
let offset = ordered[idx].offset;
let mut last = ordered[idx];
idx += 1;
while idx < ordered.len() && ordered[idx].offset == offset {
last = ordered[idx];
idx += 1;
}
deduped.push(last);
}
deduped
}
fn check_fsync_result(n: i32) -> io::Result<()> {
if n < 0 {
return Err(io::Error::from_raw_os_error(-n));
}
if n != 0 {
return Err(io::Error::other(format!(
"unexpected uring fdatasync result: {n}",
)));
}
Ok(())
}
fn record_err(slot: &mut Option<io::Error>, err: io::Error) {
if slot.is_none() {
*slot = Some(err);
}
}
fn mark_seen(seen: &mut [u64; CQ_BITMAP_WORDS], idx: usize) -> io::Result<()> {
let word = idx / 64;
let bit = 1u64 << (idx % 64);
if seen[word] & bit != 0 {
return Err(io::Error::other("duplicate uring CQE user_data"));
}
seen[word] |= bit;
Ok(())
}
fn mark_seen_dynamic(seen: &mut [u64], idx: usize) -> io::Result<()> {
let word = idx / 64;
let bit = 1u64 << (idx % 64);
let Some(seen_word) = seen.get_mut(word) else {
return Err(io::Error::other("uring CQE user_data out of bitmap"));
};
if *seen_word & bit != 0 {
return Err(io::Error::other("duplicate uring CQE user_data"));
}
*seen_word |= bit;
Ok(())
}