use std::sync::atomic;
use bitflags::bitflags;
use linux_io_uring_sys as sys;
use crate::util::{ Mmap, unsync_load };
use crate::mmap_offset;
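
/// The submission queue of an io_uring instance.
///
/// All pointers point into memory shared with the kernel: the kernel advances
/// `head` as it consumes entries, while userspace advances `tail` as it
/// pushes new ones.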
pub struct SubmissionQueue {
pub(crate) head: *const atomic::AtomicU32,
pub(crate) tail: *const atomic::AtomicU32,
pub(crate) ring_mask: *const u32,
pub(crate) ring_entries: *const u32,
pub(crate) flags: *const atomic::AtomicU32,
dropped: *const atomic::AtomicU32,
#[allow(dead_code)]
array: *mut u32,
pub(crate) sqes: *mut sys::io_uring_sqe
}
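
/// A snapshot of the submission queue with room for new entries.
///
/// `head` and `tail` are loaded once when the snapshot is created; pushed
/// entries become visible to the kernel only after `sync` is called or the
/// snapshot is dropped.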
pub struct AvailableQueue<'a> {
head: u32,
tail: u32,
ring_mask: u32,
ring_entries: u32,
queue: &'a mut SubmissionQueue
}
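
/// A submission queue entry (SQE), describing one operation for the kernel
/// to execute. Entries are normally created with the builders in the
/// `opcode` module and customized with the `flags` and `user_data` methods.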
#[repr(transparent)]
#[derive(Clone)]
pub struct Entry(pub(crate) sys::io_uring_sqe);
bitflags! {
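    /// Submission flags that can be applied to an entry via `Entry::flags`.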
pub struct Flags: u8 {
#[doc(hidden)]
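        /// Interpret `fd` as an index into the files registered with the ring.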
const FIXED_FILE = sys::IOSQE_FIXED_FILE as _;
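
        /// Do not start this entry until all previously submitted entries
        /// have completed, and do not start later entries until it completes.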
const IO_DRAIN = sys::IOSQE_IO_DRAIN as _;
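
        /// Link this entry to the next one: the next entry will not start
        /// until this one completes, and the chain is severed if it fails.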
const IO_LINK = sys::IOSQE_IO_LINK as _;
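
        /// Like `IO_LINK`, but the link is not severed if this entry
        /// completes with an error.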
#[cfg(feature = "unstable")]
const IO_HARDLINK = sys::IOSQE_IO_HARDLINK as _;
}
}
impl SubmissionQueue {
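    /// Builds a queue from the memory-mapped rings. `sq_mmap` and `sqe_mmap`
    /// must be the submission queue and SQE mappings described by `p`, and
    /// must stay mapped for the lifetime of the queue.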
pub(crate) unsafe fn new(sq_mmap: &Mmap, sqe_mmap: &Mmap, p: &sys::io_uring_params) -> SubmissionQueue {
mmap_offset!{
let head = sq_mmap + p.sq_off.head => *const atomic::AtomicU32;
let tail = sq_mmap + p.sq_off.tail => *const atomic::AtomicU32;
let ring_mask = sq_mmap + p.sq_off.ring_mask => *const u32;
let ring_entries = sq_mmap + p.sq_off.ring_entries => *const u32;
let flags = sq_mmap + p.sq_off.flags => *const atomic::AtomicU32;
let dropped = sq_mmap + p.sq_off.dropped => *const atomic::AtomicU32;
let array = sq_mmap + p.sq_off.array => *mut u32;
let sqes = sqe_mmap + 0 => *mut sys::io_uring_sqe;
}
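
        // The kernel indexes SQEs indirectly through this array; initialize
        // it to the identity mapping so slot `i` always refers to `sqes[i]`.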
for i in 0..*ring_entries {
*array.add(i as usize) = i;
}
SubmissionQueue {
head, tail,
ring_mask, ring_entries,
flags, dropped,
array,
sqes
}
}
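
    /// When the ring is set up with `IORING_SETUP_SQPOLL`, returns whether
    /// the kernel's polling thread has gone idle and needs a syscall
    /// (`io_uring_enter` with `IORING_ENTER_SQ_WAKEUP`) to resume processing.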
pub fn need_wakeup(&self) -> bool {
unsafe {
            (*self.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
}
}
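
    /// Returns the number of invalid entries the kernel has dropped from
    /// the queue.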
pub fn dropped(&self) -> u32 {
unsafe {
(*self.dropped).load(atomic::Ordering::Acquire)
}
}
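
    /// Returns the maximum number of entries the submission queue can hold.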
pub fn capacity(&self) -> usize {
unsafe {
self.ring_entries.read_volatile() as usize
}
}
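
    /// Returns the number of entries currently in the submission queue.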
pub fn len(&self) -> usize {
let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
let tail = unsafe { unsync_load(self.tail) };
tail.wrapping_sub(head) as usize
}
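
    /// Returns `true` if the submission queue is empty.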
pub fn is_empty(&self) -> bool {
let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
let tail = unsafe { unsync_load(self.tail) };
head == tail
}
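
    /// Returns `true` if the submission queue is full.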
pub fn is_full(&self) -> bool {
self.len() == self.capacity()
}
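
    /// Takes a snapshot of the queue, returning an `AvailableQueue` that
    /// entries can be pushed into.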
pub fn available(&mut self) -> AvailableQueue<'_> {
unsafe {
AvailableQueue {
head: (*self.head).load(atomic::Ordering::Acquire),
tail: unsync_load(self.tail),
ring_mask: self.ring_mask.read_volatile(),
ring_entries: self.ring_entries.read_volatile(),
queue: self
}
}
}
}
impl AvailableQueue<'_> {
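    /// Synchronizes the snapshot with the shared ring: publishes the local
    /// tail so the kernel can see newly pushed entries, and reloads the head
    /// to pick up entries the kernel has consumed in the meantime.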
pub fn sync(&mut self) {
unsafe {
(*self.queue.tail).store(self.tail, atomic::Ordering::Release);
self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
}
}
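
    /// Returns the maximum number of entries the queue can hold.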
pub fn capacity(&self) -> usize {
self.ring_entries as usize
}
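
    /// Returns the number of entries in the snapshot, including ones pushed
    /// but not yet synced.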
pub fn len(&self) -> usize {
self.tail.wrapping_sub(self.head) as usize
}
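
    /// Returns `true` if the snapshot contains no entries.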
pub fn is_empty(&self) -> bool {
self.head == self.tail
}
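
    /// Returns `true` if the snapshot is full.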
pub fn is_full(&self) -> bool {
self.tail.wrapping_sub(self.head) == self.ring_entries
}
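
    /// Attempts to push an entry into the queue, handing it back in `Err`
    /// if the queue is full.
    ///
    /// # Safety
    ///
    /// Developers must ensure that the entry (and everything it points to:
    /// buffers, file descriptors, and so on) remains valid until the kernel
    /// has finished with the corresponding operation.
    ///
    /// A minimal sketch of a push-and-submit round trip, assuming this
    /// crate's `IoUring` constructor and the `opcode::Nop` builder:
    ///
    /// ```no_run
    /// use linux_io_uring::{ opcode, IoUring };
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let entry = opcode::Nop::new()
    ///     .build()
    ///     .user_data(0x42);
    ///
    /// // Safety: a `Nop` owns no buffers, so nothing can dangle here.
    /// unsafe {
    ///     ring.submission()
    ///         .available()
    ///         .push(entry)
    ///         .ok()                       // `Entry` does not implement `Debug`
    ///         .expect("submission queue is full");
    /// }
    ///
    /// ring.submit_and_wait(1)?;
    /// # Ok(())
    /// # }
    /// ```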
pub unsafe fn push(&mut self, Entry(entry): Entry) -> Result<(), Entry> {
if self.len() < self.capacity() {
            *self.queue.sqes.add((self.tail & self.ring_mask) as usize) = entry;
self.tail = self.tail.wrapping_add(1);
Ok(())
} else {
Err(Entry(entry))
}
}
}
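
// Dropping the snapshot publishes the tail, so entries pushed without an
// explicit `sync` are still handed over to the kernel.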
impl Drop for AvailableQueue<'_> {
fn drop(&mut self) {
unsafe {
(*self.queue.tail).store(self.tail, atomic::Ordering::Release);
}
}
}
impl Entry {
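    /// Sets the submission flags of the entry; see `Flags` for their meaning.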
pub fn flags(mut self, flags: Flags) -> Entry {
self.0.flags |= flags.bits();
self
}
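
    /// Sets the `user_data` of the entry: an application-supplied value that
    /// the kernel passes back unchanged in the matching completion queue
    /// entry, so completions can be correlated with submissions.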
pub fn user_data(mut self, user_data: u64) -> Entry {
self.0.user_data = user_data;
self
}
}