//! Structured async task spawn implementations for Tokio
//!
//! Read ["Tree-Structured
//! Concurrency"](https://blog.yoshuawuyts.com/tree-structured-concurrency/) for
//! a complete overview of what structured concurrency is, and how the popular
//! implementations of `task::spawn` fail to implement it. This crate provides
//! two flavors of structured spawn implementations for the Tokio async runtime:
//!
//! - **spawn_relaxed**: this implementation guarantees error propagation and
//!   cancellation propagation, but it does not guarantee ordering of operations.
//! - **spawn**: this implementation guarantees error propagation, cancellation
//!   propagation, and ordering of operations, but it blocks on drop to
//!   provide those guarantees. This may deadlock on single-threaded systems.
//!
//! In the majority of cases `spawn` should be preferred over `spawn_relaxed`,
//! unless you're able to provide other ways of mitigating ordering issues, or
//! you know for a fact that logical ordering of drop won't matter. In those
//! cases you should document those invariants and preferably test them too.
//! In the future we hope that async Rust will gain the ability to implement
//! some form of "async Drop", which would resolve the tension between waiting
//! for destructors to complete asynchronously and not wanting to block inside
//! drop.
//!
//! # Differences with `tokio::spawn`
//!
//! In Rust `Future`s are structured: if they're fallible they will always
//! propagate their errors, when dropped they will always propagate
//! cancellation, and they will not return before all the work of their
//! sub-futures has completed. Tasks on the other hand are not structured, since
//! they've been modeled after `std::thread` rather than `std::future`. The
//! solution we adopt in this crate is to stop treating tasks as
//! _"`async/.await` versions of threads"_ and instead treat them as
//! _"parallelizable versions of futures"_. That means making the following
//! changes:
//!
//! - `JoinHandle` has been renamed to `TaskHandle` since tasks under this crate's
//!   model are lazy, not eager
//! - `TaskHandle`s are marked with `#[must_use]` to ensure they're awaited
//! - Tasks won't start until their handle is `.await`ed
//! - When a `TaskHandle` is dropped, the underlying task is cancelled
//! - Cancelling a task will by default block until the cancellation of the task
//!   has completed
//! - In order for tasks to execute concurrently, you have to concurrently await
//!   the handles
//! - Because the relationship between handles and their tasks is now
//!   guaranteed, tasks will no longer produce cancellation errors
//!
//! # Implementing Concurrency
//!
//! Because in this crate tasks behave like futures rather than threads, you
//! have to poll the `TaskHandle`s concurrently in order to make progress on
//! them, just like you would with regular futures. For this we recommend using
//! the [`futures-concurrency`] library where possible, and [`futures-util`]
//! where not. `futures-concurrency` provides composable async concurrency
//! operations such as `join`, `race`, and `merge`, as well as fallible versions
//! of those operations and operations such as `zip` and `chain`. Here's an
//! example of concurrently awaiting multiple async `TaskHandle`s:
//!
//! [`futures-concurrency`]: https://docs.rs/futures-concurrency
//! [`futures-util`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html
//!
//! ```rust
//! # #[tokio::main] async fn main() {
//! use structured_spawn::spawn;
//! use futures_concurrency::prelude::*;
//!
//! let mut handles = vec![];
//! for n in 0..100 {
//!     handles.push(spawn(async move { n * n }));   // 👈 Each task squares a number
//! }
//! let outputs: Vec<_> = handles.join().await;      // 👈 The tasks start executing here
//! # assert_eq!(outputs.len(), 100);
//! # }
//! ```
//!
//! The `futures-concurrency` library does not yet implement concurrency
//! operations for variable numbers of futures or streams. This is something
//! we're actively exploring and will eventually be adding. For now we
//! recommend using APIs such as [`FuturesUnordered`] from the
//! `futures-util` library instead.
//!
//! [`FuturesUnordered`]: https://docs.rs/futures-util/latest/futures_util/stream/struct.FuturesUnordered.html

#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]

use std::{
    future::Future,
    panic,
    pin::Pin,
    sync::mpsc,
    task::{self, Poll},
};

/// Spawn a task on the tokio multi-threaded executor using "strict" semantics.
///
/// This function guarantees that the `TaskHandle` will not complete until the
/// underlying task has completed - even in the case of an error. It achieves this
/// by blocking inside the `TaskHandle`'s destructor until the future has been dropped.
///
/// This matches all of the required criteria for structured concurrency, and
/// when in doubt, crate authors should prefer this function.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
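/**
# Implementation notes

The blocking described above relies on a channel that never carries a message:
the task owns the sender, so `recv` on the handle's side can only return, with
an error, once the task's future has been dropped. The following is a minimal
sketch of that signal using only `std`, with a plain thread standing in for the
Tokio task. `wait_for_worker` is a hypothetical name used for illustration and
is not part of this crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs some work on a thread, blocks until the thread
/// has dropped its sender, and reports whether the work had finished by then.
fn wait_for_worker() -> bool {
    let finished = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&finished);
    let (sender, receiver) = mpsc::channel::<()>();

    thread::spawn(move || {
        // Stand-in for the spawned future doing its work.
        thread::sleep(Duration::from_millis(20));
        flag.store(true, Ordering::SeqCst);
        // Dropping the sender is the only signal the receiver ever gets.
        drop(sender);
    });

    // Nothing is ever sent, so this blocks until the sender is gone and then
    // returns `Err(RecvError)`.
    let result = receiver.recv();
    assert!(result.is_err());

    finished.load(Ordering::SeqCst)
}

fn main() {
    assert!(wait_for_worker());
}
```

Because `recv` blocks the calling thread for as long as the worker runs, the
same pattern inside a destructor can stall a single-threaded runtime.
*/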
pub fn spawn<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    let ordering = Ordering::Strict(receiver);
    let tokio_handle = tokio::task::spawn(async move {
        let output = future.await;

        // The future has completed, it's safe to drop the sender.
        drop(sender);

        output
    });
    TaskHandle {
        tokio_handle,
        ordering,
        completed: false,
    }
}

/// Spawn a task on the tokio multi-threaded executor using "relaxed" semantics.
///
/// The `TaskHandle` returned by this function does not provide any ordering
/// guarantees with respect to the underlying task. This may lead to logical
/// races, and authors are expected to manually synchronize ordering via other
/// means.
///
/// # Examples
///
/// ```
/// use structured_spawn::spawn_relaxed;
///
/// # #[tokio::main] async fn main() -> std::thread::Result<()> {
/// let value = 12usize;
/// let handle = spawn_relaxed(async move {
///     value * value
/// });
/// assert_eq!(handle.await?, 144);
/// # Ok(()) }
/// ```
pub fn spawn_relaxed<T>(future: T) -> TaskHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    TaskHandle {
        tokio_handle: tokio::task::spawn(future),
        ordering: Ordering::Relaxed,
        completed: false,
    }
}

/// A handle which references a task.
///
/// This handle behaves much like a regular future: it is marked as
/// `#[must_use]`, it won't start doing work until `.await`ed, and when dropped
/// will cancel the underlying task. See [`spawn`] or [`spawn_relaxed`] for more details.
#[derive(Debug)]
#[must_use = "Tasks do nothing unless `.await`ed"]
#[pin_project::pin_project(PinnedDrop)]
pub struct TaskHandle<T: Send> {
    /// A handle to a tokio task
    tokio_handle: tokio::task::JoinHandle<T>,
    /// Which ordering guarantees do our tasks provide?
    ordering: Ordering,
    /// Did the task successfully complete?
    completed: bool,
}

/// The future for the task handle. Either produces a value, or an error if the
/// task has panicked during execution.
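/**
The error side of `std::thread::Result` is the panic payload, a
`Box<dyn Any + Send>`. The following is a minimal sketch of how a caller might
turn that payload into a message. It uses only `std`, with a thread's `join`
(which returns the same result type) standing in for a `TaskHandle`.
`panic_message` is a hypothetical helper and is not part of this crate.

```rust
use std::any::Any;
use std::thread;

/// Hypothetical helper: extract a readable message from a panic payload.
fn panic_message(payload: &(dyn Any + Send)) -> String {
    if let Some(message) = payload.downcast_ref::<&str>() {
        // `panic!("literal")` carries a `&'static str`.
        (*message).to_string()
    } else if let Some(message) = payload.downcast_ref::<String>() {
        // `panic!("formatted {}", value)` carries a `String`.
        message.clone()
    } else {
        String::from("non-string panic payload")
    }
}

fn main() {
    // `JoinHandle::join` returns `std::thread::Result`, the same type that
    // awaiting a `TaskHandle` produces.
    let result: thread::Result<u32> = thread::spawn(|| -> u32 { panic!("boom") }).join();

    match result {
        Ok(value) => println!("task returned {value}"),
        Err(payload) => println!("task panicked: {}", panic_message(&*payload)),
    }
}
```

Payloads that are neither `&str` nor `String` are possible, so the fallback
branch should stay in place.
*/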
impl<T: Send> Future for TaskHandle<T> {
    type Output = std::thread::Result<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        match Pin::new(this.tokio_handle).poll(cx) {
            task::Poll::Ready(value) => {
                *this.completed = true;
                match value {
                    Ok(value) => Poll::Ready(Ok(value)),
                    Err(err) => {
                        if err.is_cancelled() {
                            panic!("The `TaskHandle` should never be able to poll the future again after completion");
                        } else if err.is_panic() {
                            // Hand the panic payload to the caller as an error
                            Poll::Ready(Err(err.into_panic()))
                        } else {
                            unreachable!("The only way an error can be triggered from a handle is either through cancellation or panicking");
                        }
                    }
                }
            }
            task::Poll::Pending => Poll::Pending,
        }
    }
}

/// Which ordering guarantees do our tasks provide?
#[derive(Debug)]
enum Ordering {
    /// Block on drop until the task has finished dropping
    Strict(mpsc::Receiver<()>),
    /// Don't block on drop, but the ordering will be unstructured
    Relaxed,
}

#[pin_project::pinned_drop]
impl<T: Send> PinnedDrop for TaskHandle<T> {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        // All work is done, nothing to do here.
        if *this.completed {
            return;
        }

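        // Cancel the underlying task so that dropping the handle propagates
        // cancellation; `abort` only requests cancellation and returns at once.
        this.tokio_handle.abort();

        // If we're using "strict" ordering, we wait for the receiver to return.
        // No message is ever sent, so `recv` only returns, with an error, once
        // the sender held by the task has been dropped.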
        if let Ordering::Strict(receiver) = this.ordering {
            let result = receiver.recv();
            debug_assert!(
                result.is_err(),
                "The receiver should only ever complete if the sender has been dropped"
            );
        }
    }
}