//! Structured async task spawn implementations for Tokio
//!
//! Read ["Tree-Structured
//! Concurrency"](https://blog.yoshuawuyts.com/tree-structured-concurrency/) for
//! a complete overview of what structured concurrency is, and how the popular
//! implementations of `task::spawn` fail to implement it. This crate provides
//! two flavors of structured spawn implementations for the Tokio async runtime:
//!
//! - **spawn_relaxed**: this implementation guarantees error propagation and
//!   cancellation propagation, but it does not guarantee ordering of operations.
//! - **spawn**: this implementation guarantees error propagation, cancellation
//!   propagation, and ordering of operations. To provide those guarantees it
//!   blocks on drop, which may deadlock on single-threaded systems.
//!
//! In the majority of cases `spawn` should be preferred over `spawn_relaxed`,
//! unless you're able to provide other ways of mitigating ordering issues, or
//! you know for a fact that logical ordering of drop won't matter. In those
//! cases you should document those invariants and preferably test them too.
//! In the future we hope that async Rust will gain the ability to implement
//! some form of "async Drop", which would resolve the tension between waiting
//! for destructors to complete asynchronously and not wanting to block inside
//! drop.
//!
//! # Differences with `tokio::spawn`
//!
//! In Rust `Future`s are structured: if they're fallible they will always
//! propagate their errors, when dropped they will always propagate
//! cancellation, and they will not return before all the work of their
//! sub-futures has completed. Tasks on the other hand are not structured, since
//! they've been modeled after `std::thread` rather than `std::future`. The
//! solution we adopt in this crate is to stop treating tasks as
//! _"`async/.await` versions of threads"_ and to instead treat them as
//! _"parallelizable versions of futures"_. That means making the following
//! changes:
//!
//! - `JoinHandle` has been renamed to `TaskHandle` since tasks under this crate's
//!   model are lazy, not eager
//! - `TaskHandle`s are marked with `#[must_use]` to ensure they're awaited
//! - Tasks won't start until their handle is `.await`ed
//! - When a `TaskHandle` is dropped, the underlying task is cancelled
//! - Cancelling a task will by default block until the cancellation of the task
//!   has completed
//! - In order for tasks to execute concurrently, you have to concurrently await
//!   the handles
//! - Because the relationship between handles and their tasks is now
//!   guaranteed, tasks will no longer produce cancellation errors
//!
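//! The property of plain futures that this model builds on, namely that
//! dropping a future also cleans up whatever it holds, can be checked with the
//! standard library alone. The sketch below is illustrative only and is not
//! part of this crate's API: `NoopWaker`, `Guard`, and `cancelled_on_drop` are
//! hypothetical names. It polls a never-completing future once, drops it, and
//! reports whether a guard held inside the future was dropped along with it.
//!
//! ```rust
//! use std::future::Future;
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::task::{Context, Wake, Waker};
//!
//! /// A waker that does nothing; enough to poll a future by hand.
//! struct NoopWaker;
//!
//! impl Wake for NoopWaker {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! /// Sets a shared flag when dropped, so cleanup becomes observable.
//! struct Guard(Arc<AtomicBool>);
//!
//! impl Drop for Guard {
//!     fn drop(&mut self) {
//!         self.0.store(true, Ordering::SeqCst);
//!     }
//! }
//!
//! /// Returns `true` if the guard was dropped only once the future was dropped.
//! fn cancelled_on_drop() -> bool {
//!     let dropped = Arc::new(AtomicBool::new(false));
//!     let guard = Guard(Arc::clone(&dropped));
//!
//!     // The future owns the guard and then waits forever.
//!     let mut future = Box::pin(async move {
//!         let _guard = guard;
//!         std::future::pending::<()>().await;
//!     });
//!
//!     // Poll once so the future is suspended at its `.await` point.
//!     let waker = Waker::from(Arc::new(NoopWaker));
//!     let mut cx = Context::from_waker(&waker);
//!     let was_pending = future.as_mut().poll(&mut cx).is_pending();
//!     let dropped_while_alive = dropped.load(Ordering::SeqCst);
//!
//!     // Dropping the future is how cancellation is expressed.
//!     drop(future);
//!
//!     was_pending && !dropped_while_alive && dropped.load(Ordering::SeqCst)
//! }
//! ```
//!
//! Calling `cancelled_on_drop()` returns `true`: the guard stays alive while
//! the future is suspended and is released as soon as the future is dropped.
//!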
//! # Implementing Concurrency
//!
//! Because in this crate tasks behave like futures rather than threads, you
//! have to poll the `TaskHandle`s concurrently in order to make progress on
//! them - just like you would with regular futures. For this we recommend using
//! the [`futures-concurrency`] library where possible, and [`futures-util`]
//! where not. `futures-concurrency` provides composable async concurrency
//! operations such as `join`, `race`, and `merge`, as well as fallible versions
//! of those operations, and operations such as `zip` and `chain`. Here's an
//! example of concurrently awaiting multiple async `TaskHandle`s:
//!
//! [`futures-concurrency`]: https://docs.rs/futures-concurrency
//! [`futures-util`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_concurrency::prelude::*;
//!
//! let mut handles = vec![];
//! for n in 0..100 {
//!     handles.push(spawn(async move { n * n }));   // 👈 Each task squares a number
//! }
//! let outputs: Vec<_> = handles.join().await;      // 👈 The tasks start executing here
//! # assert_eq!(outputs.len(), 100);
//! # }
//! ```
//!
//! The `futures-concurrency` library does not yet implement concurrency
//! operations for variable numbers of futures or streams. This is something
//! we're actively exploring and will eventually be adding. For now it's
//! recommended to use APIs such as [`FuturesUnordered`] from the
//! `futures-util` library instead.
//!
//! [`FuturesUnordered`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html

#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]

use std::{
    future::Future,
    panic,
    pin::Pin,
    sync::mpsc,
    task::{self, Poll},
};

/// Spawn a task on the tokio multi-threaded executor using "strict" semantics.
///
/// This function guarantees that the `TaskHandle` will not complete until the
/// underlying task has completed - even in the case of an error. It achieves this
/// by blocking inside the `TaskHandle`'s destructor until the future has been dropped.
///
/// This matches all of the required criteria for structured concurrency, and
/// when in doubt crate authors should prefer to use this function.
///
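/// The block-until-dropped idea can be sketched with the standard library
/// alone. The example below is an analogy and not this function's actual code:
/// it uses an OS thread in place of a Tokio task, and
/// `worker_done_when_recv_returns` is a hypothetical name. No message is ever
/// sent on the channel, so `recv` can only return once the sender has been
/// dropped, which makes channel disconnection a completion signal.
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::{mpsc, Arc};
/// use std::thread;
/// use std::time::Duration;
///
/// /// Returns `true` if the worker had finished by the time `recv` returned.
/// fn worker_done_when_recv_returns() -> bool {
///     let done = Arc::new(AtomicBool::new(false));
///     let done_in_worker = Arc::clone(&done);
///     let (sender, receiver) = mpsc::channel::<()>();
///
///     let worker = thread::spawn(move || {
///         // Simulate some work before signalling completion.
///         thread::sleep(Duration::from_millis(20));
///         done_in_worker.store(true, Ordering::SeqCst);
///         // Dropping the only sender disconnects the channel.
///         drop(sender);
///     });
///
///     // Blocks until the sender is gone, then yields `Err(RecvError)`.
///     let disconnected = receiver.recv().is_err();
///     let done_at_return = done.load(Ordering::SeqCst);
///
///     worker.join().expect("worker thread panicked");
///     disconnected && done_at_return
/// }
/// ```
///
/// Calling `worker_done_when_recv_returns()` returns `true`. The same blocking
/// call is what makes the strict flavor unsuitable where blocking the current
/// thread would stall the work being waited on.
///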
/// # Examples
///
/// ```
/// use structured_spawn::spawn;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    let ordering = Ordering::Strict(receiver);
    let tokio_handle = tokio::task::spawn(async move {
        let output = future.await;

        // The future has completed, it's safe to drop the sender.
        drop(sender);

        output
    });
    TaskHandle {
        tokio_handle,
        ordering,
        completed: false,
    }
}

/// Spawn a task on the tokio multi-threaded executor using "relaxed" semantics.
///
/// The `TaskHandle` returned by this function does not provide any ordering
/// guarantees with respect to the underlying task. This may lead to logical
/// races, and authors are expected to manually synchronize ordering via other
/// means.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn_relaxed;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn_relaxed(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn_relaxed<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    TaskHandle {
        tokio_handle: tokio::task::spawn(future),
        ordering: Ordering::Relaxed,
        completed: false,
    }
}

/// A handle which references a task.
///
/// This handle behaves much like a regular future: it is marked as
/// `#[must_use]`, it won't start doing work until `.await`ed, and when dropped
/// will cancel the underlying task. See [`spawn`] or [`spawn_relaxed`] for more details.
#[derive(Debug)]
#[must_use = "Tasks do nothing unless `.await`ed"]
#[pin_project::pin_project(PinnedDrop)]
pub struct TaskHandle<T: Send> {
    /// A handle to a tokio task
    tokio_handle: tokio::task::JoinHandle<T>,
    /// Which ordering guarantees do our tasks provide?
    ordering: Ordering,
    /// Did the task successfully complete?
    completed: bool,
}

/// The future for the task handle. Either produces a value, or an error if the
/// task has panicked during execution.
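///
/// The output type is `std::thread::Result<T>`, the same type returned by
/// `std::thread::JoinHandle::join`, so the error side is an opaque panic
/// payload. The sketch below is a std-only illustration that uses threads
/// rather than this crate's tasks; `describe`, `run_ok`, and `run_panic` are
/// hypothetical helper names. It shows one way a caller might turn such a
/// payload into a readable message.
///
/// ```rust
/// use std::thread;
///
/// /// Formats either the value or, where possible, the panic message.
/// fn describe(result: thread::Result<u32>) -> String {
///     match result {
///         Ok(value) => format!("ok: {value}"),
///         Err(payload) => {
///             // Panic payloads are usually a `&str` or a `String`.
///             if let Some(msg) = payload.downcast_ref::<&str>() {
///                 format!("panicked: {msg}")
///             } else if let Some(msg) = payload.downcast_ref::<String>() {
///                 format!("panicked: {msg}")
///             } else {
///                 "panicked: <non-string payload>".to_string()
///             }
///         }
///     }
/// }
///
/// /// Joins a thread that returns normally.
/// fn run_ok() -> String {
///     describe(thread::spawn(|| 6 * 7).join())
/// }
///
/// /// Joins a thread that panics with a string message.
/// fn run_panic() -> String {
///     describe(thread::spawn(|| -> u32 { panic!("boom") }).join())
/// }
/// ```
///
/// A caller that would rather continue unwinding can pass the payload to
/// `std::panic::resume_unwind` instead of formatting it.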
impl<T: Send> Future for TaskHandle<T> {
    type Output = std::thread::Result<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        match Pin::new(this.tokio_handle).poll(cx) {
            task::Poll::Ready(value) => {
                *this.completed = true;
                match value {
                    Ok(value) => Poll::Ready(Ok(value)),
                    Err(err) => {
                        if err.is_cancelled() {
                            panic!("The `TaskHandle` should never be able to poll the future again after completion");
                        } else if err.is_panic() {
                            // Hand the panic payload to the caller as an error
                            Poll::Ready(Err(err.into_panic()))
                        } else {
                            unreachable!("The only way an error can be triggered from a handle is either through cancellation or panicking");
                        }
                    }
                }
            }
            task::Poll::Pending => Poll::Pending,
        }
    }
}

/// Which ordering guarantees do our tasks provide?
#[derive(Debug)]
enum Ordering {
    /// Block on drop until the task has finished dropping
    Strict(mpsc::Receiver<()>),
    /// Don't block on drop, but the ordering will be unstructured
    Relaxed,
}

#[pin_project::pinned_drop]
impl<T: Send> PinnedDrop for TaskHandle<T> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        // All work is done, nothing to do here.
        if *this.completed {
            return;
        }

        // Cancel the task.
        this.tokio_handle.abort();

        // If we're using "strict" ordering, we block until `recv` returns.
        // No value is ever sent, so it only returns, with an error, once the
        // sender held by the task has been dropped.
        if let Ordering::Strict(receiver) = this.ordering {
            let result = receiver.recv();
            debug_assert!(
                result.is_err(),
                "The receiver should only ever complete if the sender has been dropped"
            );
        }
    }
}