#![allow(dead_code)]
use std::fmt;
#[cfg(feature = "unstable")]
use std::mem::MaybeUninit;
use std::sync::atomic;
use crate::runtime::io_uring::sys;
use crate::runtime::io_uring::util::{unsync_load, Mmap};
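/// Raw pointers into the kernel-shared completion-queue (CQ) ring, plus the
/// ring geometry (`ring_mask`, `ring_entries`) read once at setup time.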
pub(crate) struct Inner {
    head: *const atomic::AtomicU32,
    tail: *const atomic::AtomicU32,
    ring_mask: u32,
    ring_entries: u32,
    overflow: *const atomic::AtomicU32,
    cqes: *const sys::io_uring_cqe,
    #[allow(dead_code)]
    flags: *const atomic::AtomicU32,
}
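/// A borrowed view of the completion queue.
///
/// `head` and `tail` are snapshotted when the view is created; entries consumed
/// through the iterator are published back to the kernel by `sync` or on drop.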
pub struct CompletionQueue<'a> {
    head: u32,
    tail: u32,
    queue: &'a Inner,
}
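/// A completion queue entry (CQE), layout-compatible with `sys::io_uring_cqe`.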
#[repr(transparent)]
#[derive(Clone)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
impl Inner {
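    /// Builds the queue view from the CQ mmap using the offsets the kernel
    /// returned in `io_uring_params`.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-queue mapping that corresponds to `p`.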
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const sys::io_uring_cqe;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;
        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }
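    /// Creates a queue view without requiring exclusive access to `Inner`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no other `CompletionQueue` borrowed from
    /// this `Inner` is live at the same time; two views would consume the same
    /// entries and race on publishing the head index.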
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_> {
        CompletionQueue {
            head: unsync_load(self.head),
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }
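    /// Creates a queue view; the `&mut self` receiver guarantees exclusive access.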
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_> {
        unsafe { self.borrow_shared() }
    }
}
impl CompletionQueue<'_> {
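    /// Publishes consumed entries to the kernel (a `Release` store of `head`) and
    /// reloads the kernel's `tail` (an `Acquire` load) to pick up new completions.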
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }
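    /// Returns the kernel's CQ overflow counter, incremented when a completion
    /// event could not be posted because the queue was full.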
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }
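    /// Returns `true` if the `IORING_CQ_EVENTFD_DISABLED` flag is set, i.e.
    /// eventfd notifications are currently disabled for this ring.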
    #[cfg(feature = "unstable")]
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_CQ_EVENTFD_DISABLED
                != 0
        }
    }
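    /// Returns the total number of entries the completion queue can hold.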
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }
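    /// Returns `true` if there are no unconsumed entries in this snapshot.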
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
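    /// Returns `true` if the number of unconsumed entries equals the capacity,
    /// meaning further completions may overflow.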
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }
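    /// Copies up to `entries.len()` unconsumed CQEs into the provided buffer and
    /// returns the initialized prefix; the local head is advanced past them and
    /// published on `sync` or drop.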
    #[cfg(feature = "unstable")]
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<Entry>]) -> &'a mut [Entry] {
        let len = std::cmp::min(self.len(), entries.len());
        for entry in &mut entries[..len] {
            *entry = MaybeUninit::new(Entry(unsafe {
                *self
                    .queue
                    .cqes
                    .add((self.head & self.queue.ring_mask) as usize)
            }));
            self.head = self.head.wrapping_add(1);
        }
        unsafe { std::slice::from_raw_parts_mut(entries as *mut _ as *mut Entry, len) }
    }
}
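/// Publishes the consumed head on drop so the kernel can reuse those CQE slots.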
impl Drop for CompletionQueue<'_> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
    }
}
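/// Iterating consumes entries; the advanced head is published on `sync` or drop.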
impl Iterator for CompletionQueue<'_> {
    type Item = Entry;
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            let entry = unsafe {
                *self
                    .queue
                    .cqes
                    .add((self.head & self.queue.ring_mask) as usize)
            };
            self.head = self.head.wrapping_add(1);
            Some(Entry(entry))
        } else {
            None
        }
    }
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len(), Some(self.len()))
    }
}
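/// The number of unconsumed entries is `tail - head`, computed with wrapping
/// arithmetic since both indices are free-running `u32` counters.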
impl ExactSizeIterator for CompletionQueue<'_> {
    #[inline]
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}
impl Entry {
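    /// The result of the operation; on failure this is a negated errno value.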
    #[inline]
    pub fn result(&self) -> i32 {
        self.0.res
    }
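    /// The `user_data` value that was passed in with the corresponding submission.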
    #[inline]
    pub fn user_data(&self) -> u64 {
        self.0.user_data
    }
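    /// The raw completion flags (e.g. buffer-selection metadata).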
    #[inline]
    pub fn flags(&self) -> u32 {
        self.0.flags
    }
}
impl fmt::Debug for Entry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("result", &self.0.res)
            .field("user_data", &self.0.user_data)
            .field("flags", &self.0.flags)
            .finish()
    }
}
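/// Extracts the selected buffer ID from completion flags when
/// `IORING_CQE_F_BUFFER` is set (automatic buffer selection).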
#[cfg(feature = "unstable")]
pub fn buffer_select(flags: u32) -> Option<u16> {
    if flags & sys::IORING_CQE_F_BUFFER != 0 {
        let id = flags >> sys::IORING_CQE_BUFFER_SHIFT;
        Some(id as u16)
    } else {
        None
    }
}