use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll, Waker, ready},
};
use std::io;
use io_uring::{cqueue, squeue};
mod slab_list;
use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};
use crate::runtime::{CONTEXT, driver};
pub(crate) type Completion = SlabListEntry<CqeResult>;
#[must_use = "Operation does nothing unless submitted to driver with UnsubmittedOneshot::submit"]
pub struct UnsubmittedOneshot<D: 'static, T: OneshotOutputTransform<StoredData = D>> {
stable_data: D,
post_op: T,
sqe: squeue::Entry,
}
impl<D, T: OneshotOutputTransform<StoredData = D>> UnsubmittedOneshot<D, T> {
pub fn new(stable_data: D, post_op: T, sqe: squeue::Entry) -> Self {
Self {
stable_data,
post_op,
sqe,
}
}
pub fn submit(self) -> InFlightOneshot<D, T> {
let handle = CONTEXT
.with(|x| x.handle())
.expect("Could not submit op; not in runtime context");
self.submit_with_driver(&handle)
}
fn submit_with_driver(self, driver: &driver::Handle) -> InFlightOneshot<D, T> {
let index = driver.submit_op_2(self.sqe);
let driver = driver.into();
let inner = InFlightOneshotInner {
index,
driver,
stable_data: self.stable_data,
post_op: self.post_op,
};
InFlightOneshot { inner: Some(inner) }
}
}
pub struct InFlightOneshot<D: 'static, T: OneshotOutputTransform<StoredData = D>> {
inner: Option<InFlightOneshotInner<D, T>>,
}
struct InFlightOneshotInner<D, T: OneshotOutputTransform<StoredData = D>> {
driver: driver::WeakHandle,
index: usize,
stable_data: D,
post_op: T,
}
impl<D: Unpin, T: OneshotOutputTransform<StoredData = D> + Unpin> Future for InFlightOneshot<D, T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = this.inner.as_mut().expect("Cannot poll already-completed operation");
let index = inner.index;
let upgraded = inner
.driver
.upgrade()
.expect("Failed to poll op: driver no longer exists");
let cqe = ready!(upgraded.poll_op_2(index, cx));
let inner = this.inner.take().unwrap();
Poll::Ready(inner.post_op.transform_oneshot_output(inner.stable_data, cqe))
}
}
impl<D: 'static, T: OneshotOutputTransform<StoredData = D>> Drop for InFlightOneshot<D, T> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
if let Some(driver) = inner.driver.upgrade() {
driver.remove_op_2(inner.index, inner.stable_data)
}
}
}
}
pub trait OneshotOutputTransform {
type Output;
type StoredData;
fn transform_oneshot_output(self, data: Self::StoredData, cqe: cqueue::Entry) -> Self::Output;
}
pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
driver: driver::WeakHandle,
index: usize,
data: Option<T>,
_cqe_type: PhantomData<CqeType>,
}
pub(crate) struct SingleCQE;
pub(crate) struct MultiCQEFuture;
pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> Self::Output;
}
pub(crate) trait Updateable: Completable {
fn update(&mut self, cqe: CqeResult);
}
#[allow(dead_code)]
pub(crate) enum Lifecycle {
Submitted,
Waiting(Waker),
Ignored(Box<dyn std::any::Any>),
Completed(cqueue::Entry),
CompletionList(SlabListIndices),
}
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
pub(crate) flags: u32,
}
impl From<cqueue::Entry> for CqeResult {
fn from(cqe: cqueue::Entry) -> Self {
let res = cqe.result();
let flags = cqe.flags();
let result = if res >= 0 {
Ok(res as u32)
} else {
Err(io::Error::from_raw_os_error(-res))
};
CqeResult { result, flags }
}
}
impl<T, CqeType> Op<T, CqeType> {
pub(super) fn new(driver: driver::WeakHandle, data: T, index: usize) -> Self {
Op {
driver,
index,
data: Some(data),
_cqe_type: PhantomData,
}
}
pub(super) fn index(&self) -> usize {
self.index
}
pub(super) fn take_data(&mut self) -> Option<T> {
self.data.take()
}
pub(super) fn insert_data(&mut self, data: T) {
self.data = Some(data);
}
}
impl<T> Future for Op<T, SingleCQE>
where
T: Unpin + 'static + Completable,
{
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.driver
.upgrade()
.expect("Not in runtime context")
.poll_op(self.get_mut(), cx)
}
}
impl<T> Future for Op<T, MultiCQEFuture>
where
T: Unpin + 'static + Completable + Updateable,
{
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.driver
.upgrade()
.expect("Not in runtime context")
.poll_multishot_op(self.get_mut(), cx)
}
}
impl<T, CqeType> Drop for Op<T, CqeType> {
fn drop(&mut self) {
self.driver.upgrade().expect("Not in runtime context").remove_op(self)
}
}
impl Lifecycle {
pub(crate) fn complete(&mut self, completions: &mut Slab<Completion>, cqe: cqueue::Entry) -> bool {
use std::mem;
match mem::replace(self, Lifecycle::Submitted) {
x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
if io_uring::cqueue::more(cqe.flags()) {
let mut list = SlabListIndices::new().into_list(completions);
list.push(cqe.into());
*self = Lifecycle::CompletionList(list.into_indices());
} else {
*self = Lifecycle::Completed(cqe);
}
if let Lifecycle::Waiting(waker) = x {
waker.wake();
}
false
}
lifecycle @ Lifecycle::Ignored(..) => {
if io_uring::cqueue::more(cqe.flags()) {
*self = lifecycle;
false
} else {
true
}
}
Lifecycle::Completed(..) => {
unreachable!("invalid operation state")
}
Lifecycle::CompletionList(indices) => {
let mut list = indices.into_list(completions);
list.push(cqe.into());
*self = Lifecycle::CompletionList(list.into_indices());
false
}
}
}
}