[][src]Crate async_scoped

Enables controlled spawning of non-'static futures when using the async-std executor. Note that this idea is exactly as used in crossbeam::scope, and rayon::scope.

Motivation

Executors like async_std, tokio, etc. support spawning 'static futures onto a thread-pool. However, it is often useful to spawn a stream of futures that may not all be 'static.

While the future combinators such as for_each_concurrent offer concurrency, they are bundled as a single Task structure by the executor, and hence are not driven parallelly.

Scope API

We propose an API similar to crossbeam::scope to allow controlled spawning of futures that are not 'static. The key function is:

This example is not tested
pub unsafe fn scope<'a, T: Send + 'static,
             F: FnOnce(&mut Scope<'a, T>)>(f: F)
             -> impl Stream {
    // ...
}

This function is used as follows:

#[async_std::test]
async fn scoped_futures() {
    let not_copy = String::from("hello world!");
    let not_copy_ref = &not_copy;
    let (foo, outputs) = crate::scope_and_block!(|s| {
        for _ in 0..10 {
            let proc = || async {
                assert_eq!(not_copy_ref, "hello world!");
            };
            s.spawn(proc());
        }
        42
    });
    assert_eq!(foo, 42);
    assert_eq!(outputs.len(), 10);
}

Safety Considerations

The scope API provided in this crate is inherently unsafe. Here, we list the key reasons for unsafety, towards identifying a safe usage (facilitated only by scope_and_block! macro).

  1. Since safe Rust allows forget-ting the returned Stream, the onus of actually driving it to completion is on the user. Thus the scope function is inherently unsafe.

  2. The spawned future must be dropped immediately after completion as it may contain references that are soon to expire. This is the behaviour of async-std, and the safety of the API here relies upon it.

  3. The parent future hosting the values referred to by the spawned futures must not be moved while the spawned futures are running. To the best of our understanding, the compiler should deduce that an async function that uses scope is [!Unpin][!Unpin].

  4. The Task containing the parent Stream must not drop the future before completion. This is generally true, but must hold even if another associated future (running in the same Task) panics! Again, we hope this will not happen within poll because of above [!Unpin][!Unpin] considerations.

Implementation

Our current implementation simply uses unsafe glue to actually spawn the futures. Then, it records the lifetime of the futures in the returned Stream object.

Currently, for soundness, we panic! if the stream is dropped before it is fully driven. Another option (not implemented here), may be to drive the stream using a current-thread executor inside the Drop impl.

Unfortunately, since the std::mem::forget method is safe, the API here is inherently unsafe.

Macros

scope_and_block

A macro that creates a scope and immediately awaits, blocking the current thread for spawned futures to complete. The outputs of the futures are collected as a Vec and returned along with the output of the block.

unsafe_scope_and_collect

A macro that creates a scope and immediately awaits the stream. The outputs of the futures are collected as a Vec and returned along with the output of the block. This macro must be invoked within an async block.

unsafe_scope_and_iterate

A macro that creates a scope and immediately awaits the stream, and sends it through an FnMut. It takes two args, the first that spawns the futures, and the second is the function to call on the stream. This macro must be invoked within an async block.

Structs

Scope

A scope to allow controlled spawning of non 'static futures.

VerifiedStream

A stream wrapper that ensures the underlying stream is driven to completion before being dropped. The implementation panics if the stream is incomplete.

Functions

scope

Creates a Scope to spawn non-'static futures. The function is called with a block which takes an &mut Scope. The spawn method on this arg. can be used to spawn "local" futures.