use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::ready;
use crate::completion::Completion;
use crate::drive::Completion as ExternalCompletion;
use crate::drive::Drive;
use crate::Cancellation;
use State::*;
/// State machine that carries a single event through a [`Drive`] implementation:
/// prepare, submit, then wait for completion.
///
/// Only one event is tracked at a time; its progress is recorded in `state` and its
/// completion handle in `completion`.
pub struct Ring<D: Drive> {
    /// Where the current event is in its lifecycle.
    state: State,
    /// Handle for the event that has been prepared but whose result has not yet been
    /// taken. `None` when the ring is freshly created.
    completion: Option<Completion>,
    /// The driver used to prepare and submit events. It is treated as pinned whenever
    /// the ring itself is pinned.
    driver: D,
}
/// Lifecycle of the event currently owned by a `Ring`.
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// No event is in flight. This is the state of a new ring and the state a ring
    /// returns to once a completion has yielded its result.
    Inert = 0,
    /// The driver has accepted a prepared event, but submission has not yet been
    /// reported as done.
    Prepared,
    /// The driver has reported the submission step as done; only the completion is
    /// still awaited.
    Submitted,
    /// Marker written just before the caller's prepare callback runs inside the
    /// driver. It is overwritten with `Prepared` once the driver returns a completion;
    /// if `Ring::poll` ever observes it, it panics and blames the driver.
    Lost,
}
impl<D> Default for Ring<D>
where
    D: Drive + Default,
{
    /// Builds an idle ring around the driver's own default value.
    fn default() -> Self {
        let driver = D::default();
        Self::new(driver)
    }
}
impl<D> Clone for Ring<D>
where
    D: Drive + Clone,
{
    /// Produces a fresh, idle ring that shares nothing with `self` except a clone of
    /// its driver; any event `self` has in flight is not carried over.
    fn clone(&self) -> Self {
        let driver = self.driver.clone();
        Self::new(driver)
    }
}
impl<D: Drive> Ring<D> {
    /// Creates an idle ring that issues its events through `driver`.
    #[inline(always)]
    pub fn new(driver: D) -> Ring<D> {
        Ring {
            state: Inert,
            completion: None,
            driver,
        }
    }

    /// Returns a shared reference to the driver this ring was built with.
    pub fn driver(&self) -> &D {
        &self.driver
    }

    /// Advances the ring's event by one step and reports its result once available.
    ///
    /// The work done depends on the current state:
    ///
    /// * `Inert`: a new event is prepared with `prepare`, then submission is attempted.
    ///   The call returns `Poll::Pending` regardless of how far those steps got.
    /// * `Prepared`: the completion is checked first; if it has no result yet,
    ///   submission is attempted again and `Poll::Pending` is returned.
    /// * `Submitted`: only the completion is checked.
    ///
    /// `prepare` is invoked only when the ring starts from `Inert`; in every other state
    /// it is dropped without being called. `is_eager` is forwarded unchanged to the
    /// driver's `poll_submit`.
    ///
    /// # Panics
    ///
    /// Panics if the ring is in the `Lost` state, i.e. the driver ran the prepare
    /// callback but never handed back the resulting completion.
    #[inline]
    pub fn poll(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
        is_eager: bool,
        prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
    ) -> Poll<io::Result<usize>> {
        match self.state {
            Inert => {
                ready!(self.as_mut().poll_prepare(ctx, prepare));
                ready!(self.as_mut().poll_submit(ctx, is_eager));
                // The event has only just been handed over, so there is nothing to
                // report yet. The completion was built from a clone of this task's waker.
                Poll::Pending
            }
            Prepared => {
                match self.as_mut().poll_complete(ctx) {
                    ready @ Poll::Ready(..) => ready,
                    Poll::Pending => {
                        // Not complete yet: retry the submission that was still
                        // outstanding on the previous poll.
                        ready!(self.poll_submit(ctx, is_eager));
                        Poll::Pending
                    }
                }
            }
            Submitted => self.poll_complete(ctx),
            Lost => panic!("Ring in a bad state; driver is faulty"),
        }
    }

    /// Asks the driver for a submission queue entry and fills it in with `prepare`.
    ///
    /// On success the ring moves to `Prepared` and stores the completion that the entry's
    /// user data refers to. Returns `Poll::Pending` (via `ready!`) while the driver is
    /// not ready to hand out an entry.
    #[inline(always)]
    fn poll_prepare(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
        prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
    ) -> Poll<()> {
        let (driver, state, completion_slot) = self.split();
        let completion = ready!(driver.poll_prepare(ctx, |sqe, ctx| {
            // Guard that rewrites the entry into a no-op with user data 0 if it is
            // dropped, which happens when the code below unwinds before `mem::forget`.
            struct SubmissionCleaner<'a>(iou::SubmissionQueueEvent<'a>);
            impl Drop for SubmissionCleaner<'_> {
                fn drop(&mut self) {
                    // SAFETY: NOTE(review): assumes a no-op entry has no preconditions
                    // beyond exclusive access to the entry, which `&mut self` provides;
                    // confirm against the `iou` documentation.
                    unsafe {
                        self.0.prep_nop();
                        self.0.set_user_data(0);
                    }
                }
            }
            let mut sqe = SubmissionCleaner(sqe);
            // Mark the ring before running caller code: if the driver never returns the
            // completion built below, the next `poll` sees `Lost` and panics.
            *state = Lost;
            prepare(&mut sqe.0);
            let completion = Completion::new(ctx.waker().clone());
            sqe.0.set_user_data(completion.addr());
            // The entry is fully prepared; disarm the guard so it is not reset.
            mem::forget(sqe);
            ExternalCompletion::new(completion, ctx)
        }));
        *state = Prepared;
        *completion_slot = Some(completion.real);
        Poll::Ready(())
    }

    /// Asks the driver to submit prepared events and, once it reports the step as done,
    /// moves the ring to `Submitted`.
    #[inline(always)]
    fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>, is_eager: bool) -> Poll<()> {
        let (driver, state, _) = self.split();
        // NOTE(review): the value produced by the driver's `poll_submit` is deliberately
        // discarded here; the ring is marked `Submitted` whatever it was.
        let _ = ready!(driver.poll_submit(ctx, is_eager));
        *state = Submitted;
        Poll::Ready(())
    }

    /// Checks whether the in-flight event has finished.
    ///
    /// When the completion yields a result the ring returns to `Inert` and the result is
    /// passed on; otherwise the completion is put back in its slot and `Poll::Pending` is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if no completion is stored, which would mean the ring's state and its
    /// completion slot have fallen out of sync.
    #[inline(always)]
    fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let (_, state, completion_slot) = self.split();
        let completion = completion_slot
            .take()
            .expect("Ring has an event in flight but no completion to check");
        match completion.check(ctx.waker()) {
            Ok(result) => {
                *state = Inert;
                Poll::Ready(result)
            }
            Err(completion) => {
                *completion_slot = Some(completion);
                Poll::Pending
            }
        }
    }

    /// Abandons the in-flight event, if any, handing `cancellation` to its completion.
    ///
    /// Afterwards the ring is idle again, so the next `poll` prepares a fresh event
    /// instead of looking for the completion that was just given away. If no event is in
    /// flight, `cancellation` is simply dropped and the state is left untouched.
    #[inline]
    pub fn cancel(&mut self, cancellation: Cancellation) {
        if let Some(completion) = self.completion.take() {
            completion.cancel(cancellation);
            // The completion slot is now empty; leaving the state at `Prepared` or
            // `Submitted` would make the next `poll` look for a completion that is gone.
            self.state = Inert;
        }
    }

    /// Same as [`cancel`](Self::cancel), for callers that hold the ring pinned.
    pub fn cancel_pinned(self: Pin<&mut Self>, cancellation: Cancellation) {
        // SAFETY: `cancel` only empties the completion slot and overwrites the state; it
        // never moves the driver, the one field that `split` hands out as pinned.
        unsafe { Pin::get_unchecked_mut(self).cancel(cancellation) }
    }

    /// Projects the pinned ring into its three fields: the driver stays pinned, while
    /// the state and completion slot are handed out as plain mutable references.
    fn split(self: Pin<&mut Self>) -> (Pin<&mut D>, &mut State, &mut Option<Completion>) {
        // SAFETY: the driver is re-wrapped in `Pin` before it leaves this function, so it
        // is never exposed unpinned here. The other two fields are never exposed as
        // pinned in this impl, so handing out `&mut` to them does not break a pinning
        // guarantee.
        unsafe {
            let this = Pin::get_unchecked_mut(self);
            (Pin::new_unchecked(&mut this.driver), &mut this.state, &mut this.completion)
        }
    }
}