orx_parallel/collect_into/
vec.rs1use super::par_collect_into::ParCollectIntoCore;
2use crate::computations::{M, Values, X, Xfx};
3use crate::runner::ParallelRunner;
4use orx_concurrent_iter::ConcurrentIter;
5use orx_fixed_vec::FixedVec;
6use orx_pinned_vec::PinnedVec;
7use orx_split_vec::{GrowthWithConstantTimeAccess, SplitVec};
8
9impl<O> ParCollectIntoCore<O> for Vec<O>
10where
11 O: Send + Sync,
12{
13 type BridgePinnedVec = FixedVec<O>;
14
15 fn empty(iter_len: Option<usize>) -> Self {
16 match iter_len {
17 Some(len) => Vec::with_capacity(len),
18 None => Vec::new(),
19 }
20 }
21
22 fn m_collect_into<R, I, M1>(mut self, m: M<I, O, M1>) -> Self
23 where
24 R: ParallelRunner,
25 I: ConcurrentIter,
26 M1: Fn(I::Item) -> O + Send + Sync,
27 {
28 match m.par_len() {
29 None => {
30 let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
31 let split_vec = split_vec.m_collect_into::<R, _, _>(m);
32 extend_from_split(self, split_vec)
33 }
34 Some(len) => {
35 self.reserve(len);
36 let fixed_vec = FixedVec::from(self);
37 let (_num_spawned, fixed_vec) = m.collect_into::<R, _>(fixed_vec);
38 Vec::from(fixed_vec)
39 }
40 }
41 }
42
43 fn x_collect_into<R, I, Vo, M1>(self, x: X<I, Vo, M1>) -> Self
44 where
45 R: ParallelRunner,
46 I: ConcurrentIter,
47 Vo: Values<Item = O> + Send + Sync,
48 Vo::Item: Send + Sync,
49 M1: Fn(I::Item) -> Vo + Send + Sync,
50 {
51 let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
52 let split_vec = split_vec.x_collect_into::<R, _, _, _>(x);
53 extend_from_split(self, split_vec)
54 }
55
56 fn xfx_collect_into<R, I, Vt, Vo, M1, F, M2>(self, xfx: Xfx<I, Vt, Vo, M1, F, M2>) -> Self
57 where
58 R: ParallelRunner,
59 I: ConcurrentIter,
60 Vt: Values + Send + Sync,
61 Vt::Item: Send + Sync,
62 Vo: Values<Item = O> + Send + Sync,
63 M1: Fn(I::Item) -> Vt + Send + Sync,
64 F: Fn(&Vt::Item) -> bool + Send + Sync,
65 M2: Fn(Vt::Item) -> Vo + Send + Sync,
66 {
67 let split_vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
68 let split_vec = split_vec.xfx_collect_into::<R, _, _, _, _, _, _>(xfx);
69 extend_from_split(self, split_vec)
70 }
71
72 #[cfg(test)]
75 fn length(&self) -> usize {
76 self.len()
77 }
78}
79
80fn extend_from_split<T, G>(mut initial_vec: Vec<T>, collected_split_vec: SplitVec<T, G>) -> Vec<T>
81where
82 G: GrowthWithConstantTimeAccess,
83{
84 match initial_vec.len() {
85 0 => collected_split_vec.to_vec(),
86 _ => {
87 initial_vec.reserve(collected_split_vec.len());
88 initial_vec.extend(collected_split_vec);
89 initial_vec
90 }
91 }
92}