use std::fmt::{self, Debug};
use std::mem;
use std::mem::MaybeUninit;
use std::sync::atomic;
use crate::sys;
use crate::util::{unsync_load, Mmap};
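// Shared completion-queue state: raw pointers into the kernel's CQ ring mapping
// (head/tail indices, overflow counter, flags and the CQE array) plus the ring
// geometry read once at setup time.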
pub(crate) struct Inner<E: EntryMarker> {
head: *const atomic::AtomicU32,
tail: *const atomic::AtomicU32,
ring_mask: u32,
ring_entries: u32,
overflow: *const atomic::AtomicU32,
cqes: *const E,
#[allow(dead_code)]
flags: *const atomic::AtomicU32,
}
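/// An io_uring instance's completion queue. This stores all the I/O operations that
/// have completed.
///
/// Entries are consumed by iterating; the consumed positions are published back to the
/// kernel on [`sync`](Self::sync) or when this handle is dropped.
///
/// A minimal end-to-end sketch (a `no_run` illustration assuming the crate's top-level
/// `IoUring` and `opcode` APIs, which are not defined in this module):
///
/// ```no_run
/// use io_uring::{opcode, IoUring};
///
/// fn main() -> std::io::Result<()> {
///     let mut ring = IoUring::new(8)?;
///
///     // Submit a no-op request tagged with some user data.
///     let nop = opcode::Nop::new().build().user_data(0x42);
///     unsafe {
///         ring.submission().push(&nop).expect("submission queue is full");
///     }
///     ring.submit_and_wait(1)?;
///
///     // Drain the completion queue; each item is a completion `Entry`.
///     for cqe in ring.completion() {
///         assert_eq!(cqe.user_data(), 0x42);
///     }
///     Ok(())
/// }
/// ```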
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
head: u32,
tail: u32,
queue: &'a Inner<E>,
}
pub(crate) use private::Sealed;
mod private {
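/// Private marker trait so that only the entry types defined in this crate can
/// implement [`EntryMarker`](super::EntryMarker) (the "sealed trait" pattern).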
pub trait Sealed {
const ADDITIONAL_FLAGS: u32;
}
}
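/// A completion queue entry type: either the default 16-byte [`Entry`] or the extended
/// 32-byte [`Entry32`] used when the ring is set up with `IORING_SETUP_CQE32`.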
pub trait EntryMarker: Clone + Debug + Into<Entry> + Sealed {}
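/// A 16-byte completion queue entry (CQE), representing a completed operation.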
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
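/// An extended 32-byte completion queue entry, available when the ring is created with
/// `IORING_SETUP_CQE32`. The extra 16 bytes carry operation-specific payload.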
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
#[test]
fn test_entry_sizes() {
assert_eq!(mem::size_of::<Entry>(), 16);
assert_eq!(mem::size_of::<Entry32>(), 32);
}
impl<E: EntryMarker> Inner<E> {
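/// Builds the queue state from the completion-queue mmap, using the offsets the kernel
/// reported in `p.cq_off`. The ring mask and entry count are read once; the remaining
/// fields stay as raw pointers because the kernel keeps updating them.
///
/// # Safety
///
/// `cq_mmap` must be the completion-queue mapping corresponding to `p`.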
#[rustfmt::skip]
pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;
Self {
head,
tail,
ring_mask,
ring_entries,
overflow,
cqes,
flags,
}
}
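/// Takes a snapshot of the queue: an unsynchronized load of the head (only this side
/// writes it) and an `Acquire` load of the kernel-written tail.
///
/// # Safety
///
/// No other `CompletionQueue` for this ring may be used at the same time, since both
/// would pop the same entries.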
#[inline]
pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
CompletionQueue {
head: unsync_load(self.head),
tail: (*self.tail).load(atomic::Ordering::Acquire),
queue: self,
}
}
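/// Like [`borrow_shared`](Self::borrow_shared), made safe by the exclusive `&mut self`
/// borrow.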
#[inline]
pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
unsafe { self.borrow_shared() }
}
}
impl<E: EntryMarker> CompletionQueue<'_, E> {
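/// Synchronizes this snapshot with the real completion queue: publishes the entries
/// consumed so far by storing the new head, then reloads the tail to pick up any
/// completions the kernel has posted in the meantime.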
#[inline]
pub fn sync(&mut self) {
unsafe {
(*self.queue.head).store(self.head, atomic::Ordering::Release);
self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
}
}
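/// If the completion queue overflows, new events may be dropped by the kernel; this
/// counter records how many have been dropped.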
pub fn overflow(&self) -> u32 {
unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
}
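/// Whether the `IORING_CQ_EVENTFD_DISABLED` flag is currently set, i.e. whether eventfd
/// notifications for new completions are disabled.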
pub fn eventfd_disabled(&self) -> bool {
unsafe {
(*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_CQ_EVENTFD_DISABLED
!= 0
}
}
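/// The total number of entries the completion queue ring can hold.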
#[inline]
pub fn capacity(&self) -> usize {
self.queue.ring_entries as usize
}
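/// Returns `true` if there are no entries left to consume in this snapshot.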
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
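/// Returns `true` if the queue holds as many unconsumed entries as its capacity; the
/// kernel cannot post further completions into the ring until some are consumed.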
#[inline]
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
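/// Pops as many entries as fit into `entries` (at most as many as remain in this
/// snapshot) and returns the initialized prefix of the buffer.
///
/// A usage sketch, marked `no_run` and assuming the crate's top-level `IoUring` type and
/// public `cqueue` module (names not defined in this file):
///
/// ```no_run
/// use std::mem::MaybeUninit;
/// use io_uring::{cqueue, IoUring};
///
/// fn main() -> std::io::Result<()> {
///     let mut ring = IoUring::new(64)?;
///     // An uninitialized scratch buffer for up to 64 completion entries.
///     let mut buf: Vec<MaybeUninit<cqueue::Entry>> =
///         (0..64).map(|_| MaybeUninit::uninit()).collect();
///     let filled = ring.completion().fill(&mut buf);
///     for cqe in filled {
///         println!("user_data = {}", cqe.user_data());
///     }
///     Ok(())
/// }
/// ```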
#[inline]
pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
let len = std::cmp::min(self.len(), entries.len());
for entry in &mut entries[..len] {
*entry = MaybeUninit::new(unsafe { self.pop() });
}
unsafe { std::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
}
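/// Pops the entry at the current head and advances the local head.
///
/// # Safety
///
/// The queue must be non-empty (`head != tail`), otherwise this reads an entry the
/// kernel has not written yet.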
#[inline]
unsafe fn pop(&mut self) -> E {
let entry = &*self
.queue
.cqes
.add((self.head & self.queue.ring_mask) as usize);
self.head = self.head.wrapping_add(1);
entry.clone()
}
}
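// Publish any entries consumed through the iterator back to the kernel when the
// borrowed queue handle goes away.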
impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
#[inline]
fn drop(&mut self) {
unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
}
}
impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
type Item = E;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.head != self.tail {
Some(unsafe { self.pop() })
} else {
None
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len(), Some(self.len()))
}
}
impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
#[inline]
fn len(&self) -> usize {
self.tail.wrapping_sub(self.head) as usize
}
}
impl Entry {
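/// The operation-specific result of this completion. Non-negative values are operation
/// results (for example, the number of bytes read); negative values are negated `errno`
/// codes.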
#[inline]
pub fn result(&self) -> i32 {
self.0.res
}
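/// The user data value that was attached to the submission entry which produced this
/// completion, useful for routing results back to their requests.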
#[inline]
pub fn user_data(&self) -> u64 {
self.0.user_data
}
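/// Completion flags (`IORING_CQE_F_*`), such as the selected-buffer and "more
/// completions follow" bits decoded by [`buffer_select`] and [`more`].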
#[inline]
pub fn flags(&self) -> u32 {
self.0.flags
}
}
impl Sealed for Entry {
const ADDITIONAL_FLAGS: u32 = 0;
}
impl EntryMarker for Entry {}
impl Clone for Entry {
fn clone(&self) -> Entry {
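// `sys::io_uring_cqe` does not implement `Clone` (likely because of the trailing
// flexible array member in the kernel definition), so byte-copy the fixed-size
// 16-byte prefix instead.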
Entry(unsafe { mem::transmute_copy(&self.0) })
}
}
impl Debug for Entry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Entry")
.field("result", &self.result())
.field("user_data", &self.user_data())
.field("flags", &self.flags())
.finish()
}
}
impl Entry32 {
#[inline]
pub fn result(&self) -> i32 {
self.0 .0.res
}
#[inline]
pub fn user_data(&self) -> u64 {
self.0 .0.user_data
}
#[inline]
pub fn flags(&self) -> u32 {
self.0 .0.flags
}
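/// The two extra 64-bit values carried by a 32-byte CQE; their meaning depends on the
/// operation that produced the completion.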
#[inline]
pub fn big_cqe(&self) -> &[u64; 2] {
&self.1
}
}
impl Sealed for Entry32 {
const ADDITIONAL_FLAGS: u32 = sys::IORING_SETUP_CQE32;
}
impl EntryMarker for Entry32 {}
impl From<Entry32> for Entry {
fn from(entry32: Entry32) -> Self {
entry32.0
}
}
impl Debug for Entry32 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Entry32")
.field("result", &self.result())
.field("user_data", &self.user_data())
.field("flags", &self.flags())
.field("big_cqe", &self.big_cqe())
.finish()
}
}
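/// If the `IORING_CQE_F_BUFFER` flag is set in `flags`, returns the id of the provided
/// buffer the kernel selected for the operation; otherwise returns `None`.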
pub fn buffer_select(flags: u32) -> Option<u16> {
if flags & sys::IORING_CQE_F_BUFFER != 0 {
let id = flags >> sys::IORING_CQE_BUFFER_SHIFT;
Some(id as u16)
} else {
None
}
}
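/// Whether the `IORING_CQE_F_MORE` flag is set, meaning the kernel expects to post more
/// completions for the same request (e.g. multishot operations).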
pub fn more(flags: u32) -> bool {
flags & sys::IORING_CQE_F_MORE != 0
}
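// A small sanity-check sketch for the flag helpers above. It assumes the usual kernel
// layout of the `IORING_CQE_*` constants re-exported through `sys` (the buffer id lives
// in the bits above `IORING_CQE_BUFFER_SHIFT`).
#[test]
fn test_flag_helpers() {
    // No flag bits set: no buffer was selected and no further completions are expected.
    assert_eq!(buffer_select(0), None);
    assert!(!more(0));
    // Pack a hypothetical buffer id of 7 next to the IORING_CQE_F_BUFFER bit.
    let flags = sys::IORING_CQE_F_BUFFER | (7 << sys::IORING_CQE_BUFFER_SHIFT);
    assert_eq!(buffer_select(flags), Some(7));
    assert!(more(sys::IORING_CQE_F_MORE));
}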