use core::any::Any;
use core::convert::Infallible;
use core::fmt::{self, Debug, Formatter};
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::panic::{catch_unwind, AssertUnwindSafe};
use completion_core::CompletionFuture;
use pin_project_lite::pin_project;
mod tuple;
pub use tuple::*;
mod all;
pub use all::*;
pub struct Panic(Box<dyn Any + Send>);
unsafe impl Sync for Panic {}
impl Debug for Panic {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.pad("Panic")
}
}
impl Panic {
fn into_inner(self) -> Box<dyn Any + Send> {
self.0
}
}
pub trait TryFuture:
CompletionFuture<Output = Result<<Self as TryFuture>::Ok, <Self as TryFuture>::Error>>
{
type Ok;
type Error;
}
impl<T, E, F> TryFuture for F
where
F: CompletionFuture<Output = Result<T, E>> + ?Sized,
{
type Ok = T;
type Error = E;
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum ControlFlow<B, C = ()> {
Break(B),
Continue(C),
}
#[cfg(test)]
impl<B, C> ControlFlow<B, C> {
#[track_caller]
fn unwrap_break(self) -> B
where
C: Debug,
{
match self {
Self::Continue(c) => panic!("Expected `Break`, found `Continue({:?})`", c),
Self::Break(b) => b,
}
}
#[track_caller]
fn unwrap_continue(self) -> C
where
B: Debug,
{
match self {
Self::Continue(c) => c,
Self::Break(b) => panic!("Expected `Continue`, found `Break({:?})`", b),
}
}
}
pub trait ControlFlowFuture:
CompletionFuture<
Output = ControlFlow<<Self as ControlFlowFuture>::Break, <Self as ControlFlowFuture>::Continue>,
>
{
type Continue;
type Break;
}
impl<C, B, F> ControlFlowFuture for F
where
F: CompletionFuture<Output = ControlFlow<B, C>> + ?Sized,
{
type Continue = C;
type Break = B;
}
#[derive(Debug)]
pub enum FutureState<F: ControlFlowFuture> {
Running(F),
Completed(F::Continue),
Cancelled,
Taken,
}
impl<F: ControlFlowFuture + Unpin> Unpin for FutureState<F> {}
impl<F: ControlFlowFuture> FutureState<F> {
fn take_output(self: Pin<&mut Self>) -> Option<F::Continue> {
if let Self::Completed(_) = &*self {
Some(
match mem::replace(unsafe { Pin::into_inner_unchecked(self) }, Self::Taken) {
Self::Completed(c) => c,
_ => unreachable!(),
},
)
} else {
None
}
}
unsafe fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<Result<F::Break, Panic>>> {
let this = Pin::into_inner_unchecked(self);
match this {
Self::Running(fut) => {
let fut = Pin::new_unchecked(fut);
match catch_unwind(AssertUnwindSafe(|| fut.poll(cx))) {
Ok(Poll::Ready(ControlFlow::Continue(c))) => {
*this = Self::Completed(c);
Poll::Ready(ControlFlow::Continue(()))
}
Ok(Poll::Ready(ControlFlow::Break(b))) => {
*this = Self::Cancelled;
Poll::Ready(ControlFlow::Break(Ok(b)))
}
Ok(Poll::Pending) => Poll::Pending,
Err(panic) => {
*this = Self::Cancelled;
Poll::Ready(ControlFlow::Break(Err(Panic(panic))))
}
}
}
Self::Completed(_) => Poll::Ready(ControlFlow::Continue(())),
Self::Cancelled => panic!("Called `FutureState::poll` after cancellation"),
Self::Taken => panic!("Called `FutureState::poll` after output has been taken"),
}
}
unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ControlFlow<Panic>> {
let this = Pin::into_inner_unchecked(self);
match this {
Self::Running(fut) => {
let fut = Pin::new_unchecked(fut);
match catch_unwind(AssertUnwindSafe(|| fut.poll_cancel(cx))) {
Ok(Poll::Ready(())) => {
*this = Self::Cancelled;
Poll::Ready(ControlFlow::Continue(()))
}
Ok(Poll::Pending) => Poll::Pending,
Err(e) => {
*this = Self::Cancelled;
Poll::Ready(ControlFlow::Break(Panic(e)))
}
}
}
Self::Completed(_) | Self::Cancelled => Poll::Ready(ControlFlow::Continue(())),
Self::Taken => panic!("Called `FutureState::poll_cancel` after output has been taken"),
}
}
unsafe fn poll_panicked(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<Infallible>> {
let this = Pin::into_inner_unchecked(self);
match this {
Self::Running(fut) => {
let fut = Pin::new_unchecked(fut);
match catch_unwind(AssertUnwindSafe(|| fut.poll_cancel(cx))) {
Ok(Poll::Ready(())) | Err(_) => {
*this = Self::Cancelled;
Poll::Ready(ControlFlow::Continue(()))
}
Ok(Poll::Pending) => Poll::Pending,
}
}
Self::Completed(_) | Self::Cancelled => Poll::Ready(ControlFlow::Continue(())),
Self::Taken => {
panic!("Called `FutureState::poll_panicked` after output has been taken")
}
}
}
}
pin_project! {
#[derive(Debug)]
pub struct ZipFuture<F> {
#[pin]
inner: F,
}
}
impl<F> ZipFuture<F> {
fn new(inner: F) -> Self {
Self { inner }
}
}
impl<F: CompletionFuture> CompletionFuture for ZipFuture<F> {
type Output = ControlFlow<Infallible, F::Output>;
unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map(ControlFlow::Continue)
}
unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.project().inner.poll_cancel(cx)
}
}
pin_project! {
#[derive(Debug)]
pub struct TryZipFuture<F> {
#[pin]
inner: F,
}
}
impl<F> TryZipFuture<F> {
fn new(inner: F) -> Self {
Self { inner }
}
}
impl<F, T, E> CompletionFuture for TryZipFuture<F>
where
F: CompletionFuture<Output = Result<T, E>>,
{
type Output = ControlFlow<E, T>;
unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map(|res| match res {
Ok(val) => ControlFlow::Continue(val),
Err(e) => ControlFlow::Break(e),
})
}
unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.project().inner.poll_cancel(cx)
}
}
pin_project! {
#[derive(Debug)]
pub struct RaceFuture<F> {
#[pin]
inner: F,
}
}
impl<F> RaceFuture<F> {
fn new(inner: F) -> Self {
Self { inner }
}
}
impl<F: CompletionFuture> CompletionFuture for RaceFuture<F> {
type Output = ControlFlow<F::Output, Infallible>;
unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map(ControlFlow::Break)
}
unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.project().inner.poll_cancel(cx)
}
}
pin_project! {
#[derive(Debug)]
pub struct RaceOkFuture<F> {
#[pin]
inner: F,
}
}
impl<F> RaceOkFuture<F> {
fn new(inner: F) -> Self {
Self { inner }
}
}
impl<F, T, E> CompletionFuture for RaceOkFuture<F>
where
F: CompletionFuture<Output = Result<T, E>>,
{
type Output = ControlFlow<T, E>;
unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map(|res| match res {
Ok(val) => ControlFlow::Break(val),
Err(e) => ControlFlow::Continue(e),
})
}
unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.project().inner.poll_cancel(cx)
}
}