Concurrency extensions for Future and Stream.

Companion library for the “Futures Concurrency” blog post series.

The purpose of this library is to serve as a staging ground for what eventually may become the futures concurrency methods provided by the stdlib. While most of this library is compatible with stable Rust, some functions require nightly features. To use these functions, enable the unstable feature of this crate (requires a nightly compiler).

Examples

use futures_concurrency::prelude::*;
use futures_lite::future::block_on;
use std::future;

fn main() {
    block_on(async {
        // Await multiple similarly-typed futures.
        let a = future::ready(1);
        let b = future::ready(2);
        let c = future::ready(3);
        assert_eq!([a, b, c].join().await, [1, 2, 3]);
    
        // Await multiple differently-typed futures.
        let a = future::ready(1u8);
        let b = future::ready("hello");
        let c = future::ready(3u16);
        assert_eq!((a, b, c).join().await, (1, "hello", 3));

        // It even works with vectors of futures, providing an alternative
        // to futures-rs' `join_all`.
        let a = future::ready(1);
        let b = future::ready(2);
        let c = future::ready(3);
        assert_eq!(vec![a, b, c].join().await, vec![1, 2, 3]);
    })
}

Or merge multiple streams to handle values as soon as they’re ready, without ever dropping a single value:

use futures_concurrency::prelude::*;
use futures_lite::future::block_on;
use futures_lite::{stream, StreamExt};

fn main() {
    block_on(async {
        let a = stream::once(1);
        let b = stream::once(2);
        let c = stream::once(3);
        let s = (a, b, c).merge();

        let mut counter = 0;
        s.for_each(|n| counter += n).await;
        assert_eq!(counter, 6);
    })
}

Progress

The following traits have been implemented.

  • Join (futures)
  • Merge (streams)
  • TryJoin (futures)
  • First (futures)
  • FirstOk (futures)

Base Futures Concurrency

Often it’s desirable to await multiple futures as if they were a single future. The join family of operations converts multiple futures into a single future that returns all of their outputs. The race family of operations converts multiple futures into a single future that returns the first output.

For operating on futures the following functions can be used:

Name     Return signature    When does it return?
Join     (T1, T2)            Wait for all to complete
First    T                   Return on first value
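
The two rows above can be sketched with nothing but the standard library. This is a minimal illustration of the semantics, not this crate's implementation: join2, first2, block_on and NoopWake are hypothetical names introduced here, and the busy-polling block_on is only good enough for a demo.

```rust
use std::future::{self, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; `block_on` below simply re-polls.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical busy-polling executor, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// "Join" semantics: resolve once BOTH futures have produced an output.
fn join2<A: Future, B: Future>(a: A, b: B) -> impl Future<Output = (A::Output, B::Output)> {
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let mut a_out: Option<A::Output> = None;
    let mut b_out: Option<B::Output> = None;
    future::poll_fn(move |cx| {
        // Only poll a future that has not completed yet.
        if a_out.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                a_out = Some(v);
            }
        }
        if b_out.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                b_out = Some(v);
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
}

// "First" semantics: resolve with whichever output arrives first.
// This sketch always polls `a` before `b`, so ties go to `a`.
fn first2<T, A, B>(a: A, b: B) -> impl Future<Output = T>
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    future::poll_fn(move |cx| match a.as_mut().poll(cx) {
        Poll::Ready(v) => Poll::Ready(v),
        Poll::Pending => b.as_mut().poll(cx),
    })
}

fn main() {
    // Join: both outputs, even though the types differ.
    let both = block_on(join2(future::ready(1u8), future::ready("hello")));
    assert_eq!(both, (1, "hello"));

    // First: the never-completing future loses to the ready one.
    let winner = block_on(first2(future::pending::<u8>(), future::ready(2u8)));
    assert_eq!(winner, 2);
}
```

Note how the return signatures match the table: joining yields a tuple of every output, while taking the first value requires all inputs to share one output type T.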

Fallible Futures Concurrency

For operating on futures that return Result, additional try_ variants of the functions mentioned before can be used. These functions are aware of Result, and behave slightly differently from their base variants.

In the case of try_join, if any of the futures returns Err, all futures are dropped and that error is returned. This is referred to as “short-circuiting”.

In the case of first_ok, instead of returning the output of the first future that completes, it returns the output of the first future that completes successfully. This means first_ok will keep going until any one of the futures returns Ok, or all futures have returned Err.

However, sometimes it can be useful to use the base variants of the functions even on futures that return Result. Here is an overview of operations that work on Result, and their respective semantics:

Name       Return signature                When does it return?
Join       (Result<T, E>, Result<T, E>)    Wait for all to complete
TryJoin    Result<(T1, T2), E>             Return on first Err, otherwise wait for all to complete
First      Result<T, E>                    Return on first value
FirstOk    Result<T, E>                    Return on first Ok, reject on last Err
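
To make the differences between these rows concrete, here is a small synchronous model. It is only a sketch under a simplifying assumption: each future's outcome is written down in the order the futures complete, and the model ignores which input position an outcome came from. The function names try_join_model, first_model and first_ok_model are hypothetical and are not part of this crate.

```rust
// Each function receives the outcomes in the order the futures completed.

// TryJoin: the first Err short-circuits; otherwise every Ok value is kept.
fn try_join_model<T, E>(outcomes: Vec<Result<T, E>>) -> Result<Vec<T>, E> {
    // Collecting into a Result stops at the first Err it encounters.
    outcomes.into_iter().collect()
}

// First: whatever completes first wins, whether it is Ok or Err.
fn first_model<T, E>(outcomes: Vec<Result<T, E>>) -> Option<Result<T, E>> {
    outcomes.into_iter().next()
}

// FirstOk: the first Ok wins; if there is none, the last Err is reported.
// Returns None only when there were no futures at all.
fn first_ok_model<T, E>(outcomes: Vec<Result<T, E>>) -> Option<Result<T, E>> {
    let mut last_err = None;
    for outcome in outcomes {
        match outcome {
            Ok(value) => return Some(Ok(value)),
            Err(err) => last_err = Some(Err(err)),
        }
    }
    last_err
}

fn main() {
    // Completion order: an error first, then a success, and so on.
    let done: Vec<Result<u8, &str>> = vec![Err("a"), Ok(1), Err("b"), Ok(2)];

    assert_eq!(try_join_model(done.clone()), Err("a"));
    assert_eq!(first_model(done.clone()), Some(Err("a")));
    assert_eq!(first_ok_model(done), Some(Ok(1)));

    // When every future fails, FirstOk rejects with the last error.
    let all_failed: Vec<Result<u8, &str>> = vec![Err("x"), Err("y")];
    assert_eq!(first_ok_model(all_failed), Some(Err("y")));
}
```

Plain Join needs no model: it hands back every Result untouched and leaves the error handling to the caller.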

Streams Concurrency

For streams we expose a single concurrency method: merge. This allows multiple streams to be merged into one, with items handled as soon as they’re ready.

By their nature, streams can be short-circuited on a per-item basis, so we don’t need to decide up front how we want to handle errors.

Name     Return signature    When does it return?
Merge    T                   Each value as soon as it’s ready
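
The “as soon as it’s ready” behaviour can be modelled without any async machinery. In the sketch below each stream is replaced by a script of poll results, and merge_model (a hypothetical name, not this crate's API) visits the scripts round-robin, emitting an item whenever a source reports Ready. A real merge would be driven by wakers rather than a fixed visiting order, so treat the exact ordering as an artifact of the model.

```rust
use std::collections::VecDeque;
use std::task::Poll;

// Each source is a script: one entry per poll, either Pending or Ready(item).
// An exhausted script stands for a stream that has ended.
fn merge_model<T>(mut sources: Vec<VecDeque<Poll<T>>>) -> Vec<T> {
    let mut merged = Vec::new();
    // Keep going until every source has ended.
    while sources.iter().any(|s| !s.is_empty()) {
        for source in sources.iter_mut() {
            // Poll each source once per round; Pending entries yield nothing.
            if let Some(Poll::Ready(item)) = source.pop_front() {
                merged.push(item);
            }
        }
    }
    merged
}

fn main() {
    let a = VecDeque::from(vec![Poll::Pending, Poll::Ready(1), Poll::Ready(4)]);
    let b = VecDeque::from(vec![Poll::Ready(2)]);
    let c = VecDeque::from(vec![Poll::Pending, Poll::Pending, Poll::Ready(3)]);

    // Items come out in readiness order, not source order, and none are lost.
    let merged = merge_model(vec![a, b, c]);
    assert_eq!(merged, vec![2, 1, 4, 3]);
    assert_eq!(merged.iter().sum::<i32>(), 10);
}
```

Because the merged output is just a sequence of items, a consumer that receives Result items can stop at the first Err or skip it, which is the per-item short-circuiting described above.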

Modules

Implementations for the array type.

Asynchronous values.

The futures concurrency prelude.

Composable asynchronous iteration.

Implementations for the tuple type.

Implementations for the vec type.

Traits

Wait for multiple futures to complete.

Combines multiple streams into a single stream of all their outputs.