structured-spawn 1.0.1

Structured async spawn implementations for Tokio
//! Structured async task spawn implementations for Tokio
//!
//! Read ["Tree-Structured
//! Concurrency"](https://blog.yoshuawuyts.com/tree-structured-concurrency/) for
//! a complete overview of what structured concurrency is, and how the popular
//! implementations of `task::spawn` fail to implement it. This crate provides
//! two flavors of structured spawn implementations for the Tokio async runtime:
//!
//! - **spawn_relaxed**: this implementation guarantees error propagation and
//!   cancellation propagation, but it does not guarantee ordering of operations.
//! - **spawn**: this implementation guarantees error propagation, cancellation
//!   propagation, and ordering of operations, but it will block on drop to
//!   provide those guarantees. This may deadlock on single-threaded runtimes.
//!
//! In the majority of cases `spawn` should be preferred over `spawn_relaxed`,
//! unless you're able to provide other ways of mitigating ordering issues, or
//! you know for a fact that logical ordering of drop won't matter. In those
//! cases you should document those invariants and preferably test them too.
//! In the future we hope that async Rust will gain the ability to implement
//! some form of "async Drop", which would resolve the tension between waiting
//! for destructors to complete asynchronously and not wanting to block inside
//! drop.
//!
//! # Differences with `tokio::spawn`
//!
//! In Rust, `Future`s are structured: if they're fallible they will always
//! propagate their errors, when dropped they will always propagate
//! cancellation, and they will not return before all the work of their
//! sub-futures has completed. Tasks, on the other hand, are not structured,
//! since they've been modeled after `std::thread` rather than `std::future`.
//! Instead of treating tasks as _"`async/.await` versions of threads"_, this
//! crate treats them as _"parallelizable versions of futures"_. That means
//! making the following changes:
//!
//! - `JoinHandle` has been renamed to `TaskHandle`, since tasks under this crate's
//!   model are lazy, not eager
//! - `TaskHandle`s are marked with `#[must_use]` to ensure they're awaited
//! - Tasks won't start until their handle is `.await`ed
//! - When a `TaskHandle` is dropped, the underlying task is cancelled
//! - Cancelling a task will by default block until the cancellation of the task
//!   has completed
//! - In order for tasks to execute concurrently, you have to concurrently await
//! the handles
//! - Because the relationship between handles and their tasks is now
//! guaranteed, tasks will no longer produce cancellation errors
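//!
//! For comparison, the standard library draws the same line between
//! unstructured and structured threads: a detached `std::thread::spawn` can
//! outlive its caller, while `std::thread::scope` does not return until every
//! thread spawned inside it has finished. The following is a minimal sketch
//! that uses only `std`, not this crate; `squares_scoped` is a hypothetical
//! helper chosen to mirror the squaring example further down.
//!
//! ```rust
//! use std::thread;
//!
//! // Hypothetical helper: squares every input on its own scoped thread.
//! fn squares_scoped(inputs: &[u64]) -> Vec<u64> {
//!     // `thread::scope` joins every thread spawned through `s` before it
//!     // returns, so no thread can outlive this function call.
//!     thread::scope(|s| {
//!         let handles: Vec<_> = inputs
//!             .iter()
//!             .map(|&n| s.spawn(move || n * n))
//!             .collect();
//!         // `join` returns `Err` with the panic payload if a thread panicked.
//!         handles
//!             .into_iter()
//!             .map(|handle| handle.join().expect("worker thread panicked"))
//!             .collect()
//!     })
//! }
//! ```
//!
//! Here `squares_scoped(&[1, 2, 3])` returns `[1, 4, 9]`, and by the time it
//! returns all three threads have already been joined.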
//!
//! # Implementing Concurrency
//!
//! Because in this crate tasks behave like futures rather than threads, you
//! have to poll the `TaskHandle`s concurrently in order to make progress on
//! them, just like you would with regular futures. For this we recommend using
//! the [`futures-concurrency`] library where possible, and [`futures-util`]
//! where not. `futures-concurrency` provides composable async concurrency
//! operations such as `join`, `race`, and `merge`, as well as fallible versions
//! of those operations and operations such as `zip` and `chain`. Here's an
//! example of concurrently awaiting multiple `TaskHandle`s:
//!
//! [`futures-concurrency`]: https://docs.rs/futures-concurrency
//! [`futures-util`]: https://docs.rs/futures-util
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_concurrency::prelude::*;
//!
//! let mut handles = vec![];
//! for n in 0..100 {
//!     handles.push(spawn(async move { n * n }));   // 👈 Each task squares a number
//! }
//! let outputs: Vec<_> = handles.join().await;  // 👈 The tasks start executing here
//! # assert_eq!(outputs.len(), 100);
//! # }
//! ```
//!
//! The `futures-concurrency` library does not yet implement concurrency
//! operations for a number of futures or streams that changes while they are
//! being awaited. This is something we're actively exploring and will
//! eventually be adding. For now it's recommended to use APIs such as
//! [`FuturesUnordered`] from the `futures-util` library instead.
//!
//! [`FuturesUnordered`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html

#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]

use std::{
    future::Future,
    panic,
    pin::Pin,
    sync::mpsc,
    task::{self, Poll},
};

/// Spawn a task on the tokio multi-threaded executor using "strict" semantics.
///
/// This function guarantees that the `TaskHandle` will not complete until the
/// underlying task has completed, even in the case of an error. It achieves this
/// by blocking inside the `TaskHandle`'s destructor until the future has been dropped.
///
/// This matches all of the required criteria for structured concurrency, and
/// when in doubt crate authors should prefer to use this function.
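///
/// The blocking-on-drop strategy can be sketched with plain threads. In this
/// minimal sketch a `std::thread` stands in for the Tokio task, and
/// `StrictGuard`, `spawn_strict`, and `strict_event_order` are hypothetical
/// names that are not part of this crate. As in `spawn`, the guard owns an
/// `mpsc::Receiver<()>` whose only `Sender` lives inside the worker, so `recv`
/// in `Drop` returns only once the worker has returned or unwound. Unlike
/// `spawn`, the sketch cannot abort the thread, so its destructor only waits.
///
/// ```rust
/// use std::sync::{mpsc, Arc, Mutex};
/// use std::thread;
/// use std::time::Duration;
///
/// // Hypothetical stand-in for a strict `TaskHandle`.
/// struct StrictGuard {
///     done: mpsc::Receiver<()>,
/// }
///
/// impl Drop for StrictGuard {
///     fn drop(&mut self) {
///         // No message is ever sent, so `recv` blocks until every `Sender`
///         // has been dropped and then returns `Err(RecvError)`.
///         let result = self.done.recv();
///         debug_assert!(result.is_err());
///     }
/// }
///
/// // Hypothetical helper: runs `work` on a new thread and returns a guard
/// // whose destructor waits for that thread to release its `Sender`.
/// fn spawn_strict<F>(work: F) -> StrictGuard
/// where
///     F: FnOnce() + Send + 'static,
/// {
///     let (sender, receiver) = mpsc::channel::<()>();
///     thread::spawn(move || {
///         // `_sender` lives until the end of this closure, so it is dropped
///         // after `work` returns, or during unwinding if `work` panics.
///         let _sender = sender;
///         work();
///     });
///     StrictGuard { done: receiver }
/// }
///
/// // Records whether the work or the guard's destructor finished first.
/// fn strict_event_order() -> Vec<&'static str> {
///     let log = Arc::new(Mutex::new(Vec::new()));
///     let task_log = Arc::clone(&log);
///     let guard = spawn_strict(move || {
///         thread::sleep(Duration::from_millis(20));
///         task_log.lock().unwrap().push("task finished");
///     });
///     drop(guard); // blocks until the worker's `Sender` is gone
///     log.lock().unwrap().push("guard dropped");
///     let events = log.lock().unwrap().clone();
///     events
/// }
/// ```
///
/// Here `strict_event_order()` returns `["task finished", "guard dropped"]`,
/// and a guard whose worker panics is still dropped without hanging, because
/// unwinding drops the `Sender` as well.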
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    let ordering = Ordering::Strict(receiver);
    let tokio_handle = tokio::task::spawn(async move {
        let output = future.await;

        // The future has completed, so it's safe to drop the sender.
        drop(sender);

        output
    });
    TaskHandle {
        tokio_handle,
        ordering,
        completed: false,
    }
}

/// Spawn a task on the tokio multi-threaded executor using "relaxed" semantics.
///
/// The `TaskHandle` returned by this function does not provide any ordering
/// guarantees with respect to the underlying task. This may lead to logical
/// races, and authors are expected to manually synchronize ordering via other
/// means.
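///
/// The kind of logical race this allows can be sketched with plain threads. In
/// this minimal sketch a `std::thread` stands in for the Tokio task and an
/// `AtomicBool` flag stands in for `abort`; like an aborted Tokio task, which
/// is only cancelled once it next yields, the worker only notices the flag
/// between steps. `RelaxedGuard` and `relaxed_event_order` are hypothetical
/// names, not part of this crate.
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
/// use std::sync::{mpsc, Arc, Mutex};
/// use std::thread;
///
/// // Hypothetical stand-in for a relaxed `TaskHandle`.
/// struct RelaxedGuard {
///     cancel: Arc<AtomicBool>,
/// }
///
/// impl Drop for RelaxedGuard {
///     fn drop(&mut self) {
///         // Request cancellation, then return without waiting for the worker.
///         self.cancel.store(true, AtomicOrdering::SeqCst);
///     }
/// }
///
/// // Records the order in which the caller and the worker log their events.
/// fn relaxed_event_order() -> Vec<&'static str> {
///     let log = Arc::new(Mutex::new(Vec::new()));
///     let cancel = Arc::new(AtomicBool::new(false));
///     let (started_tx, started_rx) = mpsc::channel::<()>();
///     let (gate_tx, gate_rx) = mpsc::channel::<()>();
///
///     let task_log = Arc::clone(&log);
///     let task_cancel = Arc::clone(&cancel);
///     let worker = thread::spawn(move || {
///         started_tx.send(()).unwrap();
///         // A step that is already in flight when cancellation is requested.
///         gate_rx.recv().unwrap();
///         task_log.lock().unwrap().push("in-flight step finished");
///         // Cancellation is only observed between steps.
///         if task_cancel.load(AtomicOrdering::SeqCst) {
///             return;
///         }
///         task_log.lock().unwrap().push("next step ran");
///     });
///
///     started_rx.recv().unwrap();
///     drop(RelaxedGuard { cancel });
///     log.lock().unwrap().push("handle dropped");
///
///     // The in-flight step completes only after the handle is already gone.
///     gate_tx.send(()).unwrap();
///     worker.join().unwrap(); // joined only so the log can be inspected
///     let events = log.lock().unwrap().clone();
///     events
/// }
/// ```
///
/// Here `relaxed_event_order()` returns `["handle dropped", "in-flight step finished"]`:
/// the caller has moved on while the cancelled work is still touching shared
/// state, which is the ordering hazard that `spawn` avoids by waiting.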
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn_relaxed;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn_relaxed(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn_relaxed<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    TaskHandle {
        tokio_handle: tokio::task::spawn(future),
        ordering: Ordering::Relaxed,
        completed: false,
    }
}

/// A handle which references a task.
///
/// This handle behaves much like a regular future: it is marked as
/// `#[must_use]`, it won't start doing work until `.await`ed, and when dropped
/// will cancel the underlying task. See [`spawn`] or [`spawn_relaxed`] for more details.
#[derive(Debug)]
#[must_use = "Tasks do nothing unless `.await`ed"]
#[pin_project::pin_project(PinnedDrop)]
pub struct TaskHandle<T: Send> {
    /// A handle to a tokio task
    tokio_handle: tokio::task::JoinHandle<T>,
    /// Which ordering guarantees do our tasks provide?
    ordering: Ordering,
    /// Has the task finished, either with a value or with a panic?
    completed: bool,
}

/// The future for the task handle. Either produces a value, or an error if the
/// task has panicked during execution.
impl<T: Send> Future for TaskHandle<T> {
    type Output = std::thread::Result<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        match Pin::new(this.tokio_handle).poll(cx) {
            task::Poll::Ready(value) => {
                *this.completed = true;
                match value {
                    Ok(value) => Poll::Ready(Ok(value)),
                    Err(err) => {
                        if err.is_cancelled() {
                            panic!("The `TaskHandle` should never be able to poll the future again after completion");
                        } else if err.is_panic() {
                            // Return the panic payload so the caller can inspect it or resume the panic.
                            Poll::Ready(Err(err.into_panic()))
                        } else {
                            unreachable!("The only way an error can be triggered from a handle is either through cancellation or panicking");
                        }
                    }
                }
            }
            task::Poll::Pending => Poll::Pending,
        }
    }
}

/// Which ordering guarantees do our tasks provide?
#[derive(Debug)]
enum Ordering {
    /// Block on drop until the task has finished dropping
    Strict(mpsc::Receiver<()>),
    /// Don't block on drop, but the ordering will be unstructured
    Relaxed,
}

#[pin_project::pinned_drop]
impl<T: Send> PinnedDrop for TaskHandle<T> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        // All work is done, nothing to do here.
        if *this.completed {
            return;
        }

        // Cancel the task.
        this.tokio_handle.abort();

        // With "strict" ordering, wait on the receiver. No message is ever sent,
        // so `recv` only returns once the sender held by the task has been
        // dropped, and it then returns an error.
        if let Ordering::Strict(receiver) = this.ordering {
            let result = receiver.recv();
            debug_assert!(
                result.is_err(),
                "The receiver should only ever complete if the sender has been dropped"
            );
        }
    }
}