orx-parallel 3.4.0

High performance, configurable and expressive parallel computation library.
Documentation
use crate::{
    ThreadExecutor,
    generic_values::Values,
    generic_values::runner_results::{Next, NextWithIdx},
};
use orx_concurrent_iter::{ChunkPuller, ConcurrentIter};

pub fn m<U, C, I, O, M1>(
    mut using: U,
    mut runner: C,
    iter: &I,
    shared_state: &C::SharedState,
    map1: &M1,
) -> Option<(usize, O)>
where
    C: ThreadExecutor,
    I: ConcurrentIter,
    M1: Fn(&mut U, I::Item) -> O,
{
    let u = &mut using;
    let mut chunk_puller = iter.chunk_puller(0);
    let mut item_puller = iter.item_puller_with_idx();

    loop {
        let chunk_size = runner.next_chunk_size(shared_state, iter);

        runner.begin_chunk(chunk_size);

        match chunk_size {
            0 | 1 => match item_puller.next() {
                Some((idx, i)) => {
                    let first = map1(u, i);
                    iter.skip_to_end();
                    runner.complete_chunk(shared_state, chunk_size);
                    runner.complete_task(shared_state);
                    return Some((idx, first));
                }
                None => break,
            },
            c => {
                if c > chunk_puller.chunk_size() {
                    chunk_puller = iter.chunk_puller(c);
                }

                match chunk_puller.pull_with_idx() {
                    Some((idx, mut chunk)) => {
                        if let Some(i) = chunk.next() {
                            let first = map1(u, i);
                            iter.skip_to_end();
                            runner.complete_chunk(shared_state, chunk_size);
                            runner.complete_task(shared_state);
                            return Some((idx, first));
                        }
                    }
                    None => break,
                }
            }
        }

        runner.complete_chunk(shared_state, chunk_size);
    }

    runner.complete_task(shared_state);
    None
}

pub fn x<U, C, I, Vo, X1>(
    mut using: U,
    mut runner: C,
    iter: &I,
    shared_state: &C::SharedState,
    xap1: &X1,
) -> NextWithIdx<Vo>
where
    C: ThreadExecutor,
    I: ConcurrentIter,
    Vo: Values,
    X1: Fn(&mut U, I::Item) -> Vo,
{
    let u = &mut using;
    let mut chunk_puller = iter.chunk_puller(0);
    let mut item_puller = iter.item_puller_with_idx();

    loop {
        let chunk_size = runner.next_chunk_size(shared_state, iter);

        runner.begin_chunk(chunk_size);

        match chunk_size {
            0 | 1 => match item_puller.next() {
                Some((idx, i)) => {
                    let vt = xap1(u, i);
                    match vt.next() {
                        Next::Done { value } => {
                            if let Some(value) = value {
                                iter.skip_to_end();
                                runner.complete_chunk(shared_state, chunk_size);
                                runner.complete_task(shared_state);
                                return NextWithIdx::Found { idx, value };
                            }
                        }
                        Next::StoppedByError { error } => {
                            iter.skip_to_end();
                            runner.complete_chunk(shared_state, chunk_size);
                            runner.complete_task(shared_state);
                            return NextWithIdx::StoppedByError { error };
                        }
                        Next::StoppedByWhileCondition => {
                            iter.skip_to_end();
                            runner.complete_chunk(shared_state, chunk_size);
                            runner.complete_task(shared_state);
                            return NextWithIdx::StoppedByWhileCondition { idx };
                        }
                    }
                }
                None => break,
            },
            c => {
                if c > chunk_puller.chunk_size() {
                    chunk_puller = iter.chunk_puller(c);
                }

                match chunk_puller.pull_with_idx() {
                    Some((idx, chunk)) => {
                        for i in chunk {
                            let vt = xap1(u, i);
                            match vt.next() {
                                Next::Done { value } => {
                                    if let Some(value) = value {
                                        iter.skip_to_end();
                                        runner.complete_chunk(shared_state, chunk_size);
                                        runner.complete_task(shared_state);
                                        return NextWithIdx::Found { idx, value };
                                    }
                                }
                                Next::StoppedByError { error } => {
                                    iter.skip_to_end();
                                    runner.complete_chunk(shared_state, chunk_size);
                                    runner.complete_task(shared_state);
                                    return NextWithIdx::StoppedByError { error };
                                }
                                Next::StoppedByWhileCondition => {
                                    iter.skip_to_end();
                                    runner.complete_chunk(shared_state, chunk_size);
                                    runner.complete_task(shared_state);
                                    return NextWithIdx::StoppedByWhileCondition { idx };
                                }
                            }
                        }
                    }
                    None => break,
                }
            }
        }

        runner.complete_chunk(shared_state, chunk_size);
    }

    runner.complete_task(shared_state);
    NextWithIdx::NotFound
}