use futures::stream::{self, StreamExt as _};
use par_stream::ParStreamExt as _;
/// Demonstrates two consumers draining one spawned stream.
///
/// The integers `1..=1000` are fed through `spawned`, and the resulting
/// stream handle is cloned. Values read through the clone are negated so that,
/// after both handles are merged with `stream::select`, the sign of each
/// collected value tells which handle produced it. The program then prints how
/// many values came through each handle.
fn main() {
    par_stream::rt::block_on_executor(async move {
        let source = stream::iter(1isize..=1000).spawned(None);

        // NOTE(review): presumably the clone shares the underlying channel so
        // each item is delivered to exactly one handle — confirm against the
        // par_stream documentation for `spawned`.
        let negated = source.clone().map(|val| -val);

        let gathered: Vec<_> = stream::select(source, negated).collect().await;

        // Single pass over the results: positives came from the first handle,
        // negatives from the (negating) second handle. Zero never occurs
        // because the input range starts at 1.
        let (n_pos, n_neg) = gathered
            .iter()
            .fold((0usize, 0usize), |(pos, neg), &value| {
                (pos + usize::from(value > 0), neg + usize::from(value < 0))
            });

        println!("worker1 process {} numbers", n_pos);
        println!("worker2 process {} numbers", n_neg);
    });
}