pub trait IndexStreamExt where
    Self: Stream<Item = (usize, Self::IndexedItem)>, 
{ type IndexedItem; fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>; }
Expand description

The trait extends Stream types with ordering manipulation combinators.

Associated Types

Required methods

Reorders the input items (index, item) according to the index number and returns item.

It can be combined with enumerate() and parallel unordered tasks.

The index numbers must start from zero, be unique and contiguous. Index not starting from zero causes the stream to hang indefinitely.

Panics

The repeating of an index will cause the stream to panic.

use futures::prelude::*;
use par_stream::prelude::*;

let doubled: Vec<_> = stream::iter(0..1000)
    // add index number
    .enumerate()
    // double the values in parallel
    .par_then_unordered(None, move |(index, value)| {
        // the closure is sent to parallel worker
        async move { (index, value * 2) }
    })
    // add values by one in parallel
    .par_then_unordered(None, move |(index, value)| {
        // the closure is sent to parallel worker
        async move { (index, value + 1) }
    })
    // reorder the values according to index number
    .reorder_enumerated()
    .collect()
    .await;
let expect: Vec<_> = (0..1000).map(|value| value * 2 + 1).collect();
assert_eq!(doubled, expect);

Implementors