pub trait ParStreamExt where
    Self: 'static + Send + Stream,
    Self::Item: 'static + Send
{
Show 14 methods fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
    where
        B: Into<BufSize>
;
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
    where
        B: Into<BufSize>,
        T: Send,
        F: 'static + Send + FnMut(Self::Item) -> T
;
fn pull_routing<B, K, Q, F>(
        self,
        buf_size: B,
        key_fn: F
    ) -> PullBuilder<Self, K, F, Q>
    where
        Self: 'static + Send + Stream,
        Self::Item: 'static + Send,
        F: 'static + Send + FnMut(&Self::Item) -> Q,
        K: 'static + Send + Hash + Eq + Borrow<Q>,
        Q: Send + Hash + Eq,
        B: Into<BufSize>
;
fn par_builder(self) -> ParBuilder<Self>;
fn par_batching<T, P, F, Fut>(
        self,
        params: P,
        f: F
    ) -> RecvStream<'static, T>
    where
        Self: Sized,
        F: 'static + Send + Clone + FnMut(usize, Receiver<Self::Item>) -> Fut,
        Fut: 'static + Future<Output = Option<(T, Receiver<Self::Item>)>> + Send,
        T: 'static + Send,
        P: Into<ParParams>
;
fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>
;
fn broadcast<B>(
        self,
        buf_size: B,
        send_all: bool
    ) -> BroadcastBuilder<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>
;
fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>
;
fn par_then_unordered<T, P, F, Fut>(
        self,
        params: P,
        f: F
    ) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>
;
fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>
;
fn par_map_unordered<T, P, F, Func>(
        self,
        params: P,
        f: F
    ) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>
;
fn par_reduce<P, F, Fut>(
        self,
        params: P,
        reduce_fn: F
    ) -> BoxFuture<'static, Option<Self::Item>>
    where
        P: Into<ParParams>,
        F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
        Fut: 'static + Future<Output = Self::Item> + Send
;
fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = ()> + Send,
        P: Into<ParParams>
;
fn par_for_each_blocking<P, F, Func>(
        self,
        params: P,
        f: F
    ) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() + Send,
        P: Into<ParParams>
;
}
Expand description

The trait extends Stream types with parallel processing combinators.

Required methods

Moves the stream to a spawned worker and forwards stream items to a channel with buf_size.

It returns a receiver stream that buffers the items. The receiver stream is cloneable, and items are sent to the clones in an anycast manner.

This combinator is similar to shared(). The difference is that spawned() spawns a worker that actively forwards stream items to the channel, and the receivers share the channel, whereas the shared() combinator directly polls the underlying stream whenever a receiver polls, in a lock-free manner. The choice between these combinators depends on performance considerations.

use futures::prelude::*;
use par_stream::prelude::*;

// Creates two sharing handles to the stream
let stream = stream::iter(0..100);
let recv1 = stream.spawned(None); // spawn with default buffer size
let recv2 = recv1.clone(); // creates the second receiver

// Consumes the shared streams individually
let collect1 = par_stream::rt::spawn(recv1.collect());
let collect2 = par_stream::rt::spawn(recv2.collect());
let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);

// Checks that the combined values of two vecs are equal to original values
let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
all_vec.sort();
itertools::assert_equal(all_vec, 0..100);

Maps this stream’s items to a different type on a blocking thread.

The combinator iteratively maps the stream items and places the output items into a channel with buf_size. The function f is executed on a separate blocking thread to avoid blocking the asynchronous runtime.

use futures::{prelude::*, stream};
use par_stream::prelude::*;

let vec: Vec<_> = stream::iter(0..100)
    .map_blocking(None, |_| {
        // runs a CPU-bounded work here
        (0..1000).sum::<u64>()
    })
    .collect()
    .await;

Creates a builder that routes each input item according to key_fn to a destination receiver.

Call builder.register("key") to obtain the receiving stream for that key. The builder must be finished by builder.build() so that receivers start consuming items. builder.build() also returns a special leaking receiver for items whose key is not registered or whose target receiver is closed. Dropping the builder without calling builder.build() will cause receivers to get empty input.

Creates a builder that sets up parallel tasks.

The combinator maintains a collection of concurrent workers, each consuming as many elements as it likes, for each output element.

use futures::prelude::*;
use par_stream::prelude::*;

let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
stream::iter(data).par_batching(None, |_worker_index, rx| async move {
    while let Ok(value) = rx.recv_async().await {
        if value > 0 {
            return Some((value, rx));
        }
    }
    None
});

Converts the stream to cloneable receivers, each receiving a copy for each input item.

It spawns a task to consume the stream, and forwards item copies to receivers. The buf_size sets the internal channel size. Dropping a receiver does not cause another receiver to stop.

Receivers are not guaranteed to get the same initial item because they may be created at different times. Use broadcast() instead if you need this guarantee.

use futures::{join, prelude::*};
use par_stream::prelude::*;

let orig: Vec<_> = (0..1000).collect();

let rx1 = stream::iter(orig.clone()).tee(1);
let rx2 = rx1.clone();
let rx3 = rx1.clone();

let fut1 = rx1.map(|val| val).collect();
let fut2 = rx2.map(|val| val * 2).collect();
let fut3 = rx3.map(|val| val * 3).collect();

let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);

Creates a builder to register broadcast receivers.

Call builder.register() to create a receiver. Once the registration is done, builder.build() must be called so that receivers start consuming item copies. If the builder is dropped without calling build(), receivers get empty input.

Each receiver maintains an internal buffer of buf_size. The send_all flag configures the behavior when any one of the receivers closes. If send_all is true, the closing of one receiver causes the other receivers to stop; otherwise it does not.

use futures::{join, prelude::*};
use par_stream::prelude::*;

let mut builder = stream::iter(0..).broadcast(2, true);
let rx1 = builder.register();
let rx2 = builder.register();
builder.build();

let (ret1, ret2): (Vec<_>, Vec<_>) = join!(rx1.take(100).collect(), rx2.take(100).collect());
let expect: Vec<_> = (0..100).collect();

assert_eq!(ret1, expect);
assert_eq!(ret2, expect);

Runs an asynchronous task on parallel workers and produces items respecting the input order.

The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a future for each input item. Output items are gathered into a channel and are reordered to respect the input order.

use futures::prelude::*;
use par_stream::prelude::*;

let doubled: Vec<_> = stream::iter(0..1000)
    // doubles the values in parallel
    .par_then(None, move |value| async move { value * 2 })
    // the collected values will be ordered
    .collect()
    .await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);

Runs an asynchronous task on parallel workers and produces items without respecting input order.

The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a future for each input item. The worker forwards the output to a channel as soon as it finishes.

use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;

let doubled: HashSet<_> = stream::iter(0..1000)
    // doubles the values in parallel
    .par_then_unordered(None, move |value| {
        // the future is sent to a parallel worker
        async move { value * 2 }
    })
    // the collected values may NOT be ordered
    .collect()
    .await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);

Runs a blocking task on parallel workers and produces items respecting the input order.

The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a blocking closure for each input item. Output items are gathered into a channel and are reordered to respect the input order.

use futures::prelude::*;
use par_stream::prelude::*;

// the variable will be shared by parallel workers
let doubled: Vec<_> = stream::iter(0..1000)
    // doubles the values in parallel
    .par_map(None, move |value| {
        // the closure is sent to parallel worker
        move || value * 2
    })
    // the collected values will be ordered
    .collect()
    .await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);

Runs a blocking task on parallel workers and produces items without respecting input order.

The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a blocking closure for each input item. The worker forwards the output to a channel as soon as it finishes.

use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;

// the variable will be shared by parallel workers

let doubled: HashSet<_> = stream::iter(0..1000)
    // doubles the values in parallel
    .par_map_unordered(None, move |value| {
        // the closure is sent to parallel worker
        move || value * 2
    })
    // the collected values may NOT be ordered
    .collect()
    .await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);

Reduces the input stream into a single value in parallel.

It maintains a parallel worker pool whose size is set by params. Each worker reduces the input items from the stream into a single value. Once all parallel workers finish, the values from each worker are reduced into one in a tree-fold manner.

use futures::prelude::*;
use par_stream::prelude::*;

// the variable will be shared by parallel workers
let sum = stream::iter(1..=1000)
    // sum up the values in parallel
    .par_reduce(None, move |lhs, rhs| {
        // the closure is sent to parallel worker
        async move { lhs + rhs }
    })
    .await;
assert_eq!(sum, Some((1 + 1000) * 1000 / 2));

Runs an asynchronous task on parallel workers.

Runs a blocking task on parallel workers.

Implementors