dewit 0.0.1

Define scheduling and execution of code separately from data flow
Documentation
use alloc::boxed::Box;
use core::pin::Pin;

use futures::future::join_all;

use crate::{
    mode::Mode,
    task::{
        Task,
        TaskOutput,
    },
};

/// Executes code as a series of futures, otherwise blocking in a single thread
/// for non-IO operations.
///
/// Every operation of this [`Mode`] produces a boxed, pinned, `Send` future.
/// Only work submitted through `spawn_io` is actually deferred into the
/// returned future; `spawn_blocking` and `spawn_cpu` run their closure
/// immediately on the calling thread and hand back a future that merely yields
/// the already-computed result.
pub struct Async;

impl<'src> Mode<'src> for Async {
    type Output<O>
        = Pin<Box<dyn Future<Output = O> + Send + 'src>>
    where
        O: Send + 'src;

    #[inline(always)]
    fn invoke<I, O, T>(&'src self, task: T, input: I) -> TaskOutput<'src, Self, O>
    where
        T: Task<'src, I, O>,
        O: Send + 'src,
    {
        task.go(self, input)
    }

    #[inline(always)]
    fn spawn_blocking<O>(&self, f: impl FnOnce() -> O + Send + 'src) -> Self::Output<O>
    where
        O: Send + 'src,
    {
        // We immediately evaluate the function here to avoid putting blocking work into
        // an async runtime that may not be prepared for it.
        let res = f();
        Box::pin(async move { res })
    }
    #[inline(always)]
    fn spawn_cpu<O>(&self, f: impl FnOnce() -> O + Send + 'src) -> Self::Output<O>
    where
        O: Send + 'src,
    {
        // We immediately evaluate the function here to avoid putting CPU work into
        // an async runtime that may not be prepared for it.
        let res = f();
        Box::pin(async move { res })
    }
    #[inline(always)]
    fn spawn_io<O, Fut>(&self, f: impl FnOnce() -> Fut + Send + 'src) -> Self::Output<O>
    where
        O: Send + 'src,
        Fut: Future<Output = O> + Send + 'src,
    {
        Box::pin(async move { f().await })
    }

    #[inline(always)]
    fn join<AO, BO>(
        &'src self,
        a: impl FnOnce(&'src Self) -> Self::Output<AO> + Send + 'src,
        b: impl FnOnce(&'src Self) -> Self::Output<BO> + Send + 'src,
    ) -> Self::Output<(AO, BO)>
    where
        AO: Send + 'src,
        BO: Send + 'src,
    {
        // FIXME: don't depend on this nightly thing
        Box::pin(core::future::join!(a(self), b(self),))
    }

    #[inline(always)]
    fn then<I, O>(
        &'src self,
        input: Self::Output<I>,
        f: impl FnOnce(&'src Self, I) -> Self::Output<O> + Send + 'src,
    ) -> Self::Output<O>
    where
        I: Send + 'src,
        O: Send + 'src,
    {
        Box::pin(async move { f(self, input.await).await })
    }

    #[inline(always)]
    fn map_result<I, E, O>(
        &'src self,
        input: Self::Output<Result<I, E>>,
        f: impl FnOnce(&'src Self, I) -> Self::Output<O> + Send + 'src,
    ) -> Self::Output<Result<O, E>>
    where
        I: Send + 'src,
        E: Send + 'src,
        O: Send + 'src,
    {
        Box::pin(async move { Ok(f(self, input.await?).await) })
    }
    #[inline(always)]
    fn map_option<I, O>(
        &'src self,
        input: Self::Output<Option<I>>,
        f: impl FnOnce(&'src Self, I) -> Self::Output<O> + Send + 'src,
    ) -> Self::Output<Option<O>>
    where
        I: Send + 'src,
        O: Send + 'src,
    {
        Box::pin(async move { Some(f(self, input.await?).await) })
    }
    #[inline(always)]
    fn map_iterator<I, O, II>(
        &'src self,
        input: Self::Output<II>,
        f: impl Fn(&'src Self, I) -> Self::Output<O> + Send + 'src,
    ) -> Self::Output<impl Iterator<Item = O> + Send + 'src>
    where
        O: Send + 'src,
        II: Iterator<Item = I> + Send + 'src,
    {
        // FIXME: this is ugly as sin and also requires synchronization after each map,
        // ideally there'd be some magical world where we can just stream iterators into
        // one another.
        Box::pin(async move {
            join_all(input.await.map(move |input| f(self, input)))
                .await
                .into_iter()
        })
    }

    #[inline(always)]
    fn and_then_result<I, E, O>(
        &'src self,
        input: Self::Output<Result<I, E>>,
        f: impl FnOnce(&'src Self, I) -> Self::Output<Result<O, E>> + Send + 'src,
    ) -> Self::Output<Result<O, E>>
    where
        I: Send + 'src,
        E: Send + 'src,
        O: Send + 'src,
    {
        Box::pin(async move { f(self, input.await?).await })
    }
    #[inline(always)]
    fn and_then_option<I, O>(
        &'src self,
        input: Self::Output<Option<I>>,
        f: impl FnOnce(&'src Self, I) -> Self::Output<Option<O>> + Send + 'src,
    ) -> Self::Output<Option<O>>
    where
        I: Send + 'src,
        O: Send + 'src,
    {
        Box::pin(async move { f(self, input.await?).await })
    }
}