par-stream 0.10.2

Asynchronous parallel streams analogous to rayon
Documentation
use futures::{
    future,
    stream::{self, StreamExt as _},
};
use par_stream::{rt, Shared};
use rand::{prelude::*, rngs::OsRng};
use std::time::Duration;
use structopt::StructOpt;

#[derive(StructOpt)]
struct Opts {
    pub num_jobs: usize,
    pub num_workers: usize,
    pub in_buf_size: usize,
    pub out_buf_size: usize,
    pub pow: u32,
    #[structopt(long)]
    pub spawn: bool,
}

fn main() {
    par_stream::rt::block_on_executor(async move {
        let opts = Opts::from_args();

        let elapsed_notifier = shared_stream_by_notifier_test(&opts).await;
        println!("elapsed for notifier\t{:?}ms", elapsed_notifier.as_millis());

        let elapsed_channel = shared_stream_by_channel_test(&opts).await;
        println!("elapsed for channel\t{:?}ms", elapsed_channel.as_millis());
    });
}

async fn shared_stream_by_notifier_test(opts: &Opts) -> Duration {
    let pow = opts.pow;
    let spawn = opts.spawn;

    let stream = stream::repeat(())
        .take(opts.num_jobs)
        .map(|()| -> u64 { OsRng.gen() });
    let stream = Shared::new(stream);
    let (out_tx, out_rx) = flume::bounded(opts.out_buf_size);

    let worker_futures = (0..(opts.num_workers)).map(move |_| {
        let out_tx = out_tx.clone();
        let stream = stream.clone();

        rt::spawn(async move {
            let _ = stream
                .then(|val| task(val, pow, spawn))
                .map(Ok)
                .forward(out_tx.into_sink())
                .await;
        })
    });

    let output_future = rt::spawn(async move {
        out_rx
            .into_stream()
            .fold(0u64, |sum, val| future::ready(sum.wrapping_add(val)))
            .await
    });

    let instant = std::time::Instant::now();
    futures::join!(output_future, future::join_all(worker_futures));

    instant.elapsed()
}

async fn shared_stream_by_channel_test(opts: &Opts) -> Duration {
    let pow = opts.pow;
    let spawn = opts.spawn;

    let stream = stream::repeat(())
        .take(opts.num_jobs)
        .map(|()| -> u64 { OsRng.gen() });
    let (in_tx, in_rx) = flume::bounded(opts.in_buf_size);
    let (out_tx, out_rx) = flume::bounded(opts.out_buf_size);

    let input_future = rt::spawn(async move {
        let _ = stream.map(Ok).forward(in_tx.into_sink()).await;
    });

    let worker_futures = (0..(opts.num_workers)).map(move |_| {
        let in_rx = in_rx.clone();
        let out_tx = out_tx.clone();

        rt::spawn(async move {
            let _ = in_rx
                .into_stream()
                .then(|val| task(val, pow, spawn))
                .map(Ok)
                .forward(out_tx.into_sink())
                .await;
        })
    });

    let output_future = rt::spawn(async move {
        out_rx
            .into_stream()
            .fold(0u64, |sum, val| future::ready(sum.wrapping_add(val)))
            .await
    });

    let instant = std::time::Instant::now();
    futures::join!(
        input_future,
        output_future,
        future::join_all(worker_futures)
    );
    instant.elapsed()
}

async fn task(input: u64, pow: u32, spawn: bool) -> u64 {
    if spawn {
        rt::spawn_blocking(move || compute(input, pow)).await
    } else {
        compute(input, pow)
    }
}

fn compute(input: u64, pow: u32) -> u64 {
    (0..pow).fold(1u64, move |product, _| product.wrapping_mul(input))
}