async_scoped/
lib.rs

1//! Enables controlled spawning of non-`'static` futures
2//! when using the [async-std] or [tokio]
3//! executors. Note that this idea is similar to
4//! `crossbeam::scope`, and `rayon::scope` but asynchronous.
5//!
6//! ## Motivation
7//!
8//! Executors like async_std, tokio, etc. support spawning
9//! `'static` futures onto a thread-pool. However, it is
10//! often useful to spawn futures that may not be `'static`.
11//!
12//! While the future combinators such as
13//! [`for_each_concurrent`][for_each_concurrent] offer
14//! concurrency, they are bundled as a single [`Task`][Task]
15//! structure by the executor, and hence are not driven
16//! in parallel.
17//!
18//! ## Scope API
19//!
20//! We propose an API similar to
21//! [`crossbeam::scope`](crossbeam::scope) to allow spawning
22//! futures that are not `'static`. The key API is approximately:
23//!
24//! ``` rust, ignore
25//! pub unsafe fn scope<'a, T: Send + 'static,
26//!              F: FnOnce(&mut TokioScope<'a, T>)>(f: F)
27//!              -> impl Stream {
28//!     // ...
29//! }
30//! ```
31//!
32//! See [`scope`][Scope::scope] for the exact definition, and
33//! safety guidelines. The simplest and safest API is
34//! [`scope_and_block`][Scope::scope_and_block], used as follows:
35//!
36//! ``` rust, ignore
37//! async fn scoped_futures() {
38//!     let not_copy = String::from("hello world!");
39//!     let not_copy_ref = &not_copy;
40//!     let (foo, outputs) = async_scoped::AsyncStdScope::scope_and_block(|s| {
41//!         for _ in 0..10 {
42//!             let proc = || async {
43//!                 assert_eq!(not_copy_ref, "hello world!");
44//!                 eprintln!("Hello world!")
45//!             };
46//!             s.spawn(proc());
47//!         }
48//!         42
49//!     });
50//!     assert_eq!(foo, 42);
51//!     assert_eq!(outputs.len(), 10);
52//! }
53//! ```
54//!
55//! The [`scope_and_block`][Scope::scope_and_block] function above
56//! blocks the current thread until all spawned futures are
57//! driven in order to guarantee safety.
58//!
59//! We also provide an unsafe
60//! [`scope_and_collect`][Scope::scope_and_collect], which is
61//! asynchronous, and does not block the current thread.
62//! However, the user should ensure that the returned future
63//! _is not forgetten_ before being driven to completion.
64//!
65//! ## Executor Selection
66//!
67//! Users may use "use-async-std", or the
68//! "use-tokio" features to enable specific executor implementations.
69//! Those are not necessary, you may freely implement traits `Spawner`, `Blocker`, etc for your own
70//! runtime. Just ensure you follow the safety idea.
71//!
72//! Some notes on default implementations:
73//! 1. [`AsyncScope`] may run into a dead-lock if used in
74//! deep recursions (depth > #num-cores / 2).
75//!
76//! 2. [`TokioScope::scope_and_block`][Scope::scope_and_block] may only be used
77//! within a multi-threaded. An incompletely driven
78//! `TokioScope` also needs a multi-threaded context to be
79//! dropped.
80//!
81//! ## Cancellation
82//!
83//! To support cancellation, `Scope` provides a
84//! [`spawn_cancellable`][Scope::spawn_cancellable] which wraps a
85//! future to make it cancellable. When a `Scope` is
86//! dropped, (or if `cancel` method is invoked), all the
87//! cancellable futures are scheduled for cancellation. In
88//! the next poll of the futures, they are dropped and a
89//! default value (provided by a closure during spawn) is
90//! returned as the output of the future.
91//!
92//! **Note:** this is an abrupt, hard cancellation. It also
93//! requires a reasonable behaviour: futures that do not
94//! return control to the executor cannot be cancelled once
95//! it has started.
96//!
97//! ## Safety Considerations
98//!
99//! The [`scope`][Scope::scope] API provided in this crate is
100//! unsafe as it is possible to `forget` the stream received
101//! from the API without driving it to completion. The only
102//! completely (without any additional assumptions) safe API
103//! is the [`scope_and_block`][Scope::scope_and_block] function,
104//! which _blocks the current thread_ until all spawned
105//! futures complete.
106//!
107//! The [`scope_and_block`][Scope::scope_and_block] may not be
108//! convenient in an asynchronous setting. In this case, the
109//! [`scope_and_collect`][Scope::scope_and_collect] API may be
110//! used. Care must be taken to ensure the returned future
111//! is not forgotten before being driven to completion.
112//!
113//! Note that dropping this future will lead to it being
114//! driven to completion, while blocking the current thread
115//! to ensure safety. However, it is unsafe to forget this
116//! future before it is fully driven.
117//!
118//! ## Implementation
119//!
120//! Our current implementation simply uses _unsafe_ glue to
121//! `transmute` the lifetime, to actually spawn the futures
122//! in the executor. The original lifetime is recorded in
123//! the `Scope`. This allows the compiler to enforce the
124//! necessary lifetime requirements as long as this returned
125//! stream is not forgotten.
126//!
127//! For soundness, we drive the stream to completion in the
128//! [`Drop`][Drop] impl. The current thread is blocked until
129//! the stream is fully driven.
130//!
131//! Unfortunately, since the [`std::mem::forget`][forget]
132//! method is allowed in safe Rust, the purely asynchronous
133//! API here is _inherently unsafe_.
134//!
135//! [async-std]: /async_std
136//! [tokio]: /tokio
137//! [poll]: std::futures::Future::poll
138//! [Task]: std::task
139//! [forget]: std::mem::forget
140//! [Stream]: futures::Stream
141//! [for_each_concurrent]: futures::StreamExt::for_each_concurrent
142
143#[macro_use]
144mod utils;
145
146mod scoped;
147pub use scoped::Scope;
148
149#[cfg(feature = "use-tokio")]
150pub type TokioScope<'a, T> = Scope<'a, T, spawner::use_tokio::Tokio>;
151
152#[cfg(feature = "use-async-std")]
153pub type AsyncStdScope<'a, T> = Scope<'a, T, spawner::use_async_std::AsyncStd>;
154
155pub mod spawner;
156mod usage;
157
158#[cfg(test)]
159mod tests;