structured_spawn/lib.rs
//! Structured async task spawn implementations for Tokio
//!
//! Read ["Tree-Structured
//! Concurrency"](https://blog.yoshuawuyts.com/tree-structured-concurrency/) for
//! a complete overview of what structured concurrency is, and how the popular
//! implementations of `task::spawn` fail to implement it. This crate provides
//! two flavors of structured spawn implementations for the Tokio async runtime:
//!
//! - **spawn_relaxed**: this implementation guarantees error propagation and
//!   cancellation propagation. But it does not guarantee ordering of operations.
//! - **spawn**: this implementation guarantees error propagation, cancellation
//!   propagation, and ordering of operations. But it will block on drop to
//!   provide those guarantees. This may deadlock on single-threaded systems.
//!
//! In the majority of cases `spawn` should be preferred over `spawn_relaxed`,
//! unless you're able to provide other ways of mitigating ordering issues, or
//! you know for a fact that the logical ordering of drop won't matter. In those
//! cases you should document those invariants and preferably test them too.
//! In the future we hope that async Rust will gain the ability to implement
//! some form of "async Drop", which would resolve the tension between waiting
//! for destructors to complete asynchronously and not wanting to block inside
//! drop.
//!
//! # Differences with `tokio::spawn`
//!
//! In Rust `Future`s are structured: if they're fallible they will always
//! propagate their errors, when dropped they will always propagate
//! cancellation, and they will not return before all the work of their
//! sub-futures has completed. Tasks on the other hand are not structured, since
//! they've been modeled after `std::thread` rather than `std::future`. The
//! solution we adopt in this crate is to stop treating tasks as
//! _"`async/.await` versions of threads"_ and to instead treat them as
//! _"parallelizable versions of futures"_. That means making the following
//! changes:
//!
//! - `JoinHandle` has been renamed to `TaskHandle` since tasks under this
//!   crate's model are lazy, not eager
//! - `TaskHandle`s are marked with `#[must_use]` to ensure they're awaited
//! - Tasks won't start until their handle is `.await`ed
//! - When a `TaskHandle` is dropped, the underlying task is cancelled
//! - Cancelling a task will by default block until the cancellation of the
//!   task has completed
//! - In order for tasks to execute concurrently, you have to concurrently
//!   await the handles
//! - Because the relationship between handles and their tasks is now
//!   guaranteed, tasks will no longer produce cancellation errors; panics are
//!   still surfaced as errors, as shown in the example below
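//!
//! Here's a minimal sketch of what that error propagation looks like: a panic
//! inside a task comes back as an `Err` when its handle is awaited (the
//! panicking workload is made up for the example):
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//!
//! let handle = spawn(async { panic!("boom"); });
//! // The panic is handed back to the caller as an `Err` rather than being
//! // swallowed by the runtime.
//! assert!(handle.await.is_err());
//! # }
//! ```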
//!
//! # Implementing Concurrency
//!
//! Because in this crate tasks behave like futures rather than threads, you
//! have to poll the `TaskHandle`s concurrently in order to make progress on
//! them - just like you would with regular futures. For this we recommend using
//! the [`futures-concurrency`] library where possible, and [`futures-util`]
//! where not. `futures-concurrency` provides composable async concurrency
//! operations such as `join`, `race`, and `merge`, as well as fallible versions
//! of those operations, and operations such as `zip` and `chain`. Here's an
//! example of concurrently awaiting multiple async `TaskHandle`s:
//!
//! [`futures-concurrency`]: https://docs.rs/futures-concurrency
//! [`futures-util`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_concurrency::prelude::*;
//!
//! let mut handles = vec![];
//! for n in 0..100 {
//!     handles.push(spawn(async move { n * n })); // 👈 Each task squares a number
//! }
//! let outputs: Vec<_> = handles.join().await; // 👈 The tasks start executing here
//! # assert_eq!(outputs.len(), 100);
//! # }
//! ```
75//!
76//! The `futures-concurrency` library does not yet implement concurrency
77//! operations for variable numbers of futures or streams. This is something
78//! we're actively exploring and will eventually be adding. For now it's
79//! recommended to instead use APIs such as [`FuturesUnordered`] from the
80//! `futures-util` library instead.
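//!
//! As an illustration, here's a sketch of draining a dynamically sized set of
//! tasks with `FuturesUnordered` (this assumes `futures-util` is available as
//! a dependency; the doubling workload is made up for the example):
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_util::stream::{FuturesUnordered, StreamExt};
//!
//! let mut tasks: FuturesUnordered<_> = (0..10)
//!     .map(|n| spawn(async move { n * 2 }))
//!     .collect();
//!
//! // Outputs arrive in completion order rather than in submission order.
//! let mut total = 0;
//! while let Some(result) = tasks.next().await {
//!     total += result.expect("task panicked");
//! }
//! # assert_eq!(total, 90);
//! # }
//! ```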
//!
//! [`FuturesUnordered`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html

#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]

use std::{
    future::Future,
    pin::Pin,
    sync::mpsc,
    task::{self, Poll},
};

/// Spawn a task on the tokio multi-threaded executor using "strict" semantics.
///
/// This function guarantees that the `TaskHandle` will not complete until the
/// underlying task has completed - even in the case of an error. It achieves
/// this by blocking inside the `TaskHandle`'s destructor until the future has
/// been dropped.
///
/// This matches all of the required criteria for structured concurrency, and
/// when in doubt crate authors should prefer to use this function.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    // The sender is moved into the spawned task. It is dropped either when
    // the task completes or when its future is dropped during cancellation,
    // which unblocks the `recv` call in the `TaskHandle` destructor.
    let (sender, receiver) = mpsc::channel();
    let ordering = Ordering::Strict(receiver);
    let tokio_handle = tokio::task::spawn(async move {
        let output = future.await;

        // The future has completed, it's safe to drop the sender.
        drop(sender);

        output
    });
    TaskHandle {
        tokio_handle,
        ordering,
        completed: false,
    }
}

/// Spawn a task on the tokio multi-threaded executor using "relaxed" semantics.
///
/// The `TaskHandle` returned by this function does not provide any ordering
/// guarantees with respect to the underlying task. This may lead to logical
/// races, and authors are expected to manually synchronize ordering via other
/// means.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn_relaxed;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn_relaxed(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn_relaxed<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    TaskHandle {
        tokio_handle: tokio::task::spawn(future),
        ordering: Ordering::Relaxed,
        completed: false,
    }
}

/// A handle which references a task.
///
/// This handle behaves much like a regular future: it is marked as
/// `#[must_use]`, it won't start doing work until `.await`ed, and when dropped
/// it will cancel the underlying task. See [`spawn`] or [`spawn_relaxed`] for
/// more details.
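///
/// # Examples
///
/// A sketch of drop-based cancellation, using [`spawn_relaxed`] so that drop
/// does not block; `std::future::pending` stands in for work that never
/// finishes on its own:
///
/// ```
/// # #[tokio::main] async fn main() {
/// use structured_spawn::spawn_relaxed;
///
/// let handle = spawn_relaxed(async {
///     std::future::pending::<()>().await;
/// });
/// // Dropping the handle cancels the underlying task. With `spawn`, dropping
/// // would additionally block until the task had finished being dropped.
/// drop(handle);
/// # }
/// ```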
#[derive(Debug)]
#[must_use = "Tasks do nothing unless `.await`ed"]
#[pin_project::pin_project(PinnedDrop)]
pub struct TaskHandle<T: Send> {
    /// A handle to a tokio task
    tokio_handle: tokio::task::JoinHandle<T>,
    /// Which ordering guarantees do our tasks provide?
    ordering: Ordering,
    /// Did the task successfully complete?
    completed: bool,
}

/// The future for the task handle. Either produces a value, or an error if the
/// task has panicked during execution.
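///
/// As a sketch, a caller that wants to re-raise the task's panic on its own
/// thread can pass the error payload to `std::panic::resume_unwind`:
///
/// ```
/// # #[tokio::main] async fn main() {
/// use structured_spawn::spawn;
///
/// let handle = spawn(async { 1 + 1 });
/// match handle.await {
///     Ok(n) => assert_eq!(n, 2),
///     // The error is the boxed panic payload; re-raise it here.
///     Err(payload) => std::panic::resume_unwind(payload),
/// }
/// # }
/// ```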
impl<T: Send> Future for TaskHandle<T> {
    type Output = std::thread::Result<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        match Pin::new(this.tokio_handle).poll(cx) {
            task::Poll::Ready(value) => {
                *this.completed = true;
                match value {
                    Ok(value) => Poll::Ready(Ok(value)),
                    Err(err) => {
                        if err.is_cancelled() {
                            panic!("The `TaskHandle` should never be able to poll the future again after completion");
                        } else if err.is_panic() {
                            // Hand the panic payload back to the caller.
                            Poll::Ready(Err(err.into_panic()))
                        } else {
                            unreachable!("The only way an error can be triggered from a handle is either through cancellation or panicking");
                        }
                    }
                }
            }
            task::Poll::Pending => Poll::Pending,
        }
    }
}

/// Which ordering guarantees do our tasks provide?
#[derive(Debug)]
enum Ordering {
    /// Block on drop until the task has finished dropping
    Strict(mpsc::Receiver<()>),
    /// Don't block on drop, but the ordering will be unstructured
    Relaxed,
}

#[pin_project::pinned_drop]
impl<T: Send> PinnedDrop for TaskHandle<T> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        // All work is done, nothing to do here.
        if *this.completed {
            return;
        }

        // Cancel the task.
        this.tokio_handle.abort();

        // If we're using "strict" ordering, we wait for the receiver to
        // return. It will only return once the sender in the task has been
        // dropped, which surfaces as a `RecvError`.
        if let Ordering::Strict(receiver) = this.ordering {
            let result = receiver.recv();
            debug_assert!(
                result.is_err(),
                "The receiver should only ever complete if the sender has been dropped"
            );
        }
    }
}