//! Structured async task spawn implementations for Tokio
//!
//! Read ["Tree-Structured
//! Concurrency"](https://blog.yoshuawuyts.com/tree-structured-concurrency/) for
//! a complete overview of what structured concurrency is, and how the popular
//! implementations of `task::spawn` fail to implement it. This crate provides
//! two flavors of structured spawn implementations for the Tokio async runtime:
//!
//! - **spawn_relaxed**: this implementation guarantees error propagation and
//!   cancellation propagation. But it does not guarantee ordering of operations.
//! - **spawn**: this implementation guarantees error propagation, cancellation
//!   propagation, and ordering of operations. But it will block on drop to
//!   provide those guarantees. This may deadlock on single-threaded systems.
//!
//! In the majority of cases `spawn` should be preferred over `spawn_relaxed`,
//! unless you're able to provide other ways of mitigating ordering issues, or
//! you know for a fact that the logical ordering of drops won't matter. In
//! those cases you should document those invariants, and preferably test them too.
//! In the future we hope that async Rust will gain the ability to implement
//! some form of "async Drop", which would resolve the tension between waiting
//! for destructors to complete asynchronously and not wanting to block inside
//! drop.
//!
//! # Differences with `tokio::spawn`
//!
//! In Rust, `Future`s are structured: if they're fallible they will always
//! propagate their errors, when dropped they will always propagate
//! cancellation, and they will not return before all the work of their
//! sub-futures has completed. Tasks on the other hand are not structured, since
//! they've been modeled after `std::thread` rather than `std::future`. The
//! solution we adopt in this crate is to stop treating tasks as
//! _"`async/.await` versions of threads"_ and to instead treat them as
//! _"parallelizable versions of futures"_. That means making the following
//! changes:
//!
//! - `JoinHandle` has been renamed to `TaskHandle` since tasks under this
//!   crate's model are lazy, not eager
//! - `TaskHandle`s are marked with `#[must_use]` to ensure they're awaited
//! - Tasks won't start until their handle is `.await`ed
//! - When a `TaskHandle` is dropped, the underlying task is cancelled
//! - Cancelling a task will by default block until the cancellation of the task
//!   has completed
//! - In order for tasks to execute concurrently, you have to concurrently await
//!   the handles
//! - Because the relationship between handles and their tasks is now
//!   guaranteed, tasks will no longer produce cancellation errors
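//!
//! As a sketch of the last point - errors propagate, and there is no separate
//! cancellation-error variant to handle (the panic message below is
//! illustrative only):
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//!
//! let handle = spawn(async { panic!("boom"); });
//! // The panic surfaces as an `Err` on the awaiting side instead of being
//! // silently swallowed by the runtime.
//! assert!(handle.await.is_err());
//! # }
//! ```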
//!
//! # Implementing Concurrency
//!
//! Because in this crate tasks behave like futures rather than threads, you
//! have to poll the `TaskHandle`s concurrently in order to make progress on
//! them - just like you would with regular futures. For this we recommend using
//! the [`futures-concurrency`] library where possible, and [`futures-util`]
//! where not. `futures-concurrency` provides composable async concurrency
//! operations such as `join`, `race`, and `merge`, as well as fallible versions
//! of those operations, and operations such as `zip` and `chain`. Here's an
//! example of concurrently awaiting multiple async `TaskHandle`s:
//!
//! [`futures-concurrency`]: https://docs.rs/futures-concurrency
//! [`futures-util`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_concurrency::prelude::*;
//!
//! let mut handles = vec![];
//! for n in 0..100 {
//!     handles.push(spawn(async move { n * n })); // 👈 Each task squares a number
//! }
//! let outputs: Vec<_> = handles.join().await; // 👈 The tasks start executing here
//! # assert_eq!(outputs.len(), 100);
//! # }
//! ```
//!
//! The `futures-concurrency` library does not yet implement concurrency
//! operations for variable numbers of futures or streams. This is something
//! we're actively exploring and will eventually add. For now it's recommended
//! to use APIs such as [`FuturesUnordered`] from the `futures-util` library
//! instead.
//!
//! [`FuturesUnordered`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html
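//!
//! A sketch of that approach, mirroring the fixed-size `join` example above
//! but for a dynamically-sized set of tasks:
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_util::stream::{FuturesUnordered, StreamExt};
//!
//! let mut tasks: FuturesUnordered<_> = (0..100)
//!     .map(|n| spawn(async move { n * n }))
//!     .collect();
//! let mut sum = 0;
//! // Outputs arrive in completion order, not submission order.
//! while let Some(output) = tasks.next().await {
//!     sum += output.unwrap();
//! }
//! # assert_eq!(sum, (0..100).map(|n| n * n).sum::<i32>());
//! # }
//! ```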
#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]
use std::{
    future::Future,
    pin::Pin,
    sync::mpsc,
    task::{self, Poll},
};

/// Spawn a task on the tokio multi-threaded executor using "strict" semantics.
///
/// This function guarantees that the `TaskHandle` will not complete until the
/// underlying task has completed - even in the case of an error. It achieves this
/// by blocking inside the `TaskHandle`'s destructor until the future has been dropped.
///
/// This matches all of the required criteria for structured concurrency, and
/// when in doubt crate authors should prefer to use this function.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    let ordering = Ordering::Strict(receiver);
    let tokio_handle = tokio::task::spawn(async move {
        let output = future.await;
        // The future has completed, so it's safe to drop the sender.
        drop(sender);
        output
    });
    TaskHandle {
        tokio_handle,
        ordering,
        completed: false,
    }
}

/// Spawn a task on the tokio multi-threaded executor using "relaxed" semantics.
///
/// The `TaskHandle` returned by this function does not provide any ordering
/// guarantees with respect to the underlying task. This may lead to logical
/// races, and authors are expected to manually synchronize ordering via other
/// means.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn_relaxed;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn_relaxed(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn_relaxed<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    TaskHandle {
        tokio_handle: tokio::task::spawn(future),
        ordering: Ordering::Relaxed,
        completed: false,
    }
}

/// A handle which references a task.
///
/// This handle behaves much like a regular future: it is marked as
/// `#[must_use]`, it won't start doing work until `.await`ed, and when dropped
/// will cancel the underlying task. See [`spawn`] or [`spawn_relaxed`] for more details.
#[derive(Debug)]
#[must_use = "Tasks do nothing unless `.await`ed"]
#[pin_project::pin_project(PinnedDrop)]
pub struct TaskHandle<T: Send> {
    /// A handle to a tokio task
    tokio_handle: tokio::task::JoinHandle<T>,
    /// Which ordering guarantees do our tasks provide?
    ordering: Ordering,
    /// Did the task successfully complete?
    completed: bool,
}

/// The future for the task handle. Either produces a value, or an error if the
/// task has panicked during execution.
impl<T: Send> Future for TaskHandle<T> {
    type Output = std::thread::Result<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        match Pin::new(this.tokio_handle).poll(cx) {
            task::Poll::Ready(value) => {
                *this.completed = true;
                match value {
                    Ok(value) => Poll::Ready(Ok(value)),
                    Err(err) => {
                        if err.is_cancelled() {
                            unreachable!("The task is only ever cancelled when the `TaskHandle` is dropped, so a live handle should never observe a cancellation error");
                        } else if err.is_panic() {
                            // Propagate the panic payload to the awaiting side.
                            Poll::Ready(Err(err.into_panic()))
                        } else {
                            unreachable!("The only way an error can be triggered from a handle is either through cancellation or panicking");
                        }
                    }
                }
            }
            task::Poll::Pending => Poll::Pending,
        }
    }
}

/// Which ordering guarantees do our tasks provide?
#[derive(Debug)]
enum Ordering {
    /// Block on drop until the task has finished dropping
    Strict(mpsc::Receiver<()>),
    /// Don't block on drop, but the ordering will be unstructured
    Relaxed,
}

#[pin_project::pinned_drop]
impl<T: Send> PinnedDrop for TaskHandle<T> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        // All work is done, nothing to do here.
        if *this.completed {
            return;
        }

        // Cancel the task. Dropping a bare tokio `JoinHandle` would detach the
        // task instead, which would break cancellation propagation.
        this.tokio_handle.abort();

        // If we're using "strict" ordering, wait for `recv` to return. It only
        // returns once the sender inside the task has been dropped - either
        // because the future completed or because it was cancelled - which
        // surfaces as a `RecvError`.
        if let Ordering::Strict(receiver) = this.ordering {
            let result = receiver.recv();
            debug_assert!(
                result.is_err(),
                "The receiver should only ever complete if the sender has been dropped"
            );
        }
    }
}
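// The blocking drop above leans on a small `std::sync::mpsc` property:
// `Receiver::recv` returns `Err(RecvError)` only once every `Sender` has been
// dropped. A minimal std-only sketch of that drop-detection trick (names are
// illustrative, not part of this crate's API):

```rust
use std::sync::mpsc;
use std::thread;

/// Blocks until the worker is done, using sender-drop as the completion
/// signal - the same trick `Ordering::Strict` relies on in `TaskHandle::drop`.
fn wait_for_worker() -> bool {
    let (sender, receiver) = mpsc::channel::<()>();
    let worker = thread::spawn(move || {
        // ...task body runs here; nothing is ever sent on the channel...
        drop(sender); // closing the channel signals completion
    });
    // `recv` blocks until the sender is dropped, then returns `Err`.
    let finished = receiver.recv().is_err();
    worker.join().unwrap();
    finished
}
```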