orx_parallel/computational_variants/
fallible_option.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIterResult,
3    par_iter_option::{ParIterOption, ResultIntoOption},
4    runner::{DefaultRunner, ParallelRunner},
5};
6use core::marker::PhantomData;
7
8/// A parallel iterator for which the computation either completely succeeds,
9/// or fails and **early exits** with None.
10pub struct ParOption<F, T, R = DefaultRunner>
11where
12    R: ParallelRunner,
13    F: ParIterResult<R, Item = T, Err = ()>,
14{
15    par: F,
16    phantom: PhantomData<(T, R)>,
17}
18
19impl<F, T, R> ParOption<F, T, R>
20where
21    R: ParallelRunner,
22    F: ParIterResult<R, Item = T, Err = ()>,
23{
24    pub(crate) fn new(par: F) -> Self {
25        Self {
26            par,
27            phantom: PhantomData,
28        }
29    }
30}
31
32impl<F, T, R> ParIterOption<R> for ParOption<F, T, R>
33where
34    R: ParallelRunner,
35    F: ParIterResult<R, Item = T, Err = ()>,
36{
37    type Item = T;
38
39    // params transformations
40
41    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self {
42        Self::new(self.par.num_threads(num_threads))
43    }
44
45    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self {
46        Self::new(self.par.chunk_size(chunk_size))
47    }
48
49    fn iteration_order(self, order: IterationOrder) -> Self {
50        Self::new(self.par.iteration_order(order))
51    }
52
53    fn with_runner<Q: ParallelRunner>(
54        self,
55        orchestrator: Q,
56    ) -> impl ParIterOption<Q, Item = Self::Item> {
57        ParOption::new(self.par.with_runner(orchestrator))
58    }
59
60    // computation transformations
61
62    fn map<Out, Map>(self, map: Map) -> impl ParIterOption<R, Item = Out>
63    where
64        Map: Fn(Self::Item) -> Out + Sync + Clone,
65        Out: Send,
66    {
67        ParOption::new(self.par.map(map))
68    }
69
70    fn filter<Filter>(self, filter: Filter) -> impl ParIterOption<R, Item = Self::Item>
71    where
72        Self: Sized,
73        Filter: Fn(&Self::Item) -> bool + Sync + Clone,
74        Self::Item: Send,
75    {
76        ParOption::new(self.par.filter(filter))
77    }
78
79    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIterOption<R, Item = IOut::Item>
80    where
81        Self: Sized,
82        IOut: IntoIterator,
83        IOut::Item: Send,
84        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
85    {
86        ParOption::new(self.par.flat_map(flat_map))
87    }
88
89    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIterOption<R, Item = Out>
90    where
91        Self: Sized,
92        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
93        Out: Send,
94    {
95        ParOption::new(self.par.filter_map(filter_map))
96    }
97
98    fn inspect<Operation>(self, operation: Operation) -> impl ParIterOption<R, Item = Self::Item>
99    where
100        Self: Sized,
101        Operation: Fn(&Self::Item) + Sync + Clone,
102        Self::Item: Send,
103    {
104        ParOption::new(self.par.inspect(operation))
105    }
106
107    // collect
108
109    fn collect_into<C>(self, output: C) -> Option<C>
110    where
111        Self::Item: Send,
112        C: ParCollectInto<Self::Item>,
113    {
114        self.par.collect_into(output).into_option()
115    }
116
117    fn collect<C>(self) -> Option<C>
118    where
119        Self::Item: Send,
120        C: ParCollectInto<Self::Item>,
121    {
122        self.par.collect().into_option()
123    }
124
125    // reduce
126
127    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Option<Self::Item>>
128    where
129        Self::Item: Send,
130        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
131    {
132        self.par.reduce(reduce).into_option()
133    }
134
135    // early exit
136
137    fn first(self) -> Option<Option<Self::Item>>
138    where
139        Self::Item: Send,
140    {
141        self.par.first().into_option()
142    }
143}