// dpc-pariter 0.5.1
//
// Parallel iterator processing
// Documentation
use crate::TotalTimeProfiler;

use super::IteratorExt;
use quickcheck_macros::quickcheck;

/// Property: `parallel_map_custom` yields the same items, in the same order,
/// as a plain sequential `map` over the same input.
///
/// `threads` and `max_in_flight` are arbitrary quickcheck values; they are
/// reduced modulo 32 and 128 respectively before being passed to the builder.
#[quickcheck]
fn map_vs_map_parallel(v: Vec<usize>, threads: usize, max_in_flight: usize) -> bool {
    // Sequential reference result; borrowing `v` avoids cloning the input.
    let expected: Vec<usize> = v.iter().map(|x| x / 2).collect();
    // `v` is not used after this point, so it is moved into the pipeline
    // instead of being cloned.
    let actual: Vec<usize> = v
        .into_iter()
        .parallel_map_custom(
            |o| o.threads(threads % 32).buffer_size(max_in_flight % 128),
            |x| x / 2,
        )
        .collect();

    expected == actual
}

/// Property: two chained `parallel_map_custom` stages (halving, then identity)
/// yield the same items, in the same order, as a sequential `map` that halves.
///
/// `threads` and `max_in_flight` are arbitrary quickcheck values; they are
/// reduced modulo 32 and 128 respectively and applied to both stages.
#[quickcheck]
fn map_vs_map_parallel_double(v: Vec<usize>, threads: usize, max_in_flight: usize) -> bool {
    // Sequential reference result; borrowing `v` avoids cloning the input.
    let expected: Vec<usize> = v.iter().map(|x| x / 2).collect();
    // `v` is not used after this point, so it is moved into the pipeline
    // instead of being cloned.
    let actual: Vec<usize> = v
        .into_iter()
        .parallel_map_custom(
            |o| o.threads(threads % 32).buffer_size(max_in_flight % 128),
            |x| x / 2,
        )
        // Second stage is an identity map: it must not disturb the output.
        .parallel_map_custom(
            |o| o.threads(threads % 32).buffer_size(max_in_flight % 128),
            |x| x,
        )
        .collect();

    expected == actual
}

/// Property: two chained scoped parallel maps over borrowed items (halving,
/// then identity) agree with a sequential `map` that halves.
///
/// The first stage is configured with `threads % 32` threads, the second with
/// a buffer size of `max_in_flight % 128`.
#[quickcheck]
fn map_vs_map_parallel_scoped_double(v: Vec<usize>, threads: usize, max_in_flight: usize) -> bool {
    let worker_count = threads % 32;
    let in_flight = max_in_flight % 128;

    // Run the borrowed pipeline inside a scope and unwrap the scope's result.
    let scoped_result = super::scope(|s| {
        v.iter()
            .parallel_map_scoped_custom(s, |opts| opts.threads(worker_count), |item| item / 2)
            .parallel_map_scoped_custom(s, |opts| opts.buffer_size(in_flight), |item| item)
            .collect::<Vec<usize>>()
    });
    let actual = scoped_result.expect("failed");

    // Sequential reference result.
    let expected: Vec<usize> = v.iter().map(|item| item / 2).collect();

    expected == actual
}

/// Property: parallel maps interleaved with `readahead` stages yield the same
/// items, in the same order, as a sequential `map` that halves.
///
/// The chain is: parallel map (halve) -> readahead -> parallel map (identity)
/// -> readahead -> parallel map (identity). Every parallel stage is configured
/// with `threads % 32` threads.
#[quickcheck]
fn map_vs_map_parallel_with_readahead(v: Vec<usize>, threads: usize) -> bool {
    // Sequential reference result; borrowing `v` avoids cloning the input.
    let expected: Vec<usize> = v.iter().map(|x| x / 2).collect();
    // `v` is not used after this point, so it is moved into the pipeline
    // instead of being cloned.
    let actual: Vec<usize> = v
        .into_iter()
        .parallel_map_custom(|o| o.threads(threads % 32), |x| x / 2)
        .readahead()
        .parallel_map_custom(|o| o.threads(threads % 32), |x| x)
        .readahead()
        .parallel_map_custom(|o| o.threads(threads % 32), |x| x)
        .collect();

    expected == actual
}

/// Property: scoped parallel maps interleaved with scoped readahead stages
/// agree with a sequential `map` that halves.
///
/// Only the first parallel stage transforms items; every later stage is an
/// identity map. Stages are configured with `threads % 32` threads or a buffer
/// size of `max_in_flight % 128`, and one stage uses the default options.
#[quickcheck]
fn map_vs_map_parallel_scoped_with_readahead(
    v: Vec<usize>,
    threads: usize,
    max_in_flight: usize,
) -> bool {
    let worker_count = threads % 32;
    let in_flight = max_in_flight % 128;

    let actual = super::scope(|s| {
        v.iter()
            .parallel_map_scoped_custom(s, |opts| opts.threads(worker_count), |item| item / 2)
            .readahead_scoped(s)
            .parallel_map_scoped_custom(s, |opts| opts.buffer_size(in_flight), |item| item)
            .readahead_scoped(s)
            .parallel_map_scoped(s, |item| item)
            .readahead_scoped(s)
            .parallel_map_scoped_custom(s, |opts| opts.threads(worker_count), |item| item)
            .collect::<Vec<usize>>()
    })
    .expect("failed");

    // Sequential reference result.
    let expected: Vec<usize> = v.iter().map(|item| item / 2).collect();

    expected == actual
}

/// Checks that the profiling adapters compose with scoped parallel maps and
/// scoped readahead stages, and that the resulting chain still agrees with a
/// sequential `map` that halves.
///
/// Each profiler is a `TotalTimeProfiler::periodically_millis(10_000, ..)`
/// whose callback writes a message to stderr. Egress and ingress profilers
/// use different messages so their output can be told apart.
#[quickcheck]
fn check_profile_compiles(v: Vec<usize>) -> bool {
    // Sequential reference result.
    let expected: Vec<usize> = v.iter().map(|x| x / 2).collect();
    let actual: Vec<usize> = super::scope(|s| {
        v.iter()
            .parallel_map_scoped(s, |x| x / 2)
            .profile_egress(TotalTimeProfiler::periodically_millis(10_000, || {
                eprintln!("Blocked on sending")
            }))
            // NOTE(review): "receiving" assumes ingress profiling measures
            // waiting on the upstream side; confirm against the crate docs.
            .profile_ingress(TotalTimeProfiler::periodically_millis(10_000, || {
                eprintln!("Blocked on receiving")
            }))
            .readahead_scoped(s)
            .parallel_map_scoped_custom(s, |o| o, |x| x)
            .readahead_scoped(s)
            .profile_egress(TotalTimeProfiler::periodically_millis(10_000, || {
                eprintln!("Blocked on sending")
            }))
            .profile_ingress(TotalTimeProfiler::periodically_millis(10_000, || {
                eprintln!("Blocked on receiving")
            }))
            .parallel_map_scoped_custom(s, |o| o, |x| x)
            .readahead_scoped(s)
            .parallel_map_scoped_custom(s, |o| o, |x| x)
            .collect()
    })
    .expect("failed");

    expected == actual
}

/// Property: inserting a `readahead_custom` stage in front of a sequential
/// `map` does not change the items produced or their order.
///
/// `out` is an arbitrary quickcheck value; the readahead buffer size is
/// `out % 32`.
#[quickcheck]
fn iter_vs_readhead(v: Vec<usize>, out: usize) -> bool {
    // Sequential reference result; borrowing `v` avoids cloning the input.
    let expected: Vec<usize> = v.iter().map(|x| x / 2).collect();
    // `v` is not used after this point, so it is moved into the pipeline
    // instead of being cloned.
    let actual: Vec<usize> = v
        .into_iter()
        .readahead_custom(|o| o.buffer_size(out % 32))
        .map(|x| x / 2)
        .collect();

    expected == actual
}

/// Property: inserting a scoped readahead stage in front of a sequential `map`
/// over borrowed items does not change the items produced or their order.
///
/// The readahead buffer size is `out % 32`.
#[quickcheck]
fn iter_vs_readhead_scoped(v: Vec<usize>, out: usize) -> bool {
    let capacity = out % 32;

    let actual = super::scope(|s| {
        v.iter()
            .readahead_scoped_custom(s, |opts| opts.buffer_size(capacity))
            .map(|item| item / 2)
            .collect::<Vec<_>>()
    })
    .expect("failed");

    // Sequential reference result.
    let expected: Vec<_> = v.iter().map(|item| item / 2).collect();

    expected == actual
}

/// Property: `parallel_filter` keeps the same items, in the same order, as a
/// sequential `filter` with the same "is even" predicate.
#[quickcheck]
fn filter_vs_parallel_filter(v: Vec<usize>) -> bool {
    // Sequential reference result; `copied()` yields owned values without
    // cloning the whole vector first.
    let expected: Vec<usize> = v.iter().copied().filter(|x| x % 2 == 0).collect();
    // `v` is not used after this point, so it is moved into the pipeline
    // instead of being cloned.
    let actual: Vec<usize> = v.into_iter().parallel_filter(|x| x % 2 == 0).collect();

    expected == actual
}

/// Property: the scoped parallel filter over borrowed items keeps the same
/// references, in the same order, as a sequential `filter` with the same
/// "is even" predicate.
#[quickcheck]
fn filter_vs_parallel_filter_scoped(v: Vec<usize>) -> bool {
    let actual = super::scope(|s| {
        v.iter()
            .parallel_filter_scoped(s, |item| *item % 2 == 0)
            .collect::<Vec<_>>()
    })
    .expect("failed");

    // Sequential reference result.
    let expected: Vec<_> = v.iter().filter(|item| *item % 2 == 0).collect();

    expected == actual
}

/// With one configured thread and a mapping closure that panics on every
/// item, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_always_1() {
    let doomed = (0..10).parallel_map_custom(|opts| opts.threads(1), |_| panic!("foo"));
    doomed.count();
}

/// With eight configured threads and a mapping closure that panics on every
/// item, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_always_8() {
    let doomed = (0..10).parallel_map_custom(|opts| opts.threads(8), |_| panic!("foo"));
    doomed.count();
}

/// With one configured thread and a mapping closure that panics only for the
/// item `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_once_1() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(1),
        |n| match n {
            5 => panic!("foo"),
            other => other,
        },
    );
    pipeline.count();
}

/// With eight configured threads and a mapping closure that panics only for
/// the item `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_once_8() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(8),
        |n| match n {
            5 => panic!("foo"),
            other => other,
        },
    );
    pipeline.count();
}

/// With one configured thread and a mapping closure that panics for every
/// item greater than `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_after_a_point_1() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(1),
        |n| match n {
            late if late > 5 => panic!("foo"),
            early => early,
        },
    );
    pipeline.count();
}

/// With eight configured threads and a mapping closure that panics for every
/// item greater than `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_after_a_point_8() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(8),
        |n| match n {
            late if late > 5 => panic!("foo"),
            early => early,
        },
    );
    pipeline.count();
}

/// With one configured thread and a mapping closure that panics for every
/// item less than `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_before_a_point_1() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(1),
        |n| {
            // Items from 5 upwards pass through untouched.
            if n >= 5 {
                return n;
            }
            panic!("foo")
        },
    );
    pipeline.count();
}

/// With eight configured threads and a mapping closure that panics for every
/// item less than `5`, consuming the iterator must panic.
#[test]
#[should_panic]
fn panic_before_a_point_8() {
    let pipeline = (0..10).parallel_map_custom(
        |opts| opts.threads(8),
        |n| {
            // Items from 5 upwards pass through untouched.
            if n >= 5 {
                return n;
            }
            panic!("foo")
        },
    );
    pipeline.count();
}