use core::{
mem,
ptr,
sync::atomic::{
AtomicU32,
Ordering,
},
};
use lx::AsRawFd;
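/// Translates a submission queue ring index into the index of the SQE slot
/// that backs it. `set_sq_mapping` writes this translation into the kernel's
/// SQ index array, and `try_push` uses it to pick the slot to fill.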
pub(crate) trait SubmissionQueueMapping {
fn map(&self, ring_idx: u32) -> u32;
}
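/// The default mapping: ring index `i` is backed by SQE slot `i`.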
pub(crate) struct IdentityMapping;
impl SubmissionQueueMapping for IdentityMapping {
fn map(&self, ring_idx: u32) -> u32 {
ring_idx
}
}
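/// Error returned by `try_push` when the submission queue has no free slots.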
#[derive(Debug)]
pub(crate) struct QueueFullError;
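/// A minimal io_uring instance wrapper.
///
/// The ring is created with `IORING_SETUP_SINGLE_ISSUER` and
/// `IORING_SETUP_DEFER_TASKRUN`, so all submission-side methods assume a
/// single submitting thread. Head and tail values are cached locally and only
/// synchronized with the shared rings in the `start_*`/`end_*` methods.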
pub(crate) struct Instance {
fd: lx::OwnedFd,
_shared_mmap: lx::Mmap,
sq_mmap: lx::Mmap,
sq_head: *const AtomicU32,
sq_head_cache: u32,
sq_tail: *const AtomicU32,
sq_tail_cache: u32,
sq_mask: u32,
sq_capacity: u32,
sq_array: *mut u32,
cq_head: *const AtomicU32,
cq_head_cache: u32,
cq_tail: *const AtomicU32,
cq_tail_cache: u32,
cq_mask: u32,
cq_capacity: u32,
cq_entries: *const lx::io_uring_cqe,
}
impl Instance {
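/// Creates a new ring with room for `entries` submission queue entries.
///
/// Fails with `EINVAL` if the kernel does not report
/// `IORING_FEAT_SINGLE_MMAP`, since both rings are mapped with a single
/// `mmap` call below.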
pub(crate) fn new(entries: u32) -> lx::Result<Self> {
let mut params: lx::io_uring_params = unsafe { mem::zeroed() };
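// Single issuer: only one task submits. Defer task work until the ring is
// explicitly entered to reap completions.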
params.flags = lx::IORING_SETUP_SINGLE_ISSUER | lx::IORING_SETUP_DEFER_TASKRUN;
let fd = lx::io_uring_setup(entries, &mut params)?;
if params.features & lx::IORING_FEAT_SINGLE_MMAP == 0 {
return Err(lx::Error::from_code(lx::EINVAL));
}
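// The shared ring mapping must cover whichever ring region ends later.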
let cq_size = usize::try_from(params.cq_off.cqes).unwrap()
+ mem::size_of::<lx::io_uring_cqe>() * usize::try_from(params.cq_entries).unwrap();
let sq_size = usize::try_from(params.sq_off.array).unwrap()
+ mem::size_of::<u32>() * usize::try_from(params.sq_entries).unwrap();
let size = cq_size.max(sq_size);
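// One shared mapping holds both ring headers, the CQE array, and the SQ
// index array, thanks to IORING_FEAT_SINGLE_MMAP.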
let shared_mmap = unsafe {
lx::mmap_file(
ptr::null_mut(),
size,
lx::PROT_READ | lx::PROT_WRITE,
lx::MAP_SHARED | lx::MAP_POPULATE,
&fd,
lx::IORING_OFF_SQ_RING,
)
}?;
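// Resolve pointers into the shared mapping using the offsets the kernel
// reported in `params`.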
let sq_head = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.sq_off.head).unwrap())
as *const AtomicU32
};
let sq_tail = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.sq_off.tail).unwrap())
as *const AtomicU32
};
let sq_array = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.sq_off.array).unwrap()) as *mut u32
};
let cq_head = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.cq_off.head).unwrap())
as *const AtomicU32
};
let cq_tail = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.cq_off.tail).unwrap())
as *const AtomicU32
};
let cq_entries = unsafe {
shared_mmap
.as_ref()
.as_ptr()
.offset(isize::try_from(params.cq_off.cqes).unwrap())
as *const lx::io_uring_cqe
};
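// The SQEs themselves live in a separate mapping at IORING_OFF_SQES.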
let sqes_size =
mem::size_of::<lx::io_uring_sqe>() * usize::try_from(params.sq_entries).unwrap();
let sq_mmap = unsafe {
lx::mmap_file(
ptr::null_mut(),
sqes_size,
lx::PROT_READ | lx::PROT_WRITE,
lx::MAP_SHARED | lx::MAP_POPULATE,
&fd,
lx::IORING_OFF_SQES,
)?
};
Ok(Self {
fd,
_shared_mmap: shared_mmap,
sq_mmap,
sq_head,
sq_head_cache: 0,
sq_tail,
sq_tail_cache: 0,
sq_mask: params.sq_entries - 1,
sq_capacity: params.sq_entries,
sq_array,
cq_head,
cq_head_cache: 0,
cq_tail,
cq_tail_cache: 0,
cq_mask: params.cq_entries - 1,
cq_capacity: params.cq_entries,
cq_entries,
})
}
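/// Refreshes the cached submission queue head before a batch of `try_push`
/// calls.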
#[inline]
pub(crate) fn start_push(&mut self) {
self.sq_head_cache = unsafe { (*self.sq_head).load(Ordering::Acquire) };
}
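/// Reserves the next submission queue slot and returns a mutable reference to
/// its SQE, or `QueueFullError` if the ring is full. The slot is returned
/// as-is (not zeroed), and the new tail is only published to the kernel by
/// `end_push`.
///
/// # Safety
///
/// `mapping` must produce indices that are in bounds for the SQE array, since
/// the slot is accessed without a bounds check.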
pub(crate) unsafe fn try_push(
&mut self,
mapping: &impl SubmissionQueueMapping,
) -> Result<&mut lx::io_uring_sqe, QueueFullError> {
if self.sq_tail_cache.wrapping_sub(self.sq_head_cache) == self.sq_capacity {
return Err(QueueFullError);
}
let mask = self.sq_mask;
let index = usize::try_from(mapping.map(self.sq_tail_cache & mask)).unwrap();
let entry = unsafe { &mut *self.sq_entries_mut().get_unchecked_mut(index) };
self.sq_tail_cache = self.sq_tail_cache.wrapping_add(1);
Ok(entry)
}
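/// Publishes the cached submission queue tail so the kernel can see the
/// entries pushed since `start_push`.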
#[inline]
pub(crate) fn end_push(&mut self) {
unsafe { (*self.sq_tail).store(self.sq_tail_cache, Ordering::Release) };
}
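/// Rewrites the kernel's SQ index array so that ring index `i` refers to SQE
/// slot `mapping.map(i)`.
///
/// # Safety
///
/// `mapping` must produce indices that are in bounds for the SQE array.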
pub(crate) unsafe fn set_sq_mapping(&mut self, mapping: &impl SubmissionQueueMapping) {
for (ring_idx, actual_idx) in (*self.sq_array_mut()).iter_mut().enumerate() {
*actual_idx = mapping.map(u32::try_from(ring_idx).unwrap());
}
}
#[inline]
fn sq_array_mut(&mut self) -> *mut [u32] {
ptr::slice_from_raw_parts_mut(self.sq_array, usize::try_from(self.sq_capacity).unwrap())
}
#[inline]
unsafe fn sq_entries_mut(&mut self) -> *mut [lx::io_uring_sqe] {
ptr::slice_from_raw_parts_mut(
self.sq_mmap.as_mut().as_mut_ptr() as *mut lx::io_uring_sqe,
usize::try_from(self.sq_capacity).unwrap(),
)
}
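/// Calls `io_uring_enter` to submit `to_submit` entries and wait for at least
/// `min_complete` completions.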
#[inline]
pub(crate) fn get_events(&mut self, to_submit: u32, min_complete: u32) -> lx::Result<i32> {
lx::io_uring_enter_getevents(self.fd.as_raw_fd() as u32, to_submit, min_complete)
}
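/// Refreshes the cached completion queue tail before a batch of `pop` calls.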
#[inline]
pub(crate) fn start_pop(&mut self) {
self.cq_tail_cache = unsafe { (*self.cq_tail).load(Ordering::Acquire) };
}
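/// Returns the next unread completion queue entry, if any. The new head is
/// only published to the kernel by `end_pop`.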
pub(crate) fn pop(&mut self) -> Option<&lx::io_uring_cqe> {
if self.cq_head_cache == self.cq_tail_cache {
return None;
}
let index = usize::try_from(self.cq_head_cache & self.cq_mask).unwrap();
let entry = unsafe { &*self.cq_entries().get_unchecked(index) };
self.cq_head_cache = self.cq_head_cache.wrapping_add(1);
Some(entry)
}
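/// Publishes the cached completion queue head so the kernel can reuse the
/// consumed CQE slots.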
#[inline]
pub(crate) fn end_pop(&mut self) {
unsafe { (*self.cq_head).store(self.cq_head_cache, Ordering::Release) };
}
#[inline]
fn cq_entries(&self) -> *const [lx::io_uring_cqe] {
ptr::slice_from_raw_parts(self.cq_entries, usize::try_from(self.cq_capacity).unwrap())
}
}
impl lx::AsRawFd for Instance {
#[inline]
fn as_raw_fd(&self) -> lx::RawFd {
self.fd.as_raw_fd()
}
}