use crate::io::blocking::Buf;
use crate::io::uring::open::Open;
use crate::io::uring::read::Read;
use crate::io::uring::utils::ArcFd;
use crate::io::uring::write::Write;
use crate::runtime::Handle;
use io_uring::cqueue;
use io_uring::squeue::Entry;
use std::future::Future;
use std::io::{self, Error};
use std::mem;
use std::os::fd::OwnedFd;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
/// Payload of an operation, tagged by the kind of operation it belongs to.
///
/// This is the type returned by `Cancellable::cancel` and stored inside
/// `Lifecycle::Cancelled`.
// The variant payloads are never read in this file, hence the lint allowance.
// NOTE(review): presumably they are held only to keep the operation's
// resources alive after the future is gone — confirm against the driver.
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) enum CancelData {
    /// Payload of an `Open` operation.
    Open(Open),

    /// Payload of a `Write` operation.
    Write(Write),

    /// Payload of a `Read` into a `Vec<u8>` using an `OwnedFd`.
    ReadVec(Read<Vec<u8>, OwnedFd>),

    /// Payload of a `Read` into a `Buf` using an `ArcFd`.
    ReadBuf(Read<Buf, ArcFd>),
}
/// Per-operation record that `Op::poll` looks up by index in the driver's
/// `ops` collection.
#[derive(Debug)]
pub(crate) enum Lifecycle {
    /// `Op::poll` swaps this value into the slot while it inspects the
    /// previously stored state, and treats finding it stored as unreachable.
    // NOTE(review): presumably this also denotes "submitted, nobody waiting
    // yet" on the driver side — confirm against `register_op`.
    Submitted,

    /// The future is pending; holds the waker supplied by its latest poll.
    Waiting(Waker),

    /// Holds the payload of a cancelled operation.
    // NOTE(review): presumably installed by the driver's `cancel_op` when an
    // `Op` is dropped before completing — confirm against the driver.
    Cancelled(
        // Never read through this variant; stored for ownership only.
        #[allow(dead_code)] CancelData,
    ),

    /// The completion queue entry for the operation is available.
    Completed(io_uring::cqueue::Entry),
}
/// Progress of an `Op` as tracked by the future itself.
pub(crate) enum State {
    /// Not registered with the driver yet. Holds the submission entry until
    /// the first poll takes it out.
    Initialize(Option<Entry>),

    /// Registered with the driver. Holds the index returned by
    /// `register_op`, which is used to look up the operation's `Lifecycle`.
    Polled(usize),

    /// The output has been produced, either from a completion or from a
    /// registration error. Polling again panics.
    Complete,
}
/// A future wrapping a single io_uring operation and the data that belongs
/// to it.
pub(crate) struct Op<T>
where
    T: Cancellable,
{
    /// Runtime handle captured when the `Op` is created; used to reach the
    /// I/O driver.
    handle: Handle,

    /// Where the operation currently is in its life (see `State`).
    state: State,

    /// Operation payload. `None` once it has been consumed to build the
    /// output or handed to the driver on drop.
    data: Option<T>,
}
impl<T: Cancellable> Op<T> {
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
let handle = Handle::current();
Self {
handle,
data: Some(data),
state: State::Initialize(Some(entry)),
}
}
pub(crate) fn take_data(&mut self) -> Option<T> {
self.data.take()
}
}
impl<T: Cancellable> Drop for Op<T> {
    /// Hands a registered but unfinished operation over to the driver.
    ///
    /// Only `State::Polled` needs work: the driver still tracks the
    /// operation, so its index and whatever payload is left are passed to
    /// `cancel_op`.
    fn drop(&mut self) {
        match self.state {
            State::Polled(index) => {
                let data = self.take_data();
                self.handle.inner.driver().io().cancel_op(index, data);
            }
            // Never registered, or already finished: the driver holds
            // nothing for this operation.
            State::Initialize(_) | State::Complete => {}
        }
    }
}
/// Outcome of a completed operation, decoded from a completion queue entry.
pub(crate) struct CqeResult {
    /// `Ok` with the non-negative result value, or `Err` with the OS error
    /// built from a negative one.
    pub(crate) result: io::Result<u32>,
}
impl From<cqueue::Entry> for CqeResult {
    /// Decodes the entry's raw result: a negative value is turned into an
    /// `io::Error` from its negation, anything else is returned as `u32`.
    fn from(cqe: cqueue::Entry) -> Self {
        let result = match cqe.result() {
            code if code < 0 => Err(io::Error::from_raw_os_error(-code)),
            // Non-negative here, so the cast cannot change the value.
            code => Ok(code as u32),
        };
        Self { result }
    }
}
pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> Self::Output;
fn complete_with_error(self, error: Error) -> Self::Output;
}
/// Operation payloads that can be converted into `CancelData`.
pub(crate) trait Cancellable {
    /// Consumes the payload and wraps it in the matching `CancelData`
    /// variant.
    fn cancel(self) -> CancelData;
}
/// `Op` is `Unpin` regardless of `T`; `poll` relies on this to call
/// `Pin::get_mut`.
impl<T> Unpin for Op<T> where T: Cancellable {}
impl<T: Cancellable + Completable + Send> Future for Op<T> {
    type Output = T::Output;

    /// Advances the operation.
    ///
    /// * First poll: registers the stored entry with the driver. On success
    ///   the future becomes pending; on failure it resolves immediately via
    ///   `Completable::complete_with_error`.
    /// * Later polls: inspect the driver's `Lifecycle` record. A completion
    ///   resolves the future via `Completable::complete`; otherwise the
    ///   stored waker is refreshed if needed and the future stays pending.
    ///
    /// # Panics
    ///
    /// Panics when polled again after it has resolved, and when one of the
    /// internal invariants spelled out in the `expect`/`unreachable!`
    /// messages is broken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let driver = this.handle.inner.driver().io();

        match &mut this.state {
            State::Initialize(pending_entry) => {
                let entry = pending_entry.take().expect("Entry must be present");

                // NOTE(review): soundness presumably rests on the contract
                // accepted by whoever called the unsafe `Op::new` — confirm
                // against `register_op`'s safety requirements.
                let registration = unsafe { driver.register_op(entry, cx.waker().clone()) };

                match registration {
                    Ok(idx) => {
                        this.state = State::Polled(idx);
                        Poll::Pending
                    }
                    Err(err) => {
                        let data = this
                            .take_data()
                            .expect("Data must be present on Initialization");
                        this.state = State::Complete;
                        Poll::Ready(data.complete_with_error(err))
                    }
                }
            }

            State::Polled(idx) => {
                let idx = *idx;
                let mut ctx = driver.get_uring().lock();
                let slot = ctx.ops.get_mut(idx).expect("Lifecycle must be present");

                // Move the stored state out; every pending path below writes
                // a real value back before the lock is released.
                match mem::replace(slot, Lifecycle::Submitted) {
                    Lifecycle::Waiting(prev) => {
                        // Reuse the stored waker when it already wakes the
                        // current task; clone the new one only otherwise.
                        let waker = if prev.will_wake(cx.waker()) {
                            prev
                        } else {
                            cx.waker().clone()
                        };
                        *slot = Lifecycle::Waiting(waker);
                        Poll::Pending
                    }

                    Lifecycle::Completed(cqe) => {
                        ctx.remove_op(idx);
                        this.state = State::Complete;

                        // Release the lock before running the payload's
                        // completion logic.
                        drop(ctx);

                        let data = this
                            .take_data()
                            .expect("Data must be present on completion");
                        Poll::Ready(data.complete(cqe.into()))
                    }

                    Lifecycle::Submitted => {
                        unreachable!("Submitted lifecycle should never be seen here");
                    }

                    Lifecycle::Cancelled(_) => {
                        unreachable!("Cancelled lifecycle should never be seen here");
                    }
                }
            }

            State::Complete => {
                panic!("Future polled after completion");
            }
        }
    }
}