use std::{
cell::RefCell,
future::Future,
marker::PhantomData,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
use compio_buf::BufResult;
use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry};
use futures_util::future::FusedFuture;
use crate::{
CancelToken,
future::{poll_task, poll_task_with_extra, submit_raw},
waker::{get_ext, get_waker},
};
/// Crate-internal helpers layered on top of a task [`Context`].
///
/// The `poll` implementations in this file use these methods to obtain the
/// waker to register, an optional [`CancelToken`], and an optional [`Extra`]
/// to attach to a newly submitted operation.
pub(crate) trait ContextExt {
/// Returns the waker to hand to the driver when polling an operation.
fn get_waker(&self) -> &Waker;
/// Returns the cancel token associated with this context, if there is one.
fn get_cancel(&mut self) -> Option<&CancelToken>;
/// Produces an [`Extra`] for this context, or `None` when none applies.
///
/// `default` supplies the initial value when an `Extra` has to be built.
fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra>;
}
/// [`ContextExt`] for the standard task [`Context`], implemented on top of the
/// helpers imported from `crate::waker`.
impl ContextExt for Context<'_> {
    /// Passes the context's waker through `crate::waker::get_waker` and
    /// returns the result.
    fn get_waker(&self) -> &Waker {
        let raw = self.waker();
        get_waker(raw)
    }

    /// Looks up the waker's extension data and asks it for a cancel token.
    ///
    /// Returns `None` when there is no extension data, or when that data
    /// reports no token.
    fn get_cancel(&mut self) -> Option<&CancelToken> {
        let ext = get_ext(self.waker())?;
        ext.get_cancel()
    }

    /// Builds an [`Extra`] seeded by `default` and then passed to the
    /// extension data's `set_extra`.
    ///
    /// Returns `None` without calling `default` when the waker has no
    /// extension data.
    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
        get_ext(self.waker()).map(|ext| {
            let mut extra = default();
            ext.set_extra(&mut extra);
            extra
        })
    }
}
pin_project_lite::pin_project! {
    /// Future that runs a single [`OpCode`] operation through the driver.
    ///
    /// The operation is submitted on the first poll. With `E = ()` the future
    /// resolves to the operation's result alone; with `E = Extra` the result
    /// is paired with an `Extra`. Dropping the future while the operation is
    /// still submitted asks the driver to cancel it.
    pub struct Submit<T: OpCode, E = ()> {
        // Shared handle to the proactor that executes the operation.
        driver: Rc<RefCell<Proactor>>,
        // `None` once the output has been produced or the state has been moved out.
        state: Option<State<T, E>>,
    }

    impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
        fn drop(this: Pin<&mut Self>) {
            let me = this.project();
            // Only an operation that was submitted and has not completed
            // needs cancelling; an idle or finished future has nothing to do.
            let Some(State::Submitted { key, .. }) = me.state.take() else {
                return;
            };
            me.driver.borrow_mut().cancel(key);
        }
    }
}
/// Progress of the operation held by a `Submit` future.
enum State<T: OpCode, E> {
/// Not yet handed to the driver; the future still owns the operation.
Idle { op: T },
/// Handed to the driver and tracked through `key`. `_p` exists only so the
/// otherwise unused type parameter `E` appears in the type.
Submitted { key: Key<T>, _p: PhantomData<E> },
}
impl<T: OpCode, E> State<T, E> {
    /// Wraps `key` in the [`State::Submitted`] variant, filling in the marker
    /// field so callers do not have to spell it out.
    fn submitted(key: Key<T>) -> Self {
        let _p = PhantomData;
        Self::Submitted { key, _p }
    }
}
impl<T: OpCode> Submit<T, ()> {
pub(crate) fn new(driver: Rc<RefCell<Proactor>>, op: T) -> Self {
Submit {
driver,
state: Some(State::Idle { op }),
}
}
pub fn with_extra(mut self) -> Submit<T, Extra> {
let driver = self.driver.clone();
let Some(state) = self.state.take() else {
return Submit {
driver,
state: None,
};
};
let state = match state {
State::Submitted { key, .. } => State::Submitted {
key,
_p: PhantomData,
},
State::Idle { op } => State::Idle { op },
};
Submit {
driver,
state: Some(state),
}
}
}
/// Drives a plain submission to completion, yielding only the operation's
/// [`BufResult`].
impl<T: OpCode + 'static> Future for Submit<T, ()> {
    type Output = BufResult<usize, T>;

    /// Submits the operation on the first poll and polls the driver for it on
    /// every poll, including the one that submitted it.
    ///
    /// # Panics
    ///
    /// Panics with "Cannot poll after ready" when polled again after it has
    /// returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();
        loop {
            let current = me.state.take().expect("Cannot poll after ready");
            match current {
                State::Idle { op } => {
                    let extra = cx.as_extra(|| me.driver.borrow().default_extra());
                    // Bind the result before matching so the `RefMut` guard
                    // is released at the end of this statement.
                    let pushed = submit_raw(&mut me.driver.borrow_mut(), op, extra);
                    let key = match pushed {
                        PushEntry::Ready(output) => return Poll::Ready(output),
                        PushEntry::Pending(key) => key,
                    };
                    if let Some(token) = cx.get_cancel() {
                        token.register(&key);
                    }
                    // No return here: the next loop iteration takes the
                    // `Submitted` arm and polls the driver with the waker.
                    *me.state = Some(State::submitted(key));
                }
                State::Submitted { key, .. } => {
                    let polled = poll_task(&mut me.driver.borrow_mut(), cx.get_waker(), key);
                    return match polled {
                        PushEntry::Ready(output) => Poll::Ready(output),
                        PushEntry::Pending(key) => {
                            // Keep the key so the next poll can continue.
                            *me.state = Some(State::submitted(key));
                            Poll::Pending
                        }
                    };
                }
            }
        }
    }
}
/// Drives a submission to completion, yielding the operation's [`BufResult`]
/// together with an [`Extra`].
impl<T: OpCode + 'static> Future for Submit<T, Extra> {
    type Output = (BufResult<usize, T>, Extra);

    /// Submits the operation on the first poll and polls the driver for it on
    /// every poll, including the one that submitted it.
    ///
    /// If the operation already completes at submission time, the result is
    /// paired with the driver's default extra.
    ///
    /// # Panics
    ///
    /// Panics with "Cannot poll after ready" when polled again after it has
    /// returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();
        loop {
            let current = me.state.take().expect("Cannot poll after ready");
            match current {
                State::Idle { op } => {
                    let extra = cx.as_extra(|| me.driver.borrow().default_extra());
                    // Bind the result before matching: the `Ready` arm below
                    // borrows the driver again, so the `RefMut` guard must be
                    // gone by then.
                    let pushed = submit_raw(&mut me.driver.borrow_mut(), op, extra);
                    let key = match pushed {
                        PushEntry::Ready(output) => {
                            return Poll::Ready((output, me.driver.borrow().default_extra()));
                        }
                        PushEntry::Pending(key) => key,
                    };
                    if let Some(token) = cx.get_cancel() {
                        token.register(&key);
                    }
                    // No return here: the next loop iteration takes the
                    // `Submitted` arm and polls the driver with the waker.
                    *me.state = Some(State::submitted(key));
                }
                State::Submitted { key, .. } => {
                    let polled =
                        poll_task_with_extra(&mut me.driver.borrow_mut(), cx.get_waker(), key);
                    return match polled {
                        PushEntry::Ready(output) => Poll::Ready(output),
                        PushEntry::Pending(key) => {
                            // Keep the key so the next poll can continue.
                            *me.state = Some(State::submitted(key));
                            Poll::Pending
                        }
                    };
                }
            }
        }
    }
}
/// Lets callers ask whether a [`Submit`] has already finished, for every `E`
/// for which `Submit<T, E>` is a future.
impl<T: OpCode, E> FusedFuture for Submit<T, E>
where
    Submit<T, E>: Future,
{
    /// Returns `true` when no state is held, which is the case once `poll`
    /// has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        let Self { state, .. } = self;
        state.is_none()
    }
}