Trait par_stream::IndexStreamExt
source · [−]pub trait IndexStreamExt where
Self: Stream<Item = (usize, Self::IndexedItem)>, {
type IndexedItem;
fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>;
}
Expand description
The trait extends Stream types with ordering manipulation combinators.
Associated Types
type IndexedItem
Required methods
fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>
fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>
Reorders the input items (index, item)
according to the index number and returns item
.
It can be combined with enumerate() and parallel unordered tasks.
The index numbers must start from zero, be unique and contiguous. Index not starting from zero causes the stream to hang indefinitely.
Panics
The repeating of an index will cause the stream to panic.
use futures::prelude::*;
use par_stream::prelude::*;
let doubled: Vec<_> = stream::iter(0..1000)
// add index number
.enumerate()
// double the values in parallel
.par_then_unordered(None, move |(index, value)| {
// the closure is sent to parallel worker
async move { (index, value * 2) }
})
// add values by one in parallel
.par_then_unordered(None, move |(index, value)| {
// the closure is sent to parallel worker
async move { (index, value + 1) }
})
// reorder the values according to index number
.reorder_enumerated()
.collect()
.await;
let expect: Vec<_> = (0..1000).map(|value| value * 2 + 1).collect();
assert_eq!(doubled, expect);