orx_parallel/collect_into/
vec.rs

1use super::par_collect_into::ParCollectIntoCore;
2use crate::computations::{M, Values, X, Xfx};
3use crate::runner::ParallelRunner;
4use orx_concurrent_iter::ConcurrentIter;
5use orx_fixed_vec::FixedVec;
6use orx_pinned_vec::PinnedVec;
7use orx_split_vec::{GrowthWithConstantTimeAccess, SplitVec};
8
9impl<O> ParCollectIntoCore<O> for Vec<O>
10where
11    O: Send + Sync,
12{
13    type BridgePinnedVec = FixedVec<O>;
14
15    fn empty(iter_len: Option<usize>) -> Self {
16        match iter_len {
17            Some(len) => Vec::with_capacity(len),
18            None => Vec::new(),
19        }
20    }
21
22    fn m_collect_into<R, I, M1>(mut self, m: M<I, O, M1>) -> Self
23    where
24        R: ParallelRunner,
25        I: ConcurrentIter,
26        M1: Fn(I::Item) -> O + Send + Sync,
27    {
28        match m.par_len() {
29            None => {
30                let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
31                let split_vec = split_vec.m_collect_into::<R, _, _>(m);
32                extend_from_split(self, split_vec)
33            }
34            Some(len) => {
35                self.reserve(len);
36                let fixed_vec = FixedVec::from(self);
37                let (_num_spawned, fixed_vec) = m.collect_into::<R, _>(fixed_vec);
38                Vec::from(fixed_vec)
39            }
40        }
41    }
42
43    fn x_collect_into<R, I, Vo, M1>(self, x: X<I, Vo, M1>) -> Self
44    where
45        R: ParallelRunner,
46        I: ConcurrentIter,
47        Vo: Values<Item = O> + Send + Sync,
48        Vo::Item: Send + Sync,
49        M1: Fn(I::Item) -> Vo + Send + Sync,
50    {
51        let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
52        let split_vec = split_vec.x_collect_into::<R, _, _, _>(x);
53        extend_from_split(self, split_vec)
54    }
55
56    fn xfx_collect_into<R, I, Vt, Vo, M1, F, M2>(self, xfx: Xfx<I, Vt, Vo, M1, F, M2>) -> Self
57    where
58        R: ParallelRunner,
59        I: ConcurrentIter,
60        Vt: Values + Send + Sync,
61        Vt::Item: Send + Sync,
62        Vo: Values<Item = O> + Send + Sync,
63        M1: Fn(I::Item) -> Vt + Send + Sync,
64        F: Fn(&Vt::Item) -> bool + Send + Sync,
65        M2: Fn(Vt::Item) -> Vo + Send + Sync,
66    {
67        let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
68        let split_vec = split_vec.xfx_collect_into::<R, _, _, _, _, _, _>(xfx);
69        extend_from_split(self, split_vec)
70    }
71
72    // test
73
74    #[cfg(test)]
75    fn length(&self) -> usize {
76        self.len()
77    }
78}
79
80fn extend_from_split<T, G>(mut initial_vec: Vec<T>, collected_split_vec: SplitVec<T, G>) -> Vec<T>
81where
82    G: GrowthWithConstantTimeAccess,
83{
84    match initial_vec.len() {
85        0 => collected_split_vec.to_vec(),
86        _ => {
87            initial_vec.reserve(collected_split_vec.len());
88            initial_vec.extend(collected_split_vec);
89            initial_vec
90        }
91    }
92}