Concurrency extensions for Future and Stream.

Companion library for the “Futures Concurrency” blog post series.

The purpose of this library is to serve as a staging ground for what eventually may become the futures concurrency methods provided by the stdlib. While most of this library is compatible with stable Rust, some functions require nightly features. To use these functions, enable the unstable feature of this crate (requires a nightly compiler).

Examples

use futures_concurrency::prelude::*;
use futures_lite::future::block_on;
use std::future;

fn main() {
    block_on(async {
        // Await multiple similarly-typed futures.
        let a = future::ready(1);
        let b = future::ready(2);
        let c = future::ready(3);
        assert_eq!([a, b, c].join().await, [1, 2, 3]);
    
        // Await multiple differently-typed futures.
        let a = future::ready(1u8);
        let b = future::ready("hello");
        let c = future::ready(3u16);
        assert_eq!((a, b, c).join().await, (1, "hello", 3));

        // It even works with vectors of futures, providing an alternative
        // to futures-rs' `join_all`.
        let a = future::ready(1);
        let b = future::ready(2);
        let c = future::ready(3);
        assert_eq!(vec![a, b, c].join().await, vec![1, 2, 3]);
    })
}

Or merge multiple streams to handle values as soon as they’re ready, without ever dropping a single value:

use futures_concurrency::prelude::*;
use futures_lite::future::block_on;
use futures_lite::{stream, StreamExt};

fn main() {
    block_on(async {
        let a = stream::once(1);
        let b = stream::once(2);
        let c = stream::once(3);
        let s = (a, b, c).merge();

        let mut counter = 0;
        s.for_each(|n| counter += n).await;
        assert_eq!(counter, 6);
    })
}

Base Futures Concurrency

Often it’s desirable to await multiple futures as if they were a single future. The join family of operations converts multiple futures into a single future that returns all of their outputs. The race family of operations converts multiple futures into a single future that returns the first output.

For operating on futures the following functions can be used:

Name    Return signature    When does it return?
Join    (T1, T2)            Wait for all to complete
Race    T                   Return on first value
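
To make the race semantics concrete, here is a minimal std-only sketch of a two-future race. It models the behavior described above and is not this crate's implementation: `race2`, `block_on`, `NoopWake` and `race_demo` are hypothetical names introduced for illustration, and the sketch always polls `a` before `b`, which a real implementation may do differently.

```rust
use std::future::{self, poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-future race: resolves with whichever output is ready first.
/// Simplification: `a` is always polled before `b`.
async fn race2<T>(a: impl Future<Output = T>, b: impl Future<Output = T>) -> T {
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(move |cx| {
        if let Poll::Ready(out) = a.as_mut().poll(cx) {
            return Poll::Ready(out);
        }
        b.as_mut().poll(cx)
    })
    .await
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// `pending()` never completes, so the race is decided by `ready(7)`.
fn race_demo() -> u32 {
    block_on(race2(future::pending::<u32>(), future::ready(7)))
}
```

Calling `race_demo()` returns 7: the future that never completes is simply dropped once the other one has produced a value.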

Fallible Futures Concurrency

For futures that return Result, additional variants of the functions mentioned above can be used: try_join and race_ok. These functions are aware of Result and behave slightly differently from their base variants.

In the case of try_join, if any of the futures returns Err, all futures are dropped and the error is returned. This is referred to as “short-circuiting”.

In the case of race_ok, instead of returning the output of the first future that completes, it returns the output of the first future that completes successfully. This means race_ok will keep going until any one of the futures returns Ok, or all futures have returned Err.

However, sometimes it can be useful to use the base variants of the functions even on futures that return Result. Here is an overview of operations that work on Result, and their respective semantics:
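
The short-circuiting behavior can be sketched with the standard library alone. The following is a simplified model for two futures, not this crate's implementation; `try_join2`, `block_on`, `NoopWake` and `try_join_demo` are hypothetical names introduced for illustration.

```rust
use std::future::{self, poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-future `try_join`: short-circuits on the first `Err`.
async fn try_join2<T1, T2, E>(
    a: impl Future<Output = Result<T1, E>>,
    b: impl Future<Output = Result<T2, E>>,
) -> Result<(T1, T2), E> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut ok_a: Option<T1> = None;
    let mut ok_b: Option<T2> = None;
    poll_fn(move |cx| {
        // Only poll futures that have not produced a value yet.
        if ok_a.is_none() {
            match a.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => ok_a = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        if ok_b.is_none() {
            match b.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => ok_b = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        if ok_a.is_some() && ok_b.is_some() {
            Poll::Ready(Ok((ok_a.take().unwrap(), ok_b.take().unwrap())))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn try_join_demo() -> [Result<(u8, u8), &'static str>; 2] {
    // Both succeed: the outputs are returned together.
    let all_ok = block_on(try_join2(
        future::ready(Ok::<u8, &'static str>(1)),
        future::ready(Ok(2u8)),
    ));
    // One fails: the error is returned even though the other future never completes.
    let short = block_on(try_join2(
        future::pending::<Result<u8, &'static str>>(),
        future::ready(Err::<u8, _>("boom")),
    ));
    [all_ok, short]
}
```

In this sketch `try_join_demo()` yields `Ok((1, 2))` for the first pair and `Err("boom")` for the second, without ever waiting on the pending future.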
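
The same kind of std-only sketch illustrates the "keep going until the first Ok" behavior. It is a simplified model for two futures and makes one assumption beyond the text above: when both futures fail it returns the last error it observed. `race_ok2`, `block_on`, `NoopWake` and `race_ok_demo` are hypothetical names, and this is not this crate's implementation.

```rust
use std::future::{self, poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-future `race_ok`: the first `Ok` wins; fails only once both fail.
async fn race_ok2<T, E>(
    a: impl Future<Output = Result<T, E>>,
    b: impl Future<Output = Result<T, E>>,
) -> Result<T, E> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut a_failed, mut b_failed) = (false, false);
    let mut last_err: Option<E> = None;
    poll_fn(move |cx| {
        // A future that already failed must not be polled again.
        if !a_failed {
            match a.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
                Poll::Ready(Err(e)) => {
                    a_failed = true;
                    last_err = Some(e);
                }
                Poll::Pending => {}
            }
        }
        if !b_failed {
            match b.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
                Poll::Ready(Err(e)) => {
                    b_failed = true;
                    last_err = Some(e);
                }
                Poll::Pending => {}
            }
        }
        if a_failed && b_failed {
            return Poll::Ready(Err(last_err.take().expect("an error was recorded")));
        }
        Poll::Pending
    })
    .await
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn race_ok_demo() -> [Result<u8, &'static str>; 2] {
    // The failed future is skipped; the successful one decides the result.
    let one_ok = block_on(race_ok2(
        future::ready(Err::<u8, &'static str>("first failed")),
        future::ready(Ok(2)),
    ));
    // Every future failed: the last error observed is returned.
    let all_err = block_on(race_ok2(
        future::ready(Err::<u8, &'static str>("first failed")),
        future::ready(Err("second failed")),
    ));
    [one_ok, all_err]
}
```

Here `race_ok_demo()` yields `Ok(2)` for the first pair and `Err("second failed")` for the second.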

Name     Return signature                When does it return?
Join     (Result<T, E>, Result<T, E>)    Wait for all to complete
TryJoin  Result<(T1, T2), E>             Return on first Err, otherwise wait for all to complete
Race     Result<T, E>                    Return on first value
RaceOk   Result<T, E>                    Return on first Ok, reject on last Err

Streams Concurrency

For streams we expose a single concurrency method: merge. This allows multiple streams to be merged into one, with items handled as soon as they’re ready.

By their nature, streams can be short-circuited on a per-item basis, so we don’t need to decide up front how we want to handle errors.

Name   Return signature    When does it return?
Merge  T                   Each value as soon as it’s ready.

Modules

Implementations for the array type.
The futures concurrency prelude.
Composable asynchronous iteration.
Implementations for the tuple type.
Implementations for the vec type.

Traits

Wait for the first successful future to complete.
Wait for all futures to complete.
Combines multiple streams into a single stream of all their outputs.
Wait for the first future to complete.
Wait for all futures to complete successfully, or abort early on error.