Crate par_stream

Asynchronous parallel streams analogous to rayon.

Cargo Features

The following cargo features select the backend runtime for concurrent workers. One of them must be specified, otherwise the crate raises a compile error.

  • runtime-tokio enables the tokio multi-threaded runtime.
  • runtime-async-std enables the async-std default runtime.
  • runtime-smol enables the smol default runtime.

Combinators

Usage

The crate provides extension traits that add combinators for parallel computing and concurrent data processing to existing streams. Most of these traits are re-exported from prelude and can be imported together.

use par_stream::prelude::*;

Parallel Processing

Distributing Patterns

Scatter-Gather Pattern

The combinators can construct a scatter-gather pattern that passes each item to one of several concurrent workers and gathers the outputs together.


use futures::stream::StreamExt;
use par_stream::prelude::*;
use std::collections::HashSet;

async fn main_async() {
    let orig = futures::stream::iter(0..1000);

    // scatter stream items to two receivers
    let rx1 = orig.scatter(None);
    let rx2 = rx1.clone();

    // gather back from two receivers
    let values: HashSet<_> = par_stream::gather(None, vec![rx1, rx2]).collect().await;

    // the gathered values have equal content with the original
    assert_eq!(values, (0..1000).collect::<HashSet<_>>());
}
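To make the data flow of the pattern concrete, here is a minimal thread-based analogy that uses only the standard library. It is a sketch, not how the crate implements scatter() and gather(), which operate on asynchronous streams. The function name scatter_gather and its parameters are hypothetical, and it assumes at least one worker.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: scatters `items` across `n_workers` threads, applies `f`
/// to each item, and gathers the results in arrival order (not deterministic).
fn scatter_gather<T, U, F>(items: Vec<T>, n_workers: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let (scatter_tx, scatter_rx) = mpsc::channel::<T>();
    let (gather_tx, gather_rx) = mpsc::channel::<U>();
    // std's Receiver cannot be cloned, so the workers share it behind a mutex.
    let scatter_rx = Arc::new(Mutex::new(scatter_rx));
    let f = Arc::new(f);

    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&scatter_rx);
            let tx = gather_tx.clone();
            let f = Arc::clone(&f);
            thread::spawn(move || loop {
                // Each item is taken by exactly one worker; the lock is
                // released at the end of this statement, before `f` runs.
                let item = rx.lock().unwrap().recv();
                match item {
                    Ok(item) => tx.send(f(item)).unwrap(),
                    Err(_) => break, // input closed and drained
                }
            })
        })
        .collect();
    drop(gather_tx); // from here on only the workers hold senders

    for item in items {
        scatter_tx.send(item).unwrap();
    }
    drop(scatter_tx); // signal end of input

    // Ends once every worker has finished and dropped its sender.
    let out: Vec<U> = gather_rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    out
}
```

As in the example above, the gathered values carry no ordering guarantee, so a comparison should go through a set or a sorted vector.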

Broadcast-Zip Pattern

Another example is to construct a broadcast-zip pattern that clones each element to several concurrent workers, and pairs up outputs from each worker.


use futures::stream::StreamExt;
use par_stream::prelude::*;

async fn main_async() {
    let data = vec![2, -1, 3, 5];

    let mut guard = futures::stream::iter(data.clone()).broadcast(3);
    let rx1 = guard.register();
    let rx2 = guard.register();
    let rx3 = guard.register();
    guard.finish(); // the guard is dropped so that registered streams can start

    let join = rx1
        .map(|v| v * 2)
        .zip(rx2.map(|v| v * 3))
        .zip(rx3.map(|v| v * 5));

    let collected: Vec<_> = join.collect().await;
    assert_eq!(
        collected,
        vec![((4, 6), 10), ((-2, -3), -5), ((6, 9), 15), ((10, 15), 25)]
    );
}
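The same broadcast-zip flow can be sketched with standard-library threads and channels. This is only an analogy under stated assumptions: broadcast_zip_sketch is a hypothetical name, the three factors are hard-coded to match the example above, and the crate itself works on asynchronous streams rather than threads.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sends a copy of every item to three workers
/// (multiplying by 2, 3 and 5) and pairs up their n-th outputs.
fn broadcast_zip_sketch(data: Vec<i32>) -> Vec<((i32, i32), i32)> {
    // Each worker gets its own input channel and its own output channel.
    let spawn_worker = |factor: i32| {
        let (in_tx, in_rx) = mpsc::channel::<i32>();
        let (out_tx, out_rx) = mpsc::channel::<i32>();
        thread::spawn(move || {
            for v in in_rx {
                out_tx.send(v * factor).unwrap();
            }
        });
        (in_tx, out_rx)
    };
    let (tx1, rx1) = spawn_worker(2);
    let (tx2, rx2) = spawn_worker(3);
    let (tx3, rx3) = spawn_worker(5);

    // Broadcast: every worker receives its own copy of each item.
    for v in data {
        for tx in [&tx1, &tx2, &tx3] {
            tx.send(v).unwrap();
        }
    }
    drop((tx1, tx2, tx3)); // close the inputs so the workers terminate

    // Zip: each channel preserves order, so the n-th outputs line up.
    rx1.into_iter().zip(rx2).zip(rx3).collect()
}
```

Calling broadcast_zip_sketch(vec![2, -1, 3, 5]) yields the same nested pairs as the assertion in the example above.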

Item Ordering

The item ordering combinators are usually combined with unordered concurrent processing methods, allowing on-demand data passing between stages.

stream
    // mark items with index numbers
    .enumerate()
    // a series of unordered maps
    .par_then_unordered(config, map_fn)
    .par_then_unordered(config, map_fn)
    .par_then_unordered(config, map_fn)
    // reorder the items back by indexes
    .reorder_enumerated()
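The reordering step can be pictured as a small buffer keyed by index. The following is a minimal synchronous sketch of that idea, assuming indexes start at 0 and have no gaps; reorder_enumerated_sketch is a hypothetical name and this is not the crate's implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: emits the values of `(index, value)` pairs in index order.
/// Pairs that arrive early are buffered until the next expected index shows up.
fn reorder_enumerated_sketch<T>(items: impl IntoIterator<Item = (usize, T)>) -> Vec<T> {
    let mut pending: BTreeMap<usize, T> = BTreeMap::new(); // items that arrived too early
    let mut next: usize = 0; // index that must be emitted next
    let mut out = Vec::new();

    for (index, value) in items {
        pending.insert(index, value);
        // Flush every buffered item that is now in sequence.
        while let Some(value) = pending.remove(&next) {
            out.push(value);
            next += 1;
        }
    }
    out
}
```

The buffer only grows while an earlier index is still outstanding, which is why the unordered stages in between can run without waiting for each other.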

Configure Number of Workers

The config parameter of stream.par_map(config, map_fn) controls the number of concurrent workers and internal buffer size. It accepts the following values.

  • None: The number of workers defaults to the number of system processors.
  • 10 or any other non-zero integer: That many workers (here, 10).
  • 2.5 or any other non-zero floating-point number: The number of workers is that factor times the number of system processors (here, 2.5 times).
  • (10, 15): 10 workers and internal buffer size 15.

If the buffer size is not specified, it defaults to twice the number of workers.
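The rules above can be written out as a small resolution function. This is a sketch under assumptions, not the crate's code: WorkerSpec and resolve are hypothetical names, the processor count is passed in explicitly, and rounding a scaled count up with a minimum of one worker is a choice made here for illustration.

```rust
/// Hypothetical stand-in for the worker-count part of the `config` argument.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WorkerSpec {
    /// Corresponds to `None`: use the number of system processors.
    Default,
    /// Corresponds to an integer such as `10`.
    Absolute(usize),
    /// Corresponds to a float such as `2.5`.
    Scale(f64),
}

/// Returns (number of workers, buffer size) for a machine with `n_cpus` processors.
fn resolve(spec: WorkerSpec, buf_size: Option<usize>, n_cpus: usize) -> (usize, usize) {
    let requested = match spec {
        WorkerSpec::Default => n_cpus,
        WorkerSpec::Absolute(n) => n,
        // Assumption of this sketch: a fractional result is rounded up.
        WorkerSpec::Scale(factor) => (n_cpus as f64 * factor).ceil() as usize,
    };
    let workers = requested.max(1); // assumption: never fewer than one worker
    // An unspecified buffer size defaults to twice the number of workers.
    let buf = buf_size.unwrap_or(workers * 2);
    (workers, buf)
}
```

On a machine with 8 processors, resolve(WorkerSpec::Scale(2.5), None, 8) gives 20 workers and a buffer of 40, while resolve(WorkerSpec::Absolute(10), Some(15), 8) mirrors the (10, 15) case.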

Modules

Commonly used traits.

Structs

A stream combinator returned from batching().

The receiver type for batching function in batching().

The stream type that wraps over BatchingReceiver.

The sender type for batching function in batching().

The guard type returned from broadcast().

The receiver that consumes broadcasted messages from the stream.

A stream combinator returned from gather().

A stream combinator returned from map_spawned().

A stream combinator returned from par_batching_unordered().

A stream combinator returned from par_for_each() and its siblings.

A stream combinator returned from par_for_each_blocking() and its siblings.

A stream combinator returned from par_map() and its siblings.

A stream combinator returned from par_map_unordered() and its siblings.

A stream combinator returned from par_reduce().

A stream combinator returned from par_routing().

A stream combinator returned from par_routing_unordered().

Parallel stream configuration.

Parallel stream parameters.

A stream combinator returned from par_then() and its siblings.

A stream combinator returned from par_then_unordered() and its siblings.

A stream combinator returned from reorder_enumerated().

A stream combinator returned from scatter().

A stream combinator returned from sync_by_key().

A stream combinator returned from tee().

A stream combinator returned from then_spawned().

A fallible stream combinator returned from try_batching().

A fallible stream combinator returned from try_wrapping_enumerate().

A stream combinator returned from try_map_spawned().

A fallible stream combinator returned from try_par_batching_unordered().

A fallible stream combinator returned from try_par_for_each() and its siblings.

A fallible stream combinator returned from try_par_for_each() and its siblings.

A fallible stream combinator returned from try_par_map() and its siblings.

A fallible stream combinator returned from try_par_map_unordered() and its siblings.

A fallible stream combinator returned from try_par_then() and its siblings.

A fallible stream combinator returned from try_par_then_unordered() and its siblings.

A fallible stream combinator returned from try_reorder_enumerated().

A stream combinator returned from try_sync_by_key().

A fallible stream combinator returned from try_tee().

A stream combinator returned from try_then_spawned().

A fallible stream combinator returned from try_unfold_blocking().

A fallible stream combinator returned from try_wrapping_enumerate().

A stream combinator returned from unfold() or unfold_blocking().

A stream combinator returned from wrapping_enumerate().

Enums

Specifies an absolute value, a scaling factor, or a value determined at runtime.

Traits

An extension trait that controls ordering of items of fallible streams.

An extension trait that provides fallible combinators for parallel processing on streams.

An extension trait that controls ordering of stream items.

A conversion trait that converts an input type to parallel stream parameters.

An extension trait that provides parallel processing combinators on streams.

Functions

Collects multiple streams into a single stream.

Creates a stream of elements produced by multiple concurrent workers. It is a blocking analogue of par_unfold_unordered().

Creates a stream of elements produced by multiple concurrent workers.

Synchronizes streams by pairing up keys of each stream item.

Synchronizes streams by pairing up keys of each stream item. It is the fallible counterpart of sync_by_key.

A fallible analogue of unfold_blocking.

Creates a stream with elements produced by an asynchronous function.

Creates a stream with elements produced by a function.