//! Enables controlled spawning of non-`'static` futures
//! when using the [async-std][async_std] executor. The
//! idea is similar to `crossbeam::scope` and
//! `rayon::scope`, but asynchronous.
//!
//! ## Motivation
//!
//! Executors like async_std, tokio, etc. support spawning
//! `'static` futures onto a thread-pool. However, it is
//! often useful to spawn futures that may not be `'static`.
//!
//! While future combinators such as
//! [`for_each_concurrent`][for_each_concurrent] offer
//! concurrency, they are bundled as a single [`Task`][Task]
//! structure by the executor, and hence are not driven
//! in parallel.
//!
//! ## Scope API
//!
//! We propose an API similar to
//! [`crossbeam::scope`](crossbeam::scope) to allow spawning
//! futures that are not `'static`. The key API is approximately:
//!
//! ``` rust, ignore
//! pub unsafe fn scope<'a, T: Send + 'static,
//!              F: FnOnce(&mut Scope<'a, T>)>(f: F)
//!              -> impl Stream {
//!     // ...
//! }
//! ```
//!
//! See [`scope`][scope] for the exact definition and
//! safety guidelines. The simplest and safest API is
//! [`scope_and_block`][scope_and_block], used as follows:
//!
//! ``` rust
//! #[async_std::test]
//! async fn scoped_futures() {
//!     let not_copy = String::from("hello world!");
//!     let not_copy_ref = &not_copy;
//!     let (foo, outputs) = async_scoped::scope_and_block(|s| {
//!         for _ in 0..10 {
//!             let proc = || async {
//!                 assert_eq!(not_copy_ref, "hello world!");
//!             };
//!             s.spawn(proc());
//!         }
//!         42
//!     });
//!     assert_eq!(foo, 42);
//!     assert_eq!(outputs.len(), 10);
//! }
//! ```
//!
//! To guarantee safety, the
//! [`scope_and_block`][scope_and_block] function above
//! blocks the current thread until all spawned futures
//! have been driven to completion.
//!
//! We also provide an unsafe
//! [`scope_and_collect`][scope_and_collect], which is
//! asynchronous and does not block the current thread.
//! However, the user must ensure that the returned future
//! _is not forgotten_ before being driven to completion.
//!
//! ## Cancellation
//!
//! To support cancellation, `Scope` provides a
//! [`spawn_cancellable`][Scope::spawn_cancellable] method,
//! which wraps a future to make it cancellable. When a
//! `Scope` is dropped (or its `cancel` method is invoked),
//! all the cancellable futures are scheduled for
//! cancellation. On their next poll, they are dropped and
//! a default value (provided by a closure during spawn) is
//! returned as the output of the future.
//!
//! **Note:** this is an abrupt, hard cancellation. It also
//! requires cooperative behaviour: a future that does not
//! return control to the executor cannot be cancelled once
//! it has started.
//!
//! ## Safety Considerations
//!
//! The [`scope`][scope] API provided in this crate is
//! unsafe as it is possible to `forget` the stream received
//! from the API without driving it to completion. The only
//! completely (without any additional assumptions) safe API
//! is the [`scope_and_block`][scope_and_block] function,
//! which _blocks the current thread_ until all spawned
//! futures complete.
//!
//! The [`scope_and_block`][scope_and_block] function may
//! not be convenient in an asynchronous setting. In this
//! case, the [`scope_and_collect`][scope_and_collect] API
//! may be used. Care must be taken to ensure the returned
//! future is not forgotten before being driven to
//! completion.
//!
//! Note that dropping this future will lead to it being
//! driven to completion, while blocking the current thread
//! to ensure safety. However, it is unsafe to forget this
//! future before it is fully driven.
//!
//! ## Implementation
//!
//! Our current implementation simply uses _unsafe_ glue to
//! `transmute` the lifetime so that the futures can
//! actually be spawned in the executor. The original
//! lifetime is recorded in the `Scope`. This allows the
//! compiler to enforce the necessary lifetime requirements
//! as long as the returned stream is not forgotten.
//!
//! For soundness, we drive the stream to completion in the
//! [`Drop`][Drop] impl. The current thread is blocked until
//! the stream is fully driven.
//!
//! Unfortunately, since the [`std::mem::forget`][forget]
//! function is callable from safe Rust, the purely
//! asynchronous API here is _inherently unsafe_.
//!
//! [async-std]: async_std
//! [poll]: std::future::Future::poll
//! [Task]: async_std::task::Task
//! [forget]: std::mem::forget
//! [Stream]: futures::Stream
//! [for_each_concurrent]: futures::StreamExt::for_each_concurrent
mod cancellable_future;
pub(crate) use cancellable_future::CancellableFuture;

mod scoped;
pub use scoped::Scope;

mod usage;
pub use usage::{scope, scope_and_block, scope_and_collect};

mod cancellation;
pub(crate) use cancellation::Cancellation;

#[cfg(test)]
mod tests;