orx_parallel/collect_into/
split_vec.rs1use super::par_collect_into::ParCollectIntoCore;
2use crate::{
3 computations::{M, Values, X, Xfx},
4 runner::ParallelRunner,
5};
6use orx_concurrent_iter::ConcurrentIter;
7use orx_pinned_vec::PinnedVec;
8use orx_split_vec::{GrowthWithConstantTimeAccess, PseudoDefault, SplitVec};
9
10impl<O, G> ParCollectIntoCore<O> for SplitVec<O, G>
11where
12 O: Send + Sync,
13 G: GrowthWithConstantTimeAccess,
14 Self: PseudoDefault,
15{
16 type BridgePinnedVec = Self;
17
18 fn empty(iter_len: Option<usize>) -> Self {
19 let mut vec = Self::pseudo_default();
20 reserve(&mut vec, iter_len);
21 vec
22 }
23
24 fn m_collect_into<R, I, M1>(mut self, m: M<I, O, M1>) -> Self
25 where
26 R: ParallelRunner,
27 I: ConcurrentIter,
28 M1: Fn(I::Item) -> O + Send + Sync,
29 {
30 reserve(&mut self, m.par_len());
31 let (_num_spawned, pinned_vec) = m.collect_into::<R, _>(self);
32 pinned_vec
33 }
34
35 fn x_collect_into<R, I, Vo, M1>(mut self, x: X<I, Vo, M1>) -> Self
36 where
37 R: ParallelRunner,
38 I: ConcurrentIter,
39 Vo: Values<Item = O> + Send + Sync,
40 Vo::Item: Send + Sync,
41 M1: Fn(I::Item) -> Vo + Send + Sync,
42 {
43 reserve(&mut self, x.par_len());
44 let (_num_spawned, pinned_vec) = x.collect_into::<R, _>(self);
45 pinned_vec
46 }
47
48 fn xfx_collect_into<R, I, Vt, Vo, M1, F, M2>(mut self, mfm: Xfx<I, Vt, Vo, M1, F, M2>) -> Self
49 where
50 R: ParallelRunner,
51 I: ConcurrentIter,
52 Vt: Values + Send + Sync,
53 Vt::Item: Send + Sync,
54 Vo: Values<Item = O> + Send + Sync,
55 M1: Fn(I::Item) -> Vt + Send + Sync,
56 F: Fn(&Vt::Item) -> bool + Send + Sync,
57 M2: Fn(Vt::Item) -> Vo + Send + Sync,
58 {
59 reserve(&mut self, mfm.par_len());
60 let (_num_spawned, pinned_vec) = mfm.collect_into::<R, _>(self);
61 pinned_vec
62 }
63
64 #[cfg(test)]
67 fn length(&self) -> usize {
68 self.len()
69 }
70}
71
72fn reserve<T, G: GrowthWithConstantTimeAccess>(
73 split_vec: &mut SplitVec<T, G>,
74 len_to_extend: Option<usize>,
75) {
76 match len_to_extend {
77 None => {
78 let capacity_bound = split_vec.capacity_bound();
79 split_vec.reserve_maximum_concurrent_capacity(capacity_bound)
80 }
81 Some(len) => split_vec.reserve_maximum_concurrent_capacity(split_vec.len() + len),
82 };
83}