par-stream 0.10.2

Asynchronous parallel streams analogous to rayon
Documentation
use concurrent_slice::{Chunk, ConcurrentSlice};
use futures::{stream, stream::StreamExt as _};
use itertools::izip;
use par_stream::prelude::*;
use rand::prelude::*;
use std::time::Instant;

const LEN: usize = 100_000_000;
const MIN_CHUNK_LEN: usize = 2048;

fn main() {
    par_stream::rt::block_on_executor(async move {
        // fill array with random numbers
        let instant = Instant::now();
        let array = {
            let array = vec![0i32; LEN];

            let chunks: Vec<_> = stream::iter(array.concurrent_chunks_by_division(None))
                .par_map_unordered(None, |mut chunk| {
                    move || {
                        let mut rng = rand::thread_rng();
                        chunk.iter_mut().for_each(|elem| *elem = rng.gen());
                        chunk
                    }
                })
                .collect()
                .await;
            let guard = chunks[0].guard();
            drop(chunks);
            guard.try_unwrap().unwrap()
        };
        eprintln!("random vec generation:\t{:?}", instant.elapsed());

        // benchmark non-concurrent sorting
        let mut array_std = array.clone();

        let instant = Instant::now();
        let array_std = {
            array_std.sort();
            array_std
        };
        eprintln!("std sort:\t{:?}", instant.elapsed());

        // parallel merge sort
        let instant = Instant::now();
        let array_concurrent = {
            // generate chunks
            let chunks0: Vec<_> = array.concurrent_chunks(MIN_CHUNK_LEN).collect();
            let mut chunks1: Vec<_> = vec![0; LEN].concurrent_chunks(MIN_CHUNK_LEN).collect();

            // sort within chunks
            let mut chunks0: Vec<_> = stream::iter(chunks0)
                .par_map(None, |mut chunk| {
                    move || {
                        chunk.sort();
                        chunk
                    }
                })
                .collect()
                .await;

            // merge sort
            while chunks0.len() > 1 {
                let (chunks0_, chunks1_): (Vec<_>, Vec<_>) = stream::iter(izip!(chunks0, chunks1))
                    .chunks(2)
                    .par_map(None, |pair| {
                        move || {
                            let mut pair = pair.into_iter();

                            let (lchunk0, mut lchunk1) = pair.next().unwrap();
                            let (rchunk0, rchunk1) = match pair.next() {
                                Some(chunk) => chunk, // non-tail case
                                None => {
                                    // tail case
                                    lchunk1.copy_from_slice(&*lchunk0);
                                    return (lchunk0, lchunk1);
                                }
                            };

                            // source slices
                            let mut lslice0: &[_] = &*lchunk0;
                            let mut rslice0: &[_] = &*rchunk0;

                            // target slice
                            let mut chunk1 = Chunk::cat(vec![lchunk1, rchunk1]);
                            let mut slice1 = &mut *chunk1;

                            // merge sorted source slices into the target slice
                            loop {
                                match (lslice0.first(), rslice0.first()) {
                                    (Some(&lval), Some(&rval)) => {
                                        if lval <= rval {
                                            slice1[0] = lval;
                                            lslice0 = &lslice0[1..];
                                        } else {
                                            slice1[0] = rval;
                                            rslice0 = &rslice0[1..];
                                        }
                                        slice1 = &mut slice1[1..];
                                    }
                                    (Some(_), None) => {
                                        slice1.clone_from_slice(lslice0);
                                        break;
                                    }
                                    (None, Some(_)) => {
                                        slice1.clone_from_slice(rslice0);
                                        break;
                                    }
                                    (None, None) => break,
                                }
                            }

                            // merge source chunks
                            let chunk0 = Chunk::cat(vec![lchunk0, rchunk0]);

                            (chunk0, chunk1)
                        }
                    })
                    .unzip()
                    .await;

                // swap even and odd chunks
                chunks0 = chunks1_;
                chunks1 = chunks0_;
            }

            // merge chunks back to array
            let guard = chunks0[0].guard();
            drop(chunks0);
            guard.try_unwrap().unwrap()
        };
        eprintln!("merge sort:\t{:?}", instant.elapsed());

        // verify results
        assert_eq!(
            array_std, array_concurrent,
            "the outputs of std sort and merge-sort differ"
        );
    });
}