use crate::error::Error;
use crate::op::Sqe;
use crate::syscall;
use crate::types::{
CqeFlags, EnterFlags, Features, IoUringCqe, IoUringParams, IoUringSqe, IoVec, MapFlags, Prot,
RegisterOp, RingOffset, SetupFlags,
};
use core::sync::atomic::{AtomicU32, Ordering};
/// A completed operation harvested from the completion queue.
///
/// Mirrors the kernel CQE: the submitter-chosen `user_data` tag, the raw
/// syscall-style result (negative errno on failure), and the CQE flags.
#[derive(Debug, Clone, Copy)]
pub struct Completion {
    /// The `user_data` value attached to the originating SQE.
    pub user_data: u64,
    /// Raw result: `>= 0` on success, negated errno on failure.
    pub result: i32,
    /// Completion flags reported by the kernel.
    pub flags: CqeFlags,
}
impl Completion {
    /// Convert the raw CQE result into a `Result`, mapping a negative
    /// value to an [`Error`] carrying the positive errno.
    #[allow(clippy::cast_sign_loss)]
    pub const fn into_result(&self) -> Result<u32, Error> {
        if self.result >= 0 {
            Ok(self.result as u32)
        } else {
            Err(Error(-self.result))
        }
    }

    /// Whether this completion carries an error (negative result).
    #[must_use]
    pub const fn is_err(&self) -> bool {
        self.result.is_negative()
    }
}
/// One `mmap`-ed span of memory, tracked as base address plus byte length.
/// A length of zero means "nothing mapped".
struct MappedRegion {
    addr: usize,
    len: usize,
}

impl MappedRegion {
    /// Record a mapping at `base` spanning `size` bytes.
    const fn new(base: usize, size: usize) -> Self {
        Self {
            addr: base,
            len: size,
        }
    }
}
struct SetupGuard {
fd: usize,
sq_ring: MappedRegion,
cq_ring: MappedRegion,
sqes: MappedRegion,
}
impl SetupGuard {
const fn new(fd: usize) -> Self {
Self {
fd,
sq_ring: MappedRegion { addr: 0, len: 0 },
cq_ring: MappedRegion { addr: 0, len: 0 },
sqes: MappedRegion { addr: 0, len: 0 },
}
}
const fn disarm(self) {
core::mem::forget(self);
}
}
impl Drop for SetupGuard {
    /// Tear down in reverse order of construction: SQE array, CQ ring,
    /// SQ ring, then the ring file descriptor. Errors are deliberately
    /// ignored — this only runs on an already-failing setup path.
    fn drop(&mut self) {
        let regions = [&self.sqes, &self.cq_ring, &self.sq_ring];
        for region in regions {
            if region.len > 0 {
                let _ = syscall::munmap(region.addr, region.len);
            }
        }
        let _ = syscall::close(self.fd);
    }
}
/// An io_uring instance: owns the ring fd, the mmap-ed ring regions, and
/// cached raw pointers into the shared SQ/CQ ring structures.
pub struct IoUring {
    /// Ring file descriptor returned by `io_uring_setup`.
    fd: usize,
    /// Kernel-written SQ head (consumer index); loaded with Acquire.
    sq_head: *const AtomicU32,
    /// Shared SQ tail; published with Release in `flush_sq_tail`.
    sq_tail: *const AtomicU32,
    /// Ring-size-minus-one mask for SQ indices.
    sq_mask: u32,
    /// Shared SQ flags word (need-wakeup / cq-overflow bits).
    sq_flags: *const AtomicU32,
    /// Base of the SQE array mapping.
    sqes: *mut IoUringSqe,
    /// Local SQ tail, including pushed-but-not-yet-published entries.
    sq_tail_local: u32,
    /// Tail value already handed to `io_uring_enter`.
    sq_submitted: u32,
    /// Shared CQ head; our consumed position, published with Release.
    cq_head: *const AtomicU32,
    /// Kernel-written CQ tail; loaded with Acquire.
    cq_tail: *const AtomicU32,
    /// Ring-size-minus-one mask for CQ indices.
    cq_mask: u32,
    /// Base of the CQE array inside the CQ ring mapping.
    cqes: *const IoUringCqe,
    /// Locally consumed CQ head, published by `flush_cq_head`.
    cq_head_local: u32,
    /// Kernel feature bits reported at setup time.
    features: Features,
    /// SQ ring mapping (also backs the CQ ring under SINGLE_MMAP).
    sq_ring: MappedRegion,
    /// CQ ring mapping; zero-length when shared with `sq_ring`.
    cq_ring: MappedRegion,
    /// SQE array mapping.
    sqes_region: MappedRegion,
}
impl IoUring {
    /// Create a ring with `entries` submission slots and default parameters.
    ///
    /// # Errors
    /// Propagates any `io_uring_setup`/`mmap` failure from the builder.
    pub fn new(entries: u32) -> Result<Self, Error> {
        IoUringBuilder::new(entries).build()
    }

    /// Start a builder to customize setup parameters before creation.
    #[must_use]
    pub fn builder(entries: u32) -> IoUringBuilder {
        IoUringBuilder::new(entries)
    }

    /// Feature bits the kernel reported at setup time.
    #[must_use]
    pub const fn features(&self) -> Features {
        self.features
    }

    /// Whether the kernel has set the CQ-overflow bit (bit 1) in the shared
    /// SQ ring flags word.
    #[must_use]
    pub fn cq_overflow(&self) -> bool {
        const IORING_SQ_CQ_OVERFLOW: u32 = 1 << 1;
        // SAFETY: sq_flags points into the SQ ring mapping, which lives as
        // long as `self` and is only unmapped in `drop`.
        let flags = unsafe { &*self.sq_flags }.load(Ordering::Acquire);
        flags & IORING_SQ_CQ_OVERFLOW != 0
    }

    /// Whether the need-wakeup bit (bit 0) is set in the shared SQ ring
    /// flags word; see `submit_sqpoll`.
    #[must_use]
    pub fn sq_need_wakeup(&self) -> bool {
        const IORING_SQ_NEED_WAKEUP: u32 = 1 << 0;
        // SAFETY: sq_flags is valid for the lifetime of `self` (see above).
        let flags = unsafe { &*self.sq_flags }.load(Ordering::Acquire);
        flags & IORING_SQ_NEED_WAKEUP != 0
    }

    /// Build a ring from kernel-filled `params`: call `io_uring_setup`, map
    /// the SQ ring, CQ ring, and SQE array, then cache raw pointers to the
    /// shared ring fields.
    ///
    /// `SetupGuard` releases partial mappings and the fd on any early `?`
    /// return; it is disarmed once `Self` assumes ownership.
    ///
    /// # Errors
    /// Returns any failure from `io_uring_setup` or the three `mmap` calls.
    #[allow(clippy::cast_ptr_alignment)]
    fn from_params(entries: u32, params: &mut IoUringParams) -> Result<Self, Error> {
        let prot = Prot::READ | Prot::WRITE;
        let map = MapFlags::SHARED | MapFlags::POPULATE;
        let fd = syscall::io_uring_setup(entries, &raw mut *params)?;
        let mut guard = SetupGuard::new(fd);
        let features = Features::from_raw(params.features);
        // Under SINGLE_MMAP the SQ and CQ rings share one mapping.
        let single_mmap = features.contains(Features::SINGLE_MMAP);
        // Ring size = offset of the trailing array + entries * entry size.
        let sq_ring_sz =
            params.sq_off.array as usize + params.sq_entries as usize * core::mem::size_of::<u32>();
        let cq_ring_sz = params.cq_off.cqes as usize
            + params.cq_entries as usize * core::mem::size_of::<IoUringCqe>();
        let mmap_sz = if single_mmap {
            sq_ring_sz.max(cq_ring_sz)
        } else {
            sq_ring_sz
        };
        let sq_ring_ptr = syscall::mmap(0, mmap_sz, prot, map, fd, RingOffset::SqRing.into())?;
        guard.sq_ring = MappedRegion::new(sq_ring_ptr, mmap_sz);
        let (cq_ring_ptr, cq_ring_region) = if single_mmap {
            // Shared mapping: CQ pointers derive from the SQ mapping, and a
            // zero-length region records that there is nothing extra to unmap.
            (sq_ring_ptr, MappedRegion::new(0, 0))
        } else {
            let ptr = syscall::mmap(0, cq_ring_sz, prot, map, fd, RingOffset::CqRing.into())?;
            let region = MappedRegion::new(ptr, cq_ring_sz);
            guard.cq_ring = MappedRegion::new(ptr, cq_ring_sz);
            (ptr, region)
        };
        let sqes_sz = params.sq_entries as usize * core::mem::size_of::<IoUringSqe>();
        let sqes_ptr = syscall::mmap(0, sqes_sz, prot, map, fd, RingOffset::Sqes.into())?;
        guard.sqes = MappedRegion::new(sqes_ptr, sqes_sz);
        let sq_base = sq_ring_ptr as *const u8;
        // SAFETY: each sq_off field is a kernel-provided byte offset into the
        // mmap_sz-byte SQ ring mapping, so the pointer adds stay in bounds.
        let sq_head = unsafe { sq_base.add(params.sq_off.head as usize) }.cast::<AtomicU32>();
        let sq_tail = unsafe { sq_base.add(params.sq_off.tail as usize) }.cast::<AtomicU32>();
        let sq_mask = unsafe { *sq_base.add(params.sq_off.ring_mask as usize).cast::<u32>() };
        let sq_flags = unsafe { sq_base.add(params.sq_off.flags as usize) }.cast::<AtomicU32>();
        let sq_array = unsafe { sq_base.add(params.sq_off.array as usize) } as *mut u32;
        debug_assert!(sq_head.is_aligned(), "sq_head not aligned");
        debug_assert!(sq_tail.is_aligned(), "sq_tail not aligned");
        debug_assert!(sq_flags.is_aligned(), "sq_flags not aligned");
        debug_assert!(sq_array.is_aligned(), "sq_array not aligned");
        // Identity-fill the SQ index array once: slot i always refers to
        // SQE i, so push() never has to touch it again.
        for i in 0..params.sq_entries {
            // SAFETY: the array holds sq_entries u32 slots inside the mapping.
            unsafe { sq_array.add(i as usize).write(i) };
        }
        let cq_base = cq_ring_ptr as *const u8;
        // SAFETY: cq_off offsets index into the CQ ring mapping (or the
        // shared SQ mapping under single_mmap), per the kernel contract.
        let cq_head = unsafe { cq_base.add(params.cq_off.head as usize) }.cast::<AtomicU32>();
        let cq_tail = unsafe { cq_base.add(params.cq_off.tail as usize) }.cast::<AtomicU32>();
        let cq_mask = unsafe { *cq_base.add(params.cq_off.ring_mask as usize).cast::<u32>() };
        let cqes = unsafe { cq_base.add(params.cq_off.cqes as usize) }.cast::<IoUringCqe>();
        debug_assert!(cq_head.is_aligned(), "cq_head not aligned");
        debug_assert!(cq_tail.is_aligned(), "cq_tail not aligned");
        debug_assert!(cqes.is_aligned(), "cqes not aligned");
        // Seed the local indices from the shared ring state.
        // SAFETY: sq_tail/cq_head were derived from in-bounds offsets above.
        let sq_tail_local = unsafe { &*sq_tail }.load(Ordering::Acquire);
        let cq_head_local = unsafe { &*cq_head }.load(Ordering::Acquire);
        // All resources created; `Self` now owns the fd and mappings.
        guard.disarm();
        Ok(Self {
            fd,
            sq_head,
            sq_tail,
            sq_mask,
            sq_flags,
            sqes: sqes_ptr as *mut IoUringSqe,
            sq_tail_local,
            sq_submitted: sq_tail_local,
            cq_head,
            cq_tail,
            cq_mask,
            cqes,
            cq_head_local,
            features,
            sq_ring: MappedRegion::new(sq_ring_ptr, mmap_sz),
            cq_ring: cq_ring_region,
            sqes_region: MappedRegion::new(sqes_ptr, sqes_sz),
        })
    }

    /// Queue one SQE locally. It becomes visible to the kernel only after
    /// `flush_sq_tail` (called by the submit methods).
    ///
    /// # Errors
    /// Returns `EAGAIN` when the SQ ring is full.
    #[inline]
    #[allow(clippy::needless_pass_by_value)]
    pub fn push(&mut self, sqe: Sqe) -> Result<(), Error> {
        // SAFETY: sq_head points into the SQ ring mapping owned by `self`.
        let head = unsafe { &*self.sq_head }.load(Ordering::Acquire);
        let next_tail = self.sq_tail_local.wrapping_add(1);
        // Full when the in-flight window would exceed ring capacity
        // (sq_mask + 1 == number of entries).
        if next_tail.wrapping_sub(head) > self.sq_mask + 1 {
            return Err(Error::EAGAIN);
        }
        let idx = self.sq_tail_local & self.sq_mask;
        // SAFETY: idx is masked into [0, entries), inside the SQE mapping.
        unsafe { *self.sqes.add(idx as usize) = sqe.0 };
        self.sq_tail_local = next_tail;
        Ok(())
    }

    /// Queue a no-op SQE tagged with `user_data`.
    ///
    /// # Errors
    /// Returns `EAGAIN` when the SQ ring is full.
    pub fn push_nop(&mut self, user_data: u64) -> Result<(), Error> {
        self.push(Sqe::nop().user_data(user_data))
    }

    /// Publish the local SQ tail to the shared ring (Release store), making
    /// pushed SQEs visible to the kernel.
    #[inline]
    pub fn flush_sq_tail(&self) {
        // SAFETY: sq_tail points into the SQ ring mapping owned by `self`.
        unsafe { &*self.sq_tail }.store(self.sq_tail_local, Ordering::Release);
    }

    /// Publish pending SQEs and ask the kernel to consume them.
    ///
    /// The consumed CQ head is published first so the kernel can reuse
    /// those CQ slots. Returns the count reported by `io_uring_enter`
    /// (0 if there was nothing to submit).
    ///
    /// # Errors
    /// Propagates `io_uring_enter` failures.
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn submit(&mut self) -> Result<u32, Error> {
        self.flush_cq_head();
        let to_submit = self.sq_tail_local.wrapping_sub(self.sq_submitted);
        self.flush_sq_tail();
        if to_submit == 0 {
            return Ok(0);
        }
        let ret = syscall::io_uring_enter(self.fd, to_submit, 0, EnterFlags::default())?;
        // Only mark entries submitted once the enter call succeeded.
        self.sq_submitted = self.sq_tail_local;
        Ok(ret as u32)
    }

    /// Like `submit`, but additionally block until at least `min_complete`
    /// completions are available (GETEVENTS).
    ///
    /// # Errors
    /// Propagates `io_uring_enter` failures.
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn submit_and_wait(&mut self, min_complete: u32) -> Result<u32, Error> {
        self.flush_cq_head();
        let to_submit = self.sq_tail_local.wrapping_sub(self.sq_submitted);
        self.flush_sq_tail();
        let ret = syscall::io_uring_enter(self.fd, to_submit, min_complete, EnterFlags::GETEVENTS)?;
        self.sq_submitted = self.sq_tail_local;
        Ok(ret as u32)
    }

    /// Submit when the ring runs in SQPOLL mode: just publish the ring
    /// indices, and only syscall if the poll thread needs a wakeup.
    ///
    /// # Errors
    /// Propagates `io_uring_enter` failures from the wakeup call.
    #[inline]
    pub fn submit_sqpoll(&mut self) -> Result<(), Error> {
        self.flush_cq_head();
        self.flush_sq_tail();
        self.sq_submitted = self.sq_tail_local;
        if self.sq_need_wakeup() {
            syscall::io_uring_enter(self.fd, 0, 0, EnterFlags::SQ_WAKEUP)?;
        }
        Ok(())
    }

    /// Publish the locally consumed CQ head to the shared ring (Release).
    #[inline]
    fn flush_cq_head(&self) {
        // SAFETY: cq_head points into the CQ ring mapping owned by `self`.
        unsafe { &*self.cq_head }.store(self.cq_head_local, Ordering::Release);
    }

    /// Pop one completion if the kernel has published any.
    ///
    /// Consumption is recorded only locally; it becomes visible to the
    /// kernel at the next `flush_cq_head` (submit, `sync_cq`, or drop).
    #[inline]
    #[must_use]
    pub fn complete(&mut self) -> Option<Completion> {
        // SAFETY: cq_tail points into the CQ ring mapping owned by `self`.
        let tail = unsafe { &*self.cq_tail }.load(Ordering::Acquire);
        if self.cq_head_local == tail {
            return None;
        }
        let idx = self.cq_head_local & self.cq_mask;
        // SAFETY: idx is masked into [0, cq_entries), inside the CQE array,
        // and the Acquire load of tail orders the kernel's CQE write first.
        let cqe = unsafe { &*self.cqes.add(idx as usize) };
        let completion = Completion {
            user_data: cqe.user_data,
            result: cqe.res,
            flags: CqeFlags::from_raw(cqe.flags),
        };
        self.cq_head_local = self.cq_head_local.wrapping_add(1);
        Some(completion)
    }

    /// Make locally consumed completions visible to the kernel without
    /// submitting anything.
    #[inline]
    pub fn sync_cq(&self) {
        self.flush_cq_head();
    }

    /// Register fixed buffers with the kernel for use by fixed read/write
    /// SQEs. The caller must keep `bufs` memory valid while registered.
    ///
    /// # Errors
    /// Propagates `io_uring_register` failures.
    #[allow(clippy::cast_possible_truncation)]
    pub fn register_buffers(&mut self, bufs: &[IoVec]) -> Result<(), Error> {
        syscall::io_uring_register(
            self.fd,
            RegisterOp::RegisterBuffers.into(),
            bufs.as_ptr() as usize,
            bufs.len() as u32,
        )?;
        Ok(())
    }

    /// Drop all previously registered fixed buffers.
    ///
    /// # Errors
    /// Propagates `io_uring_register` failures.
    pub fn unregister_buffers(&mut self) -> Result<(), Error> {
        syscall::io_uring_register(self.fd, RegisterOp::UnregisterBuffers.into(), 0, 0)?;
        Ok(())
    }

    /// Register a fixed file-descriptor table with the kernel.
    ///
    /// # Errors
    /// Propagates `io_uring_register` failures.
    #[allow(clippy::cast_possible_truncation)]
    pub fn register_files(&mut self, fds: &[i32]) -> Result<(), Error> {
        syscall::io_uring_register(
            self.fd,
            RegisterOp::RegisterFiles.into(),
            fds.as_ptr() as usize,
            fds.len() as u32,
        )?;
        Ok(())
    }

    /// Drop the previously registered fixed file table.
    ///
    /// # Errors
    /// Propagates `io_uring_register` failures.
    pub fn unregister_files(&mut self) -> Result<(), Error> {
        syscall::io_uring_register(self.fd, RegisterOp::UnregisterFiles.into(), 0, 0)?;
        Ok(())
    }

    /// An iterator draining the completions currently visible in the CQ.
    pub const fn completions(&mut self) -> Completions<'_> {
        Completions { ring: self }
    }
}
/// Draining iterator over the completions currently published in the CQ.
pub struct Completions<'a> {
    ring: &'a mut IoUring,
}

impl Iterator for Completions<'_> {
    type Item = Completion;

    /// Pop the next completion, or `None` once the CQ is caught up.
    fn next(&mut self) -> Option<Self::Item> {
        self.ring.complete()
    }

    /// Lower bound = completions already published by the kernel; no upper
    /// bound, since more may arrive while iterating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let ring = &*self.ring;
        // SAFETY: cq_tail points into the CQ ring mapping owned by the ring.
        let published = unsafe { &*ring.cq_tail }.load(Ordering::Acquire);
        let pending = published.wrapping_sub(ring.cq_head_local) as usize;
        (pending, None)
    }
}
impl Drop for IoUring {
fn drop(&mut self) {
self.flush_cq_head();
let _ = syscall::munmap(self.sqes_region.addr, self.sqes_region.len);
if self.cq_ring.len > 0 {
let _ = syscall::munmap(self.cq_ring.addr, self.cq_ring.len);
}
let _ = syscall::munmap(self.sq_ring.addr, self.sq_ring.len);
let _ = syscall::close(self.fd);
}
}
/// Builder collecting `io_uring_setup` parameters for an [`IoUring`].
pub struct IoUringBuilder {
    /// Requested number of SQ entries.
    entries: u32,
    /// Accumulated setup parameters handed to `io_uring_setup`.
    params: IoUringParams,
}
impl IoUringBuilder {
    /// Begin building a ring with room for `entries` submissions.
    ///
    /// # Panics
    /// Panics if `entries` is zero.
    #[must_use]
    pub fn new(entries: u32) -> Self {
        assert!(entries > 0, "io_uring entries must be > 0");
        Self {
            entries,
            params: IoUringParams::default(),
        }
    }

    /// OR one setup flag into the accumulated parameter flags.
    const fn with_flag(mut self, flag: SetupFlags) -> Self {
        self.params.flags |= flag.bits();
        self
    }

    /// Enable SQPOLL with the given kernel-thread idle timeout (ms).
    #[must_use]
    pub const fn sqpoll(mut self, idle_ms: u32) -> Self {
        self.params.sq_thread_idle = idle_ms;
        self.with_flag(SetupFlags::SQPOLL)
    }

    /// Enable SQPOLL and pin its thread to `cpu` (SQ_AFF).
    #[must_use]
    pub const fn sqpoll_cpu(mut self, cpu: u32) -> Self {
        self.params.sq_thread_cpu = cpu;
        self.with_flag(SetupFlags::SQPOLL).with_flag(SetupFlags::SQ_AFF)
    }

    /// Request a CQ ring of `n` entries instead of the default (CQSIZE).
    #[must_use]
    pub const fn cq_entries(mut self, n: u32) -> Self {
        self.params.cq_entries = n;
        self.with_flag(SetupFlags::CQSIZE)
    }

    /// Ask the kernel to clamp oversized entry counts rather than fail.
    #[must_use]
    pub const fn clamp(self) -> Self {
        self.with_flag(SetupFlags::CLAMP)
    }

    /// Declare that a single task will submit to this ring.
    #[must_use]
    pub const fn single_issuer(self) -> Self {
        self.with_flag(SetupFlags::SINGLE_ISSUER)
    }

    /// Share the async worker backend of the existing ring `wq_fd`.
    #[must_use]
    pub const fn attach_wq(mut self, wq_fd: u32) -> Self {
        self.params.wq_fd = wq_fd;
        self.with_flag(SetupFlags::ATTACH_WQ)
    }

    /// OR arbitrary setup flags into the parameters.
    #[must_use]
    pub const fn setup_flags(self, flags: SetupFlags) -> Self {
        self.with_flag(flags)
    }

    /// Create the ring described by this builder.
    ///
    /// # Errors
    /// Returns any failure from `io_uring_setup` or the ring mmaps.
    pub fn build(mut self) -> Result<IoUring, Error> {
        IoUring::from_params(self.entries, &mut self.params)
    }
}