use std::os::fd::RawFd;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::{fmt, io, ptr};
use crate::io_uring::{Shared, libc, load_kernel_shared, mmap, munmap, op};
use crate::{asan, debug_detail, lock};
/// Handle to the memory-mapped io_uring completion queue (CQ) ring.
///
/// All pointers are derived from the single mapping created in
/// [`Completions::new`]; the mapping is released again when the value is
/// dropped.
#[derive(Debug)]
pub(crate) struct Completions {
/// Start of the mapping returned by `mmap` in `new`.
ring: ptr::NonNull<libc::c_void>,
/// Length of the mapping in bytes (`cq_off.cqes` plus the size of all
/// entries), passed back to `munmap` on drop.
ring_len: u32,
/// Location of the queue head index inside the mapping
/// (`ring + cq_off.head`). Written by `poll` after processing completions.
entries_head: ptr::NonNull<AtomicU32>,
/// Location of the queue tail index inside the mapping
/// (`ring + cq_off.tail`). Only read by this type.
entries_tail: ptr::NonNull<AtomicU32>,
/// First completion entry inside the mapping (`ring + cq_off.cqes`).
entries: ptr::NonNull<Completion>,
/// Number of entries in the queue (`cq_entries`), NOT a length in bytes.
/// `poll` uses `entries_len - 1` as index mask, so this is expected to be a
/// power of two.
entries_len: u32,
}
impl Completions {
    /// Map the completion queue ring of the io_uring identified by `rfd`.
    ///
    /// The size of the mapping and the location of the head index, tail index
    /// and entries within it are taken from `parameters` (`cq_entries` and
    /// `cq_off`).
    ///
    /// # Errors
    ///
    /// Returns the error from `mmap` if the ring can't be mapped.
    pub(crate) fn new(rfd: RawFd, parameters: &libc::io_uring_params) -> io::Result<Completions> {
        // Size of all completion entries in bytes. Note that the
        // `entries_len` *field* holds the number of entries instead.
        let entries_size = parameters.cq_entries * (size_of::<Completion>() as u32);
        let ring_len = parameters.cq_off.cqes + entries_size;
        let ring = mmap(
            ring_len as usize,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED | libc::MAP_POPULATE,
            rfd,
            libc::off_t::from(libc::IORING_OFF_CQ_RING),
        )?;

        // SAFETY: `cq_off.cqes` lies within the `ring_len` bytes mapped above.
        let entries = unsafe { ring.add(parameters.cq_off.cqes as usize).cast() };
        // Entries are only unpoisoned for the duration they are read in
        // `poll`.
        asan::poison_region(entries.cast().as_ptr(), entries_size as usize);

        Ok(Completions {
            ring,
            ring_len,
            // SAFETY: NOTE(review): relies on the `cq_off` offsets reported
            // in `parameters` falling inside the mapping.
            entries_head: unsafe { ring.add(parameters.cq_off.head as usize).cast() },
            entries_tail: unsafe { ring.add(parameters.cq_off.tail as usize).cast() },
            entries,
            entries_len: parameters.cq_entries,
        })
    }

    /// Process all available completions.
    ///
    /// If no completions are queued this first calls `shared.enter` with
    /// `IORING_ENTER_GETEVENTS` and `timeout` (presumably waiting for at
    /// least one completion) with `shared.is_polling` set for the duration
    /// of the call. Afterwards every completion between the queue head and
    /// tail is processed and the new head is published.
    ///
    /// # Errors
    ///
    /// Returns the error from `shared.enter`, in which case no completions
    /// are processed.
    pub(crate) fn poll(&mut self, shared: &Shared, timeout: Option<Duration>) -> io::Result<()> {
        let mut head = load_kernel_shared(self.entries_head);
        let mut tail = load_kernel_shared(self.entries_tail);

        // The head and tail are free-running `u32` counters that wrap around,
        // so they may only be compared for (in)equality, never ordered: after
        // the tail wraps it is numerically smaller than the head while there
        // are still completions queued.
        if head == tail {
            log::trace!(timeout:?; "waiting for completion events");
            shared.is_polling.store(true, Ordering::Release);
            let result = shared.enter(1, libc::IORING_ENTER_GETEVENTS, timeout);
            shared.is_polling.store(false, Ordering::Release);
            result?;
            // Only the tail is reloaded; the head is only written by this
            // method (see the store below).
            tail = load_kernel_shared(self.entries_tail);
        }

        // Wrap-aware sanity check: the number of queued completions can never
        // exceed the size of the queue.
        debug_assert!(tail.wrapping_sub(head) <= self.entries_len);

        while head != tail {
            // `entries_len` is a power of two, making this a cheap modulo.
            let index = (head & (self.entries_len - 1)) as usize;
            // SAFETY: `index` is masked to be below `entries_len`.
            let ptr = unsafe { self.entries.add(index).as_ptr() };
            asan::unpoison_region(ptr.cast(), size_of::<Completion>());
            // SAFETY: the entry lies between the head and the tail, thus it
            // has been filled in and isn't reused until the head moves past
            // it.
            let completion = unsafe { &*ptr };
            log::trace!(completion:?, index, head; "dequeued completion");
            // SAFETY: NOTE(review): relies on the `user_data` having been set
            // up as `Completion::process` expects when the operation was
            // submitted.
            unsafe { completion.process() };
            asan::poison_region(ptr.cast(), size_of::<Completion>());
            head = head.wrapping_add(1);
        }

        // Publish the new head, marking the processed entries as reusable.
        // SAFETY: `entries_head` points into the mapping owned by `self`.
        unsafe { (&*self.entries_head.as_ptr()).store(head, Ordering::Release) };
        Ok(())
    }
}
// `Completions` stores `ptr::NonNull` fields, which are neither `Send` nor
// `Sync`, so both traits have to be implemented manually.
// NOTE(review): soundness presumably relies on all pointers targeting the
// mapping owned by this value and on `poll` (the only method touching the
// ring) requiring `&mut self` — confirm.
unsafe impl Send for Completions {}
unsafe impl Sync for Completions {}
impl Drop for Completions {
fn drop(&mut self) {
let entries_len = (self.entries_len as usize) * size_of::<Completion>();
asan::poison_region(self.entries.as_ptr().cast(), entries_len);
let ptr = self.ring;
let len = self.ring_len as usize;
if let Err(err) = munmap(ptr, len) {
log::warn!(ptr:?, len; "error unmapping io_uring submission ring: {err}");
}
}
}
/// A single completion queue entry.
///
/// Transparent wrapper around `io_uring_cqe`, which allows the entries in
/// the mapped ring to be read in place as `Completion`s.
#[repr(transparent)]
pub(crate) struct Completion(pub(super) libc::io_uring_cqe);
// The least significant bit of a completion's `user_data` is used as a tag to
// select the type of the operation state it points to, see
// `Completion::process`.

/// Tag value (bit not set) selecting `op::SingleShared`.
pub(super) const SINGLESHOT_TAG: usize = 0b00;
/// Tag value (bit set) selecting `op::MultiShared`.
pub(super) const MULTISHOT_TAG: usize = 0b01;
/// Mask that clears the tag bit, recovering the address of the state.
const TAG_MASK: usize = !MULTISHOT_TAG;
/// `user_data` value for which `Completion::process` does nothing.
const NO_PROCESS: usize = 0;
/// Same value as `NO_PROCESS`, as `u64`.
// NOTE(review): presumably the `user_data` used for submissions that only
// serve to wake up the polling thread — confirm against the submitter.
pub(super) const WAKE_USER_DATA: u64 = NO_PROCESS as u64;
impl Completion {
    /// Process the completion, updating the state of the operation it
    /// belongs to.
    ///
    /// Completions flagged with `IORING_CQE_F_SKIP` and completions with
    /// `NO_PROCESS` as `user_data` are ignored. For all others `user_data` is
    /// interpreted as a tagged pointer to the shared operation state, which
    /// is locked and updated. Depending on the outcome of the update the
    /// operation's waker is woken or the state is dropped.
    ///
    /// # Safety
    ///
    /// Unless it's `NO_PROCESS`, `user_data` must be the address of a live
    /// `op::SingleShared` (tagged with `SINGLESHOT_TAG`) or `op::MultiShared`
    /// (tagged with `MULTISHOT_TAG`).
    unsafe fn process(&self) {
        if self.0.flags & libc::IORING_CQE_F_SKIP != 0 {
            return;
        }

        let user_data = self.0.user_data as usize;
        if user_data == NO_PROCESS {
            return;
        }

        let ptr: *const () = ptr::with_exposed_provenance(user_data);
        // Read the tag before it's stripped from the address below.
        let is_multishot = ptr.addr() & MULTISHOT_TAG != 0;
        let ptr = ptr.map_addr(|addr| addr & TAG_MASK);

        // NOTE: the lock is only held while updating the state, it's released
        // again before the waker or drop function is called below.
        let update = if is_multishot {
            // The least significant bit is only available as tag if the type
            // is aligned to more than a single byte.
            const _ALIGNMENT_CHECK: () = assert!(align_of::<op::MultiShared>() > 1);
            // SAFETY: the caller guarantees `ptr` points to a live
            // `op::MultiShared` as the multishot tag was set.
            let head: &op::MultiShared = unsafe { &*ptr.cast() };
            lock(head).update(self)
        } else {
            const _ALIGNMENT_CHECK: () = assert!(align_of::<op::SingleShared>() > 1);
            // SAFETY: the caller guarantees `ptr` points to a live
            // `op::SingleShared` as the multishot tag was not set.
            let head: &op::SingleShared = unsafe { &*ptr.cast() };
            lock(head).update(self)
        };

        match update {
            op::StatusUpdate::Ok => {}
            op::StatusUpdate::Wake(waker) => {
                log::trace!(waker:?; "waking up future to make progress");
                waker.wake();
            }
            op::StatusUpdate::Drop { drop } => {
                log::trace!(ptr:?; "dropping operation state");
                // SAFETY: NOTE(review): relies on `drop` being the function
                // matching the type of the state `ptr` points to.
                unsafe { drop(ptr.cast_mut()) }
            }
        }
    }

    /// Returns `true` if `IORING_CQE_F_MORE` is not set, i.e. if no further
    /// completions will be posted for the same operation.
    pub(super) const fn complete(&self) -> bool {
        self.0.flags & libc::IORING_CQE_F_MORE == 0
    }
}
impl fmt::Debug for Completion {
    /// Formats the completion as a struct showing `user_data` as a pointer,
    /// the raw `res` value and the set `flags` by name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_detail!(
            bitset CompletionFlags(u32),
            libc::IORING_CQE_F_BUFFER,
            libc::IORING_CQE_F_MORE,
            libc::IORING_CQE_F_SOCK_NONEMPTY,
            libc::IORING_CQE_F_NOTIF,
            libc::IORING_CQE_F_BUF_MORE,
        );

        let cqe = &self.0;
        // The user data is (usually) a pointer, so show it as one.
        let user_data = ptr::with_exposed_provenance::<()>(cqe.user_data as usize);
        let flags = CompletionFlags(cqe.flags);

        let mut output = f.debug_struct("io_uring::Completion");
        output.field("user_data", &user_data);
        output.field("res", &cqe.res);
        output.field("flags", &flags);
        output.finish()
    }
}