#![allow(unsafe_code)]
use std::slice::from_raw_parts_mut;
use super::*;
#[derive(Debug)]
pub(crate) struct Sq {
khead: *mut AtomicU32,
ktail: *mut AtomicU32,
kring_mask: *mut u32,
kflags: *mut AtomicU32,
kdropped: *mut AtomicU32,
array: &'static mut [AtomicU32],
sqes: &'static mut [io_uring_sqe],
sqe_head: u32,
sqe_tail: u32,
ring_ptr: *const libc::c_void,
ring_mmap_sz: usize,
sqes_mmap_sz: usize,
}
impl Drop for Sq {
#[allow(unsafe_code)]
fn drop(&mut self) {
unsafe {
libc::munmap(
self.sqes.as_ptr() as *mut libc::c_void,
self.sqes_mmap_sz,
);
}
unsafe {
libc::munmap(
self.ring_ptr as *mut libc::c_void,
self.ring_mmap_sz,
);
}
}
}
impl Sq {
pub(crate) fn new(
params: &io_uring_params,
ring_fd: i32,
) -> io::Result<Sq> {
let sq_ring_mmap_sz = params.sq_off.array as usize
+ (params.sq_entries as usize
* std::mem::size_of::<u32>());
let sq_ring_ptr = uring_mmap(
sq_ring_mmap_sz,
ring_fd,
IORING_OFF_SQ_RING,
)?;
let sqes_mmap_sz: usize = params.sq_entries
as usize
* std::mem::size_of::<io_uring_sqe>();
let sqes_ptr: *mut io_uring_sqe = uring_mmap(
sqes_mmap_sz,
ring_fd,
IORING_OFF_SQES,
)? as _;
Ok(unsafe {
Sq {
sqe_head: 0,
sqe_tail: 0,
ring_ptr: sq_ring_ptr,
ring_mmap_sz: sq_ring_mmap_sz,
sqes_mmap_sz,
khead: sq_ring_ptr
.add(params.sq_off.head as usize)
as *mut AtomicU32,
ktail: sq_ring_ptr
.add(params.sq_off.tail as usize)
as *mut AtomicU32,
kring_mask: sq_ring_ptr
.add(params.sq_off.ring_mask as usize)
as *mut u32,
kflags: sq_ring_ptr
.add(params.sq_off.flags as usize)
as *mut AtomicU32,
kdropped: sq_ring_ptr
.add(params.sq_off.dropped as usize)
as *mut AtomicU32,
array: from_raw_parts_mut(
sq_ring_ptr
.add(params.sq_off.array as usize)
as _,
params.sq_entries as usize,
),
sqes: from_raw_parts_mut(
sqes_ptr,
params.sq_entries as usize,
),
}
})
}
pub(crate) fn try_get_sqe(
&mut self,
ring_flags: u32,
) -> Option<&mut io_uring_sqe> {
let next = self.sqe_tail + 1;
let head =
if (ring_flags & IORING_SETUP_SQPOLL) == 0 {
self.sqe_head
} else {
unsafe { &*self.khead }.load(Acquire)
};
if next - head <= self.sqes.len() as u32 {
let idx =
self.sqe_tail & unsafe { *self.kring_mask };
let sqe = &mut self.sqes[idx as usize];
self.sqe_tail = next;
Some(sqe)
} else {
None
}
}
fn flush(&mut self) -> u32 {
let mask: u32 = unsafe { *self.kring_mask };
let to_submit = self.sqe_tail - self.sqe_head;
let mut ktail =
unsafe { &*self.ktail }.load(Acquire);
for _ in 0..to_submit {
let index = ktail & mask;
self.array[index as usize]
.store(self.sqe_head & mask, Release);
ktail += 1;
self.sqe_head += 1;
}
let swapped =
unsafe { &*self.ktail }.swap(ktail, Release);
assert_eq!(swapped, ktail - to_submit);
to_submit
}
pub(crate) fn submit_all(
&mut self,
ring_flags: u32,
ring_fd: i32,
) -> u64 {
let submitted = if ring_flags & IORING_SETUP_SQPOLL
== 0
{
let flags = IORING_ENTER_GETEVENTS;
let flushed = self.flush();
let mut to_submit = flushed;
while to_submit > 0 {
let _ = Measure::new(&M.enter_sqe);
let ret = enter(
ring_fd,
to_submit,
0,
flags,
std::ptr::null_mut(),
)
.expect(
"Failed to submit items to kernel via \
io_uring. This should never fail.",
);
to_submit -= u32::try_from(ret).unwrap();
}
flushed
} else if unsafe { &*self.kflags }.load(Acquire)
& IORING_SQ_NEED_WAKEUP
!= 0
{
let to_submit = self.sqe_tail - self.sqe_head;
let _ = Measure::new(&M.enter_sqe);
enter(
ring_fd,
to_submit,
0,
IORING_ENTER_SQ_WAKEUP,
std::ptr::null_mut(),
)
.expect(
"Failed to wake up SQPOLL io_uring \
kernel thread. This should never fail.",
);
0
} else {
0
};
assert_eq!(
unsafe { &*self.kdropped }.load(Relaxed),
0
);
u64::from(submitted)
}
}