orx-parallel 3.4.0

High performance, configurable and expressive parallel computation library.
Documentation
use crate::Params;
use crate::generic_values::runner_results::{
    Infallible, ParallelCollect, ParallelCollectArbitrary,
};
use crate::runner::{NumSpawned, ParallelRunner};
use crate::using::executor::parallel_compute as prc;
use crate::using::using_variants::Using;
use crate::{IterationOrder, generic_values::Values};
use orx_concurrent_iter::ConcurrentIter;
use orx_fixed_vec::IntoConcurrentPinnedVec;

pub fn map_collect_into<U, R, I, O, M1, P>(
    using: U,
    orchestrator: R,
    params: Params,
    iter: I,
    map1: M1,
    pinned_vec: P,
) -> (NumSpawned, P)
where
    U: Using,
    R: ParallelRunner,
    I: ConcurrentIter,
    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
    O: Send,
    P: IntoConcurrentPinnedVec<O>,
{
    match (params.is_sequential(), params.iteration_order) {
        (true, _) => (
            NumSpawned::zero(),
            map_collect_into_seq(using, iter, map1, pinned_vec),
        ),
        #[cfg(test)]
        (false, IterationOrder::Arbitrary) => {
            prc::collect_arbitrary::m(using, orchestrator, params, iter, map1, pinned_vec)
        }
        (false, _) => prc::collect_ordered::m(using, orchestrator, params, iter, map1, pinned_vec),
    }
}

fn map_collect_into_seq<U, I, O, M1, P>(using: U, iter: I, map1: M1, mut pinned_vec: P) -> P
where
    U: Using,
    I: ConcurrentIter,
    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
    O: Send,
    P: IntoConcurrentPinnedVec<O>,
{
    let mut u = using.into_inner();
    let iter = iter.into_seq_iter();
    for i in iter {
        pinned_vec.push(map1(&mut u, i));
    }
    pinned_vec
}

pub fn xap_collect_into<U, R, I, Vo, X1, P>(
    using: U,
    orchestrator: R,
    params: Params,
    iter: I,
    xap1: X1,
    pinned_vec: P,
) -> (NumSpawned, P)
where
    U: Using,
    R: ParallelRunner,
    I: ConcurrentIter,
    Vo: Values<Fallibility = Infallible>,
    Vo::Item: Send,
    X1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
    P: IntoConcurrentPinnedVec<Vo::Item>,
{
    match (params.is_sequential(), params.iteration_order) {
        (true, _) => (
            NumSpawned::zero(),
            xap_collect_into_seq(using, iter, xap1, pinned_vec),
        ),
        (false, IterationOrder::Arbitrary) => {
            let (num_threads, result) =
                prc::collect_arbitrary::x(using, orchestrator, params, iter, xap1, pinned_vec);
            let pinned_vec = match result {
                ParallelCollectArbitrary::AllOrUntilWhileCollected { pinned_vec } => pinned_vec,
            };
            (num_threads, pinned_vec)
        }
        (false, IterationOrder::Ordered) => {
            let (num_threads, result) =
                prc::collect_ordered::x(using, orchestrator, params, iter, xap1, pinned_vec);
            let pinned_vec = match result {
                ParallelCollect::AllCollected { pinned_vec } => pinned_vec,
                ParallelCollect::StoppedByWhileCondition {
                    pinned_vec,
                    stopped_idx: _,
                } => pinned_vec,
            };
            (num_threads, pinned_vec)
        }
    }
}

fn xap_collect_into_seq<U, I, Vo, X1, P>(using: U, iter: I, xap1: X1, mut pinned_vec: P) -> P
where
    U: Using,
    I: ConcurrentIter,
    Vo: Values,
    Vo::Item: Send,
    X1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
    P: IntoConcurrentPinnedVec<Vo::Item>,
{
    let mut u = using.into_inner();
    let iter = iter.into_seq_iter();
    for i in iter {
        let vt = xap1(&mut u, i);
        let done = vt.push_to_pinned_vec(&mut pinned_vec);
        if Vo::sequential_push_to_stop(done).is_some() {
            break;
        }
    }

    pinned_vec
}