use crate::io::uring::open::Open;
use crate::io::uring::write::Write;
use crate::runtime::Handle;
use io_uring::cqueue;
use io_uring::squeue::Entry;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::{io, mem};
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) enum CancelData {
Open(Open),
Write(Write),
}
#[derive(Debug)]
pub(crate) enum Lifecycle {
Submitted,
Waiting(Waker),
Cancelled(
#[allow(dead_code)] CancelData,
),
Completed(io_uring::cqueue::Entry),
}
pub(crate) enum State {
Initialize(Option<Entry>),
Polled(usize),
Complete,
}
pub(crate) struct Op<T: Cancellable> {
handle: Handle,
state: State,
data: Option<T>,
}
impl<T: Cancellable> Op<T> {
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
let handle = Handle::current();
Self {
handle,
data: Some(data),
state: State::Initialize(Some(entry)),
}
}
pub(crate) fn take_data(&mut self) -> Option<T> {
self.data.take()
}
}
impl<T: Cancellable> Drop for Op<T> {
fn drop(&mut self) {
match self.state {
State::Complete => (),
State::Polled(index) => {
let data = self.take_data();
let handle = &mut self.handle;
handle.inner.driver().io().cancel_op(index, data);
}
State::Initialize(_) => (),
}
}
}
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
}
impl From<cqueue::Entry> for CqeResult {
fn from(cqe: cqueue::Entry) -> Self {
let res = cqe.result();
let result = if res >= 0 {
Ok(res as u32)
} else {
Err(io::Error::from_raw_os_error(-res))
};
CqeResult { result }
}
}
pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output>;
}
pub(crate) trait Cancellable {
fn cancel(self) -> CancelData;
}
impl<T: Cancellable> Unpin for Op<T> {}
impl<T: Cancellable + Completable + Send> Future for Op<T> {
type Output = io::Result<T::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let handle = &mut this.handle;
let driver = handle.inner.driver().io();
match &mut this.state {
State::Initialize(entry_opt) => {
let entry = entry_opt.take().expect("Entry must be present");
let waker = cx.waker().clone();
let idx = unsafe { driver.register_op(entry, waker)? };
this.state = State::Polled(idx);
Poll::Pending
}
State::Polled(idx) => {
let mut ctx = driver.get_uring().lock();
let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
let waker = cx.waker().clone();
*lifecycle = Lifecycle::Waiting(waker);
Poll::Pending
}
Lifecycle::Waiting(prev) => {
*lifecycle = Lifecycle::Waiting(prev);
Poll::Pending
}
Lifecycle::Completed(cqe) => {
ctx.remove_op(*idx);
this.state = State::Complete;
drop(ctx);
let data = this
.take_data()
.expect("Data must be present on completion");
Poll::Ready(data.complete(cqe.into()))
}
Lifecycle::Submitted => {
unreachable!("Submitted lifecycle should never be seen here");
}
Lifecycle::Cancelled(_) => {
unreachable!("Cancelled lifecycle should never be seen here");
}
}
}
State::Complete => {
panic!("Future polled after completion");
}
}
}
}