Trait par_stream::prelude::IndexedStreamExt [−][src]
pub trait IndexedStreamExt where
Self: Stream, {
fn wrapping_enumerate(self) -> WrappingEnumerate<Self>;
fn reorder_enumerated<T>(self) -> ReorderEnumerated<Self, T>
where
Self: Stream<Item = (usize, T)>;
}
Expand description
An extension trait that controls ordering of stream items.
Required methods
fn wrapping_enumerate(self) -> WrappingEnumerate<Self>
fn wrapping_enumerate(self) -> WrappingEnumerate<Self>
Gives the current iteration count that may overflow to zero as well as the next value.
fn reorder_enumerated<T>(self) -> ReorderEnumerated<Self, T> where
Self: Stream<Item = (usize, T)>,
fn reorder_enumerated<T>(self) -> ReorderEnumerated<Self, T> where
Self: Stream<Item = (usize, T)>,
Reorder the input items paired with a iteration count.
The combinator asserts the input item has tuple type (usize, T)
.
It reorders the items according to the first value of input tuple.
It is usually combined with IndexedStreamExt::wrapping_enumerate, then applies a series of unordered parallel mapping, and finally reorders the values back by this method. It avoids reordering the values after each parallel mapping step.
use futures::prelude::*;
use par_stream::prelude::*;
async fn main_async() {
let doubled = futures::stream::iter(0..1000)
// add enumerated index that does not panic on overflow
.wrapping_enumerate()
// double the values in parallel
.par_then_unordered(None, move |(index, value)| {
// the closure is sent to parallel worker
async move { (index, value * 2) }
})
// add values by one in parallel
.par_then_unordered(None, move |(index, value)| {
// the closure is sent to parallel worker
async move { (index, value + 1) }
})
// reorder the values by enumerated index
.reorder_enumerated()
.collect::<Vec<_>>()
.await;
let expect = (0..1000).map(|value| value * 2 + 1).collect::<Vec<_>>();
assert_eq!(doubled, expect);
}