orx-parallel 3.4.0

High performance, configurable and expressive parallel computation library.
Documentation
use crate::{
    generic_values::{Values, runner_results::Fallibility},
    heap_sort::heap_sort_into,
};
use alloc::vec::Vec;
use core::fmt::Debug;
use orx_fixed_vec::IntoConcurrentPinnedVec;

pub enum OrderedPush<F: Fallibility> {
    Done,
    StoppedByWhileCondition { idx: usize },
    StoppedByError { idx: usize, error: F::Error },
}

pub enum ThreadCollect<V>
where
    V: Values,
{
    AllCollected {
        vec: Vec<(usize, V::Item)>,
    },
    StoppedByWhileCondition {
        vec: Vec<(usize, V::Item)>,
        stopped_idx: usize,
    },
    StoppedByError {
        error: <V::Fallibility as Fallibility>::Error,
    },
}

impl<V: Values> Debug for ThreadCollect<V> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::AllCollected { vec } => f
                .debug_struct("AllCollected")
                .field("vec-len", &vec.len())
                .finish(),
            Self::StoppedByWhileCondition { vec, stopped_idx } => f
                .debug_struct("StoppedByWhileCondition")
                .field("vec-len", &vec.len())
                .field("stopped_idx", stopped_idx)
                .finish(),
            Self::StoppedByError { error: _ } => f.debug_struct("StoppedByError").finish(),
        }
    }
}

impl<V: Values> ThreadCollect<V> {
    pub fn into_result(self) -> Result<Self, <V::Fallibility as Fallibility>::Error> {
        match self {
            Self::StoppedByError { error } => Err(error),
            _ => Ok(self),
        }
    }
}

pub enum ParallelCollect<V, P>
where
    V: Values,
    P: IntoConcurrentPinnedVec<V::Item>,
{
    AllCollected {
        pinned_vec: P,
    },
    StoppedByWhileCondition {
        pinned_vec: P,
        stopped_idx: usize,
    },
    StoppedByError {
        error: <V::Fallibility as Fallibility>::Error,
    },
}

impl<V, P> core::fmt::Debug for ParallelCollect<V, P>
where
    V: Values,
    P: IntoConcurrentPinnedVec<V::Item>,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::AllCollected { pinned_vec } => f
                .debug_struct("AllCollected")
                .field("pinned_vec.len()", &pinned_vec.len())
                .finish(),
            Self::StoppedByWhileCondition {
                pinned_vec,
                stopped_idx,
            } => f
                .debug_struct("StoppedByWhileCondition")
                .field("pinned_vec.len()", &pinned_vec.len())
                .field("stopped_idx", stopped_idx)
                .finish(),
            Self::StoppedByError { error: _ } => f.debug_struct("StoppedByError").finish(),
        }
    }
}

impl<V, P> ParallelCollect<V, P>
where
    V: Values,
    P: IntoConcurrentPinnedVec<V::Item>,
{
    pub fn reduce(results: Vec<ThreadCollect<V>>, mut pinned_vec: P) -> Self {
        let mut vectors = Vec::with_capacity(results.len());
        let mut min_stopped_idx = None;

        for x in results {
            match x {
                ThreadCollect::AllCollected { vec } => vectors.push(vec),
                ThreadCollect::StoppedByWhileCondition { vec, stopped_idx } => {
                    min_stopped_idx = match min_stopped_idx {
                        Some(x) => Some(core::cmp::min(x, stopped_idx)),
                        None => Some(stopped_idx),
                    };
                    vectors.push(vec);
                }
                ThreadCollect::StoppedByError { error } => return Self::StoppedByError { error },
            }
        }

        heap_sort_into(vectors, min_stopped_idx, &mut pinned_vec);

        match min_stopped_idx {
            Some(stopped_idx) => Self::StoppedByWhileCondition {
                pinned_vec,
                stopped_idx,
            },
            None => Self::AllCollected { pinned_vec },
        }
    }

    pub fn into_result(self) -> Result<P, <V::Fallibility as Fallibility>::Error> {
        match self {
            Self::AllCollected { pinned_vec } => Ok(pinned_vec),
            Self::StoppedByWhileCondition {
                pinned_vec,
                stopped_idx: _,
            } => Ok(pinned_vec),
            Self::StoppedByError { error } => Err(error),
        }
    }
}