pub(crate) mod pending_state;
use core::{
future::Future as CoreFuture,
marker::PhantomPinned,
pin::Pin,
task::{
Context,
Poll,
},
};
use futures_core::future::FusedFuture;
use tracing::error;
use crate::{
op::Op,
reactor::{
Reactor,
REACTOR,
TASK,
},
task::TaskHeader,
};
struct Pending<O: Op> {
state: pending_state::Packed,
op: O,
}
pub struct Future<O: Op> {
pending: Option<Pending<O>>,
_marker: PhantomPinned,
}
impl<O: Op> Future<O> {
pub(crate) fn new(op: O) -> Self {
Self {
pending: Some(Pending {
state: pending_state::Packed::from(pending_state::Unpacked::New),
op,
}),
_marker: PhantomPinned,
}
}
}
impl<O: Op> CoreFuture for Future<O> {
type Output = O::Output;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let pending = this
.pending
.as_mut()
.expect("polling on a terminated future");
match pending_state::Unpacked::from(pending.state) {
pending_state::Unpacked::New => {
unsafe {
Reactor::submit(
&REACTOR,
&mut pending.op,
&mut pending.state as *mut pending_state::Packed,
)
.unwrap()
};
let mut task = unsafe { (*TASK.get()).unwrap() };
unsafe { task.as_mut().increment_ref_count() };
pending.state =
pending_state::Packed::from(pending_state::Unpacked::Submitted { task });
Poll::Pending
}
pending_state::Unpacked::Submitted { .. } => Poll::Pending,
pending_state::Unpacked::Completed { ret } => {
let pending = this.pending.take().unwrap();
Poll::Ready(pending.op.complete(ret))
}
}
}
}
impl<O: Op> FusedFuture for Future<O> {
fn is_terminated(&self) -> bool {
self.pending.is_none()
}
}
#[cold]
fn cancel(pending_state: &mut pending_state::Packed) {
match pending_state::Unpacked::from(*pending_state) {
pending_state::Unpacked::Submitted { task } => {
unsafe { TaskHeader::decrement_ref_count(task) };
}
_ => return,
};
if let Err(err) = Reactor::cancel(&REACTOR, pending_state as *mut pending_state::Packed) {
error!("failed to cancel operation: {err}");
crate::freeze();
}
}
impl<O: Op> Drop for Future<O> {
fn drop(&mut self) {
if let Some(pending) = &mut self.pending {
cancel(&mut pending.state);
}
}
}