orx_parallel/collect_into/
split_vec.rs

1use super::par_collect_into::ParCollectIntoCore;
2use crate::{
3    computations::{M, Values, X, Xfx},
4    runner::ParallelRunner,
5};
6use orx_concurrent_iter::ConcurrentIter;
7use orx_pinned_vec::PinnedVec;
8use orx_split_vec::{GrowthWithConstantTimeAccess, PseudoDefault, SplitVec};
9
10impl<O, G> ParCollectIntoCore<O> for SplitVec<O, G>
11where
12    O: Send + Sync,
13    G: GrowthWithConstantTimeAccess,
14    Self: PseudoDefault,
15{
16    type BridgePinnedVec = Self;
17
18    fn empty(iter_len: Option<usize>) -> Self {
19        let mut vec = Self::pseudo_default();
20        reserve(&mut vec, iter_len);
21        vec
22    }
23
24    fn m_collect_into<R, I, M1>(mut self, m: M<I, O, M1>) -> Self
25    where
26        R: ParallelRunner,
27        I: ConcurrentIter,
28        M1: Fn(I::Item) -> O + Send + Sync,
29    {
30        reserve(&mut self, m.par_len());
31        let (_num_spawned, pinned_vec) = m.collect_into::<R, _>(self);
32        pinned_vec
33    }
34
35    fn x_collect_into<R, I, Vo, M1>(mut self, x: X<I, Vo, M1>) -> Self
36    where
37        R: ParallelRunner,
38        I: ConcurrentIter,
39        Vo: Values<Item = O> + Send + Sync,
40        Vo::Item: Send + Sync,
41        M1: Fn(I::Item) -> Vo + Send + Sync,
42    {
43        reserve(&mut self, x.par_len());
44        let (_num_spawned, pinned_vec) = x.collect_into::<R, _>(self);
45        pinned_vec
46    }
47
48    fn xfx_collect_into<R, I, Vt, Vo, M1, F, M2>(mut self, mfm: Xfx<I, Vt, Vo, M1, F, M2>) -> Self
49    where
50        R: ParallelRunner,
51        I: ConcurrentIter,
52        Vt: Values + Send + Sync,
53        Vt::Item: Send + Sync,
54        Vo: Values<Item = O> + Send + Sync,
55        M1: Fn(I::Item) -> Vt + Send + Sync,
56        F: Fn(&Vt::Item) -> bool + Send + Sync,
57        M2: Fn(Vt::Item) -> Vo + Send + Sync,
58    {
59        reserve(&mut self, mfm.par_len());
60        let (_num_spawned, pinned_vec) = mfm.collect_into::<R, _>(self);
61        pinned_vec
62    }
63
64    // test
65
66    #[cfg(test)]
67    fn length(&self) -> usize {
68        self.len()
69    }
70}
71
72fn reserve<T, G: GrowthWithConstantTimeAccess>(
73    split_vec: &mut SplitVec<T, G>,
74    len_to_extend: Option<usize>,
75) {
76    match len_to_extend {
77        None => {
78            let capacity_bound = split_vec.capacity_bound();
79            split_vec.reserve_maximum_concurrent_capacity(capacity_bound)
80        }
81        Some(len) => split_vec.reserve_maximum_concurrent_capacity(split_vec.len() + len),
82    };
83}