#[cfg(feature = "allocator_api")]
use std::alloc::Allocator;
#[doc(no_inline)]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{io, marker::PhantomData, time::Duration};
use io_uring::{
cqueue,
opcode::AsyncCancel,
squeue,
types::{SubmitArgs, Timespec},
IoUring,
};
pub(crate) use libc::{sockaddr_storage, socklen_t};
use crate::{
driver::{CompleteIo, Entry, OpObject, Operation},
vec_deque_alloc,
};
pub(crate) mod op;
/// An operation that can be turned into an io_uring submission queue entry.
pub trait OpCode {
/// Builds the submission queue entry describing this operation.
///
/// The driver in this file sets `user_data` on the returned entry before
/// pushing it, so implementations do not need to tag the entry themselves.
fn create_entry(&mut self) -> squeue::Entry;
}
/// Completion driver backed by an io_uring instance.
///
/// The `'arena` lifetime is the one carried by the operations pushed into the
/// driver (`Operation<'arena, _>` / `OpObject<'arena>`).
pub struct Driver<'arena> {
/// The underlying io_uring instance used for submission and completion.
inner: IoUring,
/// Scratch buffer reused by `push_queue` to stage entries before pushing
/// them onto the submission queue in one batch.
squeue_buffer: Vec<squeue::Entry>,
/// Zero-sized marker that ties the driver to `'arena` without storing data.
_lifetime: PhantomData<&'arena ()>,
}
impl<'arena> Driver<'arena> {
pub fn new() -> io::Result<Self> {
Self::with_entries(1024)
}
pub fn with_entries(entries: u32) -> io::Result<Self> {
Ok(Self {
inner: IoUring::new(entries)?,
squeue_buffer: Vec::with_capacity(entries as usize),
_lifetime: PhantomData,
})
}
fn submit(&mut self, timeout: Option<Duration>) -> io::Result<()> {
loop {
let res = match timeout {
None => self.inner.submit_and_wait(1),
Some(Duration::ZERO) => self.inner.submit(),
Some(duration) => {
let timespec = timespec(duration);
let args = SubmitArgs::new().timespec(×pec);
self.inner.submitter().submit_with_args(1, &args)
}
};
match res {
Ok(_) => break Ok(()),
Err(e) => match e.raw_os_error() {
Some(libc::EINTR) => continue,
Some(libc::ETIME) => break Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
Some(libc::EBUSY) | Some(libc::EAGAIN) => break Ok(()),
_ => break Err(e),
},
}
}
}
fn complete_entries(&mut self, entries: &mut impl Extend<Entry>) {
const TIMER_EXPIRED: i32 = -libc::ETIME;
const NO_ENTRY: i32 = -libc::ENOENT;
const NOT_CANCELLABLE: i32 = -libc::EALREADY;
let completed_entries = self.inner.completion().filter_map(|entry| {
match entry.result() {
TIMER_EXPIRED => Some(Entry::new(entry.user_data() as usize, Ok(0))),
NO_ENTRY |
NOT_CANCELLABLE => None,
_ => Some(create_entry(entry)),
}
});
entries.extend(completed_entries);
}
}
/// io_uring implementation of the `CompleteIo` driver interface.
impl<'arena> CompleteIo<'arena> for Driver<'arena> {
    /// Does nothing for this driver: the descriptor is ignored and `Ok(())` is returned.
    #[inline]
    fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    /// Pushes an `AsyncCancel` request aimed at the operation tagged `user_data`.
    ///
    /// The cancel request itself is tagged with the same `user_data`.
    /// Returns `Err(())` if the entry could not be pushed onto the submission queue.
    #[inline]
    fn try_cancel(&mut self, user_data: usize) -> Result<(), ()> {
        let target = user_data as u64;
        let cancel_entry = AsyncCancel::new(target).build().user_data(target);
        // SAFETY: NOTE(review): `push` requires the entry's parameters to stay valid
        // for the whole operation; a cancel entry only carries the target id here.
        let pushed = unsafe { self.inner.submission().push(&cancel_entry) };
        pushed.map_err(|_| ())
    }

    /// Pushes a single typed operation onto the submission queue.
    ///
    /// The entry built by the operation's opcode is tagged with the operation's
    /// `user_data`. If the push fails, the operation is handed back in `Err`.
    #[inline]
    fn try_push<O: OpCode>(
        &mut self,
        mut op: Operation<'arena, O>,
    ) -> Result<(), Operation<'arena, O>> {
        let token = op.user_data();
        let sqe = op.opcode().create_entry().user_data(token as _);
        // SAFETY: NOTE(review): `push` requires the entry's parameters to stay valid
        // for the whole operation; presumably guaranteed by `'arena` — confirm.
        let pushed = unsafe { self.inner.submission().push(&sqe) };
        pushed.map_err(|_| op)
    }

    /// Pushes a single type-erased operation onto the submission queue.
    ///
    /// Mirrors `try_push`: the entry is tagged with the operation's `user_data`,
    /// and the operation is handed back in `Err` when the push fails.
    #[inline]
    fn try_push_dyn(&mut self, mut op: OpObject<'arena>) -> Result<(), OpObject<'arena>> {
        let token = op.user_data();
        let sqe = op.opcode().create_entry().user_data(token as _);
        // SAFETY: NOTE(review): `push` requires the entry's parameters to stay valid
        // for the whole operation; presumably guaranteed by `'arena` — confirm.
        let pushed = unsafe { self.inner.submission().push(&sqe) };
        pushed.map_err(|_| op)
    }

    /// Moves as many operations as currently fit from the front of `ops_queue`
    /// onto the submission queue in one batch.
    ///
    /// Returns immediately when the queue is empty or no capacity is left.
    ///
    /// # Panics
    ///
    /// Panics with `"in capacity"` if the batched push is rejected.
    #[inline]
    fn push_queue<#[cfg(feature = "allocator_api")] A: Allocator + Unpin + 'arena>(
        &mut self,
        ops_queue: &mut vec_deque_alloc!(OpObject<'arena>, A),
    ) {
        let batch_len = self.capacity_left().min(ops_queue.len());
        if batch_len == 0 {
            return;
        }

        let mut sq = self.inner.submission();

        // Stage the entries in the reusable buffer so they can be pushed together.
        self.squeue_buffer.clear();
        self.squeue_buffer
            .extend(ops_queue.drain(..batch_len).map(|mut op| {
                let token = op.user_data();
                op.opcode().create_entry().user_data(token as _)
            }));

        // SAFETY: NOTE(review): `push_multiple` requires every entry's parameters to
        // stay valid for the whole operation; presumably guaranteed by `'arena` — confirm.
        unsafe {
            sq.push_multiple(&self.squeue_buffer).expect("in capacity");
        }
    }

    /// Returns the number of free slots in the submission queue
    /// (its capacity minus its current length).
    #[inline]
    fn capacity_left(&self) -> usize {
        // SAFETY: NOTE(review): `submission_shared` requires that no other submission
        // queue handle exists at this point; none is created in this method — confirm
        // for callers.
        let sq = unsafe { self.inner.submission_shared() };
        let (total, used) = (sq.capacity(), sq.len());
        total - used
    }

    /// Synchronises the submission queue, submits it with the given `timeout`,
    /// and then drains every available completion into `completed`.
    ///
    /// # Errors
    ///
    /// Returns the error from the submit step; in that case no completions are drained.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller contract is defined by the `CompleteIo` trait,
    /// which is not visible in this file — confirm against the trait docs.
    unsafe fn submit_and_wait_completed(
        &mut self,
        timeout: Option<Duration>,
        completed: &mut impl Extend<Entry>,
    ) -> io::Result<()> {
        {
            let mut sq = self.inner.submission();
            sq.sync();
        }
        self.submit(timeout)
            .map(|()| self.complete_entries(completed))
    }
}
/// Exposes the file descriptor of the underlying io_uring instance.
impl AsRawFd for Driver<'_> {
/// Returns the raw file descriptor reported by the inner `IoUring`.
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
/// Converts a raw completion queue entry into a driver-level `Entry`.
///
/// A negative result is treated as a negated OS error code and becomes an
/// `io::Error`; any other result is passed through as the `Ok` value. The
/// entry keeps the completion's `user_data`.
#[inline]
fn create_entry(entry: cqueue::Entry) -> Entry {
    let outcome = match entry.result() {
        code if code < 0 => Err(io::Error::from_raw_os_error(-code)),
        value => Ok(value as _),
    };
    Entry::new(entry.user_data() as _, outcome)
}
/// Converts a `Duration` into an io_uring `Timespec`, carrying over the whole
/// seconds and the sub-second nanoseconds separately.
#[inline]
fn timespec(duration: std::time::Duration) -> Timespec {
    let whole_secs = duration.as_secs();
    let extra_nanos = duration.subsec_nanos();
    Timespec::new().sec(whole_secs).nsec(extra_nanos)
}