Trait par_stream::prelude::IndexedStreamExt[][src]

pub trait IndexedStreamExt where
    Self: Stream, 
{ fn wrapping_enumerate(self) -> WrappingEnumerate<Self>;
fn reorder_enumerated<T>(self) -> ReorderEnumerated<Self, T>
    where
        Self: Stream<Item = (usize, T)>
; }
Expand description

An extension trait that controls ordering of stream items.

Required methods

Gives the current iteration count that may overflow to zero as well as the next value.

Reorder the input items paired with a iteration count.

The combinator asserts the input item has tuple type (usize, T). It reorders the items according to the first value of input tuple.

It is usually combined with IndexedStreamExt::wrapping_enumerate, then applies a series of unordered parallel mapping, and finally reorders the values back by this method. It avoids reordering the values after each parallel mapping step.

use futures::prelude::*;
use par_stream::prelude::*;

async fn main_async() {
    let doubled = futures::stream::iter(0..1000)
        // add enumerated index that does not panic on overflow
        .wrapping_enumerate()
        // double the values in parallel
        .par_then_unordered(None, move |(index, value)| {
            // the closure is sent to parallel worker
            async move { (index, value * 2) }
        })
        // add values by one in parallel
        .par_then_unordered(None, move |(index, value)| {
            // the closure is sent to parallel worker
            async move { (index, value + 1) }
        })
        // reorder the values by enumerated index
        .reorder_enumerated()
        .collect::<Vec<_>>()
        .await;
    let expect = (0..1000).map(|value| value * 2 + 1).collect::<Vec<_>>();
    assert_eq!(doubled, expect);
}

Implementors