use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::sync::atomic;
use crate::sys;
use crate::util::{unsync_load, Mmap};
use bitflags::bitflags;
/// Shared submission queue state: raw pointers into the kernel-mapped ring
/// plus the constants read once at setup time.
pub(crate) struct Inner<E: EntryMarker> {
pub(crate) head: *const atomic::AtomicU32,
pub(crate) tail: *const atomic::AtomicU32,
pub(crate) ring_mask: u32,
pub(crate) ring_entries: u32,
pub(crate) flags: *const atomic::AtomicU32,
dropped: *const atomic::AtomicU32,
pub(crate) sqes: *mut E,
}
/// An io_uring instance's submission queue. This is used to send I/O
/// requests to the kernel.
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
head: u32,
tail: u32,
queue: &'a Inner<E>,
}
pub(crate) use private::Sealed;
mod private {
    /// A sealed trait, so that [`EntryMarker`](super::EntryMarker) can only
    /// be implemented inside this crate. `ADDITIONAL_FLAGS` holds any setup
    /// flags the ring requires for this entry size.
    pub trait Sealed {
        const ADDITIONAL_FLAGS: u32;
    }
}
/// A submission queue entry (SQE), representing a request for an I/O
/// operation. This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + Sealed {}
/// A standard 64-byte submission queue entry, wrapping the raw
/// `sys::io_uring_sqe`.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);
/// A 128-byte submission queue entry, used by opcodes that need the extra
/// payload space. Rings using it must be set up with `IORING_SETUP_SQE128`.
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);
#[test]
fn test_entry_sizes() {
assert_eq!(mem::size_of::<Entry>(), 64);
assert_eq!(mem::size_of::<Entry128>(), 128);
}
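// A minimal sketch of the index arithmetic used further down: head and tail
// grow monotonically and wrap around `u32::MAX`, so `SubmissionQueue::len`
// is `tail.wrapping_sub(head)`, and with `ring_entries` a power of two,
// `index & ring_mask` equals `index % ring_entries` (as relied on by
// `push_unchecked`). The concrete values here are illustrative only.
#[test]
fn test_ring_index_arithmetic() {
    let head: u32 = u32::MAX - 1;
    let tail: u32 = head.wrapping_add(3); // has wrapped past u32::MAX
    assert_eq!(tail.wrapping_sub(head), 3);
    let ring_entries: u32 = 8;
    let ring_mask: u32 = ring_entries - 1;
    for i in 0u32..32 {
        assert_eq!(i & ring_mask, i % ring_entries);
    }
}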
bitflags! {
    /// Submission flags, applied to an entry via [`Entry::flags`].
    pub struct Flags: u8 {
        /// When this flag is specified, `fd` is an index into the files
        /// array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified, the SQE will not be started before
        /// previously submitted SQEs have completed, and subsequent SQEs
        /// will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified, it forms a link with the next SQE in
        /// the submission ring. That next SQE will not be started before
        /// this one completes, and the link severs on failure.
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but the link does not sever
        /// regardless of the completion result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Issue the request in an async manner from the start, instead of
        /// first attempting a non-blocking issue.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Have the kernel select a buffer from the group given in the
        /// entry's `buf_group` field, instead of passing one in with the
        /// request.
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post a CQE if the request completes successfully.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
}
}
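// A minimal sketch of composing submission flags; `IO_LINK | ASYNC` is an
// illustrative combination, not a recommendation.
#[test]
fn test_flags_compose() {
    let flags = Flags::IO_LINK | Flags::ASYNC;
    assert!(flags.contains(Flags::IO_LINK));
    assert!(flags.contains(Flags::ASYNC));
    assert!(!flags.contains(Flags::IO_DRAIN));
}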
impl<E: EntryMarker> Inner<E> {
    /// Safety: the caller must keep both mmaps alive for as long as the
    /// returned `Inner` is used, and `p` must be the parameters the kernel
    /// filled in for this ring.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
sq_mmap: &Mmap,
sqe_mmap: &Mmap,
p: &sys::io_uring_params,
) -> Self {
let head = sq_mmap.offset(p.sq_off.head ) as *const atomic::AtomicU32;
let tail = sq_mmap.offset(p.sq_off.tail ) as *const atomic::AtomicU32;
let ring_mask = sq_mmap.offset(p.sq_off.ring_mask ).cast::<u32>().read();
let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
let flags = sq_mmap.offset(p.sq_off.flags ) as *const atomic::AtomicU32;
let dropped = sq_mmap.offset(p.sq_off.dropped ) as *const atomic::AtomicU32;
let array = sq_mmap.offset(p.sq_off.array ) as *mut u32;
let sqes = sqe_mmap.as_mut_ptr() as *mut E;
        // To keep things simple, map the kernel's indirection array onto the
        // sqes one-to-one: slot i always refers to sqe i.
        for i in 0..ring_entries {
array.add(i as usize).write_volatile(i);
}
Self {
head,
tail,
ring_mask,
ring_entries,
flags,
dropped,
sqes,
}
}
    /// Get the submission queue from a shared reference.
    ///
    /// # Safety
    ///
    /// No other [`SubmissionQueue`] may exist at the same time, since two
    /// would race on the unsynchronized tail load.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
SubmissionQueue {
head: (*self.head).load(atomic::Ordering::Acquire),
tail: unsync_load(self.tail),
queue: self,
}
}
    /// Get the submission queue from an exclusive reference; safe because
    /// exclusivity rules out a concurrent borrow.
    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
unsafe { self.borrow_shared() }
}
}
impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) to the
    /// kernel-visible tail, and update the cached head with any entries the
    /// kernel has consumed in the meantime.
    #[inline]
    pub fn sync(&mut self) {
unsafe {
(*self.queue.tail).store(self.tail, atomic::Ordering::Release);
self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
}
}
    /// When the ring is set up with `IORING_SETUP_SQPOLL`, returns whether
    /// the kernel polling thread has gone to sleep and needs a call to
    /// `io_uring_enter` with `IORING_ENTER_SQ_WAKEUP` to wake it up.
    #[inline]
    pub fn need_wakeup(&self) -> bool {
unsafe {
(*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
}
}
    /// The number of invalid submission queue entries that have been
    /// encountered in the ring buffer.
    pub fn dropped(&self) -> u32 {
unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
}
    /// Returns `true` if the completion queue ring is overflown, that is,
    /// the application is in danger of losing completion events.
    pub fn cq_overflow(&self) -> bool {
unsafe {
(*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
}
}
    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
self.queue.ring_entries as usize
}
    /// Get the number of submission queue events currently in the ring
    /// buffer.
    #[inline]
    pub fn len(&self) -> usize {
self.tail.wrapping_sub(self.head) as usize
}
    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
self.len() == 0
}
    /// Returns `true` if the submission queue ring buffer has reached
    /// capacity, and no more events can be pushed until the kernel consumes
    /// some.
    #[inline]
    pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
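    /// Attempts to push an entry into the queue. If the queue is full, an
    /// error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffers)
    /// are valid and will be valid for the entire duration of the operation,
    /// otherwise it may cause memory problems.
    ///
    /// A sketch of typical usage through the crate's top-level `IoUring`
    /// handle, assuming this module is part of the `io-uring` crate (whose
    /// public API provides `IoUring` and `opcode::Nop`):
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut ring = IoUring::new(8)?;
    /// // Nop is the simplest request; any built entry is pushed the same way.
    /// let entry = opcode::Nop::new().build().user_data(0x42);
    /// // Safety: a no-op references no external buffers, so the contract is
    /// // trivially satisfied.
    /// unsafe { ring.submission().push(&entry)?; }
    /// ring.submit_and_wait(1)?;
    /// # Ok(())
    /// # }
    /// ```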
#[inline]
pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
if !self.is_full() {
self.push_unchecked(entry);
Ok(())
} else {
Err(PushError)
}
}
    /// Attempts to push several entries into the queue. If the queue does
    /// not have room for all of them, no entry is pushed and an error is
    /// returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as
    /// buffers) are valid and will be valid for the entire duration of the
    /// operations, otherwise they may cause memory problems.
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
if self.capacity() - self.len() < entries.len() {
return Err(PushError);
}
for entry in entries {
self.push_unchecked(entry);
}
Ok(())
}
#[inline]
unsafe fn push_unchecked(&mut self, entry: &E) {
*self
.queue
.sqes
.add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
self.tail = self.tail.wrapping_add(1);
}
}
impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
#[inline]
fn drop(&mut self) {
unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
}
}
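// A minimal sketch (test-only) that drives `Inner` and `SubmissionQueue`
// against a fake, heap-backed ring instead of a kernel mmap. The ring size
// of 4 and the zeroed SQEs are illustrative assumptions.
#[test]
fn test_fake_ring_push_and_publish() {
    let head = atomic::AtomicU32::new(0);
    let tail = atomic::AtomicU32::new(0);
    let flags = atomic::AtomicU32::new(0);
    let dropped = atomic::AtomicU32::new(0);
    let mut sqes = vec![Entry(unsafe { mem::zeroed() }); 4];
    let mut inner = Inner::<Entry> {
        head: &head,
        tail: &tail,
        ring_mask: 3,
        ring_entries: 4,
        flags: &flags,
        dropped: &dropped,
        sqes: sqes.as_mut_ptr(),
    };
    {
        let mut sq = inner.borrow();
        assert!(sq.is_empty());
        // Safety: a zeroed entry references no buffers or descriptors.
        unsafe { sq.push(&Entry(mem::zeroed()).user_data(1)) }.unwrap();
        sq.sync();
        assert_eq!(sq.len(), 1);
    } // dropping the borrow publishes the new tail
    assert_eq!(tail.load(atomic::Ordering::Acquire), 1);
}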
impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
self.0.flags |= flags.bits();
self
}
    /// Set the user data, an application-supplied value that is passed
    /// straight through into the matching completion queue entry.
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry {
self.0.user_data = user_data;
self
}
    /// Set the personality of this event, as previously registered via
    /// `IORING_REGISTER_PERSONALITY`.
    pub fn personality(mut self, personality: u16) -> Entry {
self.0.personality = personality;
self
}
}
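// A minimal sketch of the builder-style setters above; the zeroed SQE stands
// in for one produced by an opcode builder, and the values are illustrative.
#[test]
fn test_entry_builders() {
    let entry = Entry(unsafe { mem::zeroed() })
        .flags(Flags::IO_LINK)
        .user_data(0xdead_beef);
    assert_eq!(entry.0.flags, Flags::IO_LINK.bits());
    assert_eq!(entry.0.user_data, 0xdead_beef);
}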
impl Sealed for Entry {
const ADDITIONAL_FLAGS: u32 = 0;
}
impl EntryMarker for Entry {}
impl Clone for Entry {
fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone because of the unions it
        // contains, but it is plain old data, so a bitwise copy is sound.
        Entry(unsafe { mem::transmute_copy(&self.0) })
}
}
impl Debug for Entry {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Entry")
.field("op_code", &self.0.opcode)
.field("flags", &self.0.flags)
.field("user_data", &self.0.user_data)
.finish()
}
}
impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
self.0 .0.flags |= flags.bits();
self
}
    /// Set the user data, an application-supplied value that is passed
    /// straight through into the matching completion queue entry.
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
self.0 .0.user_data = user_data;
self
}
    /// Set the personality of this event, as previously registered via
    /// `IORING_REGISTER_PERSONALITY`.
    pub fn personality(mut self, personality: u16) -> Entry128 {
self.0 .0.personality = personality;
self
}
}
impl Sealed for Entry128 {
const ADDITIONAL_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}
impl EntryMarker for Entry128 {}
impl From<Entry> for Entry128 {
    /// Widen a 64-byte entry into a 128-byte one, zero-filling the trailing
    /// 64 bytes.
    fn from(entry: Entry) -> Entry128 {
Entry128(entry, [0u8; 64])
}
}
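// A minimal sketch of the widening conversion above; assumes a zeroed
// `sys::io_uring_sqe` is a valid empty SQE (the kernel expects unused
// fields to be zero).
#[test]
fn test_entry128_from_entry() {
    let entry = Entry(unsafe { mem::zeroed() }).user_data(7);
    let wide = Entry128::from(entry);
    assert_eq!(wide.0 .0.user_data, 7);
    assert_eq!(wide.1, [0u8; 64]);
}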
impl Debug for Entry128 {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Entry128")
.field("op_code", &self.0 .0.opcode)
.field("flags", &self.0 .0.flags)
.field("user_data", &self.0 .0.user_data)
.finish()
}
}
/// An error pushing to the submission queue: the queue is full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;
impl Display for PushError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str("submission queue is full")
}
}
impl Error for PushError {}
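// A quick check of the `Display` and `Error` impls above; boxing exercises
// the `Error` trait object path a caller would typically see.
#[test]
fn test_push_error_display() {
    let err: Box<dyn Error> = Box::new(PushError);
    assert_eq!(err.to_string(), "submission queue is full");
}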
impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut d = f.debug_list();
        // Walk only the entries that have been pushed but not yet consumed
        // by the kernel, i.e. the [head, tail) window.
        let mut pos = self.head;
while pos != self.tail {
let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
d.entry(&entry);
pos = pos.wrapping_add(1);
}
d.finish()
}
}