orx_parallel/
par_iter_option.rs

1use crate::default_fns::{map_count, reduce_sum, reduce_unit};
2use crate::runner::{DefaultRunner, ParallelRunner};
3use crate::{
4    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParThreadPool, RunnerWithPool, Sum,
5};
6use core::cmp::Ordering;
7
8/// A parallel iterator for which the computation either completely succeeds,
9/// or fails and **early exits** with None.
10///
11/// # Examples
12///
13/// To demonstrate the difference of fallible iterator's behavior, consider the following simple example.
14/// We parse a series of strings into integers.
15/// We try this twice:
16/// * in the first one, all inputs are good, hence, we obtain Some of parsed numbers,
17/// * in the second one, the value in the middle is faulty, we expect the computation to fail.
18///
19/// In the following, we try to achieve this both with a regular parallel iterator ([`ParIter`]) and a fallible
20/// parallel iterator, `ParIterOption` in this case.
21///
22/// You may notice the following differences:
23/// * In the regular iterator, it is not very convenient to keep both the resulting numbers and a potential error.
24///   Here, we make use of `filter_map`.
25/// * On the other hand, the `collect` method of the fallible iterator directly returns an `Option` of the computation
26///   which is either Some of all parsed numbers or None if any computation fails.
27/// * Also importantly note that the regular iterator will try to parse all the strings, regardless of how many times
28///   the parsing fails.
29/// * Fallible iterator, on the other hand, stops immediately after observing the first None and short circuits the
30///   computation.
31///
32/// ```
33/// use orx_parallel::*;
34///
35/// let expected_results = [Some((0..100).collect::<Vec<_>>()), None];
36///
37/// for expected in expected_results {
38///     let expected_some = expected.is_some();
39///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40///     if !expected_some {
41///         inputs.insert(50, "x".to_string()); // plant an error case
42///     }
43///
44///     // regular parallel iterator
45///     let results = inputs.par().map(|x| x.parse::<u32>().ok());
46///     let numbers: Vec<_> = results.filter_map(|x| x).collect();
47///     if expected_some {
48///         assert_eq!(&expected, &Some(numbers));
49///     } else {
50///         // otherwise, numbers contains some numbers, but we are not sure
51///         // if the computation completely succeeded or not
52///     }
53///
54///     // fallible parallel iterator
55///     let results = inputs.par().map(|x| x.parse::<u32>().ok());
56///     let result: Option<Vec<_>> = results.into_fallible_option().collect();
57///     assert_eq!(&expected, &result);
58/// }
59/// ```
60///
61/// These differences are not specific to `collect`; all fallible iterator methods return an option.
62/// The following demonstrate reduction examples, where the result is either the reduced value if the entire computation
63/// succeeds, or None.
64///
65/// ```
66/// use orx_parallel::*;
67///
68/// for will_fail in [false, true] {
69///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
70///     if will_fail {
71///         inputs.insert(50, "x".to_string()); // plant an error case
72///     }
73///
74///     // sum
75///     let results = inputs.par().map(|x| x.parse::<u32>().ok());
76///     let result: Option<u32> = results.into_fallible_option().sum();
77///     match will_fail {
78///         true => assert_eq!(result, None),
79///         false => assert_eq!(result, Some(4950)),
80///     }
81///
82///     // max
83///     let results = inputs.par().map(|x| x.parse::<u32>().ok());
84///     let result: Option<Option<u32>> = results.into_fallible_option().max();
85///     match will_fail {
86///         true => assert_eq!(result, None),
87///         false => assert_eq!(result, Some(Some(99))),
88///     }
89/// }
90/// ```
91///
92/// Finally, similar to regular iterators, a fallible parallel iterator can be tranformed using iterator methods.
93/// However, the transformation is on the success path, the failure case of None always short circuits and returns None.
94///
95/// ```
96/// use orx_parallel::*;
97///
98/// for will_fail in [false, true] {
99///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
100///     if will_fail {
101///         inputs.insert(50, "x".to_string()); // plant an error case
102///     }
103///
104///     // fallible iter
105///     let results = inputs.par().map(|x| x.parse::<u32>().ok());
106///     let fallible = results.into_fallible_option();
107///
108///     // transformations
109///
110///     let result: Option<usize> = fallible
111///         .filter(|x| x % 2 == 1)                                 // Item: u32
112///         .map(|x| 3 * x)                                         // Item: u32
113///         .filter_map(|x| (x % 10 != 0).then_some(x))             // Item: u32
114///         .flat_map(|x| [x.to_string(), (10 * x).to_string()])    // Item: String
115///         .map(|x| x.len())                                       // Item: usize
116///         .sum();
117///
118///     match will_fail {
119///         true => assert_eq!(result, None),
120///         false => assert_eq!(result, Some(312)),
121///     }
122/// }
123/// ```
124///
125/// [`ParIter`]: crate::ParIter
126pub trait ParIterOption<R = DefaultRunner>
127where
128    R: ParallelRunner,
129{
130    /// Type of the success element, to be received as the Some variant iff the entire computation succeeds.
131    type Item;
132
133    // params transformations
134
135    /// Sets the number of threads to be used in the parallel execution.
136    /// Integers can be used as the argument with the following mapping:
137    ///
138    /// * `0` -> `NumThreads::Auto`
139    /// * `1` -> `NumThreads::sequential()`
140    /// * `n > 0` -> `NumThreads::Max(n)`
141    ///
142    /// See [`NumThreads`] and [`crate::ParIter::num_threads`] for details.
143    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
144
145    /// Sets the number of elements to be pulled from the concurrent iterator during the
146    /// parallel execution. When integers are used as argument, the following mapping applies:
147    ///
148    /// * `0` -> `ChunkSize::Auto`
149    /// * `n > 0` -> `ChunkSize::Exact(n)`
150    ///
151    /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
152    ///
153    /// See [`ChunkSize`] and [`crate::ParIter::chunk_size`] for details.
154    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
155
156    /// Sets the iteration order of the parallel computation.
157    ///
158    /// See [`IterationOrder`] and [`crate::ParIter::iteration_order`] for details.
159    fn iteration_order(self, order: IterationOrder) -> Self;
160
161    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
162    ///
163    /// See [`ParIter::with_runner`] for details.
164    ///
165    /// [`DefaultRunner`]: crate::DefaultRunner
166    /// [`ParIter::with_runner`]: crate::ParIter::with_runner
167    fn with_runner<Q: ParallelRunner>(
168        self,
169        orchestrator: Q,
170    ) -> impl ParIterOption<Q, Item = Self::Item>;
171
172    /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
173    /// [`ParThreadPool`].
174    ///
175    /// See [`ParIter::with_pool`] for details.
176    ///
177    /// [`DefaultPool`]: crate::DefaultPool
178    /// [`ParIter::with_pool`]: crate::ParIter::with_pool
179    fn with_pool<P: ParThreadPool>(
180        self,
181        pool: P,
182    ) -> impl ParIterOption<RunnerWithPool<P, R::Executor>, Item = Self::Item>
183    where
184        Self: Sized,
185    {
186        let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
187        self.with_runner(runner)
188    }
189
190    // computation transformations
191
192    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
193    ///
194    /// Transformation is only for the success path where all elements are of the `Some` variant.
195    /// Any observation of a `None` case short-circuits the computation and immediately returns None.
196    ///
197    /// # Examples
198    ///
199    /// ```
200    /// use orx_parallel::*;
201    ///
202    /// // all succeeds
203    /// let a: Vec<Option<u32>> = vec![Some(1), Some(2), Some(3)];
204    /// let iter = a.into_par().into_fallible_option().map(|x| 2 * x);
205    ///
206    /// let b: Option<Vec<_>> = iter.collect();
207    /// assert_eq!(b, Some(vec![2, 4, 6]));
208    ///
209    /// // at least one fails
210    /// let a = vec![Some(1), None, Some(3)];
211    /// let iter = a.into_par().into_fallible_option().map(|x| 2 * x);
212    ///
213    /// let b: Option<Vec<_>> = iter.collect();
214    /// assert_eq!(b, None);
215    /// ```
216    fn map<Out, Map>(self, map: Map) -> impl ParIterOption<R, Item = Out>
217    where
218        Self: Sized,
219        Map: Fn(Self::Item) -> Out + Sync + Clone,
220        Out: Send;
221
222    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
223    ///
224    /// Transformation is only for the success path where all elements are of the `Some` variant.
225    /// Any observation of a `None` case short-circuits the computation and immediately returns None.
226    ///
227    /// # Examples
228    ///
229    /// ```
230    /// use orx_parallel::*;
231    ///
232    /// // all succeeds
233    /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
234    /// let iter = a.into_par().into_fallible_option().filter(|x| x % 2 == 1);
235    ///
236    /// let b = iter.sum();
237    /// assert_eq!(b, Some(1 + 3));
238    ///
239    /// // at least one fails
240    /// let a = vec![Some(1), None, Some(3)];
241    /// let iter = a.into_par().into_fallible_option().filter(|x| x % 2 == 1);
242    ///
243    /// let b = iter.sum();
244    /// assert_eq!(b, None);
245    /// ```
246    fn filter<Filter>(self, filter: Filter) -> impl ParIterOption<R, Item = Self::Item>
247    where
248        Self: Sized,
249        Filter: Fn(&Self::Item) -> bool + Sync + Clone,
250        Self::Item: Send;
251
252    /// Creates an iterator that works like map, but flattens nested structure.
253    ///
254    /// Transformation is only for the success path where all elements are of the `Some` variant.
255    /// Any observation of a `None` case short-circuits the computation and immediately returns None.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// use orx_parallel::*;
261    ///
262    /// // all succeeds
263    /// let words: Vec<Option<&str>> = vec![Some("alpha"), Some("beta"), Some("gamma")];
264    ///
265    /// let all_chars: Option<Vec<_>> = words
266    ///     .into_par()
267    ///     .into_fallible_option()
268    ///     .flat_map(|s| s.chars()) // chars() returns an iterator
269    ///     .collect();
270    ///
271    /// let merged: Option<String> = all_chars.map(|chars| chars.iter().collect());
272    /// assert_eq!(merged, Some("alphabetagamma".to_string()));
273    ///
274    /// // at least one fails
275    /// let words: Vec<Option<&str>> = vec![Some("alpha"), Some("beta"), None, Some("gamma")];
276    ///
277    /// let all_chars: Option<Vec<_>> = words
278    ///     .into_par()
279    ///     .into_fallible_option()
280    ///     .flat_map(|s| s.chars()) // chars() returns an iterator
281    ///     .collect();
282    ///
283    /// let merged: Option<String> = all_chars.map(|chars| chars.iter().collect());
284    /// assert_eq!(merged, None);
285    /// ```
286    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIterOption<R, Item = IOut::Item>
287    where
288        Self: Sized,
289        IOut: IntoIterator,
290        IOut::Item: Send,
291        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
292
293    /// Creates an iterator that both filters and maps.
294    ///
295    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
296    ///
297    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
298    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
299    ///
300    /// # Examples
301    ///
302    /// ```
303    /// use orx_parallel::*;
304    ///
305    /// // all succeeds
306    /// let a: Vec<Option<&str>> = vec![Some("1"), Some("two"), Some("NaN"), Some("four"), Some("5")];
307    ///
308    /// let numbers: Option<Vec<_>> = a
309    ///     .into_par()
310    ///     .into_fallible_option()
311    ///     .filter_map(|s| s.parse::<usize>().ok())
312    ///     .collect();
313    ///
314    /// assert_eq!(numbers, Some(vec![1, 5]));
315    ///
316    /// // at least one fails
317    /// let a: Vec<Option<&str>> = vec![Some("1"), Some("two"), None, Some("four"), Some("5")];
318    ///
319    /// let numbers: Option<Vec<_>> = a
320    ///     .into_par()
321    ///     .into_fallible_option()
322    ///     .filter_map(|s| s.parse::<usize>().ok())
323    ///     .collect();
324    ///
325    /// assert_eq!(numbers, None);
326    /// ```
327    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIterOption<R, Item = Out>
328    where
329        Self: Sized,
330        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
331        Out: Send;
332
333    /// Does something with each successful element of an iterator, passing the value on, provided that all elements are of Some variant;
334    /// short-circuits and returns None otherwise.
335    ///
336    /// When using iterators, you’ll often chain several of them together.
337    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
338    /// To do that, insert a call to `inspect()`.
339    ///
340    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
341    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
342    ///
343    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
344    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
345    /// collect some intermediate values during parallel execution for further inspection.
346    /// The following example demonstrates such a use case.
347    ///
348    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
349    ///
350    /// ```
351    /// use orx_parallel::*;
352    /// use orx_concurrent_bag::*;
353    /// use std::num::ParseIntError;
354    ///
355    /// // all succeeds
356    /// let a: Vec<Option<u32>> = ["1", "4", "2", "3"]
357    ///     .into_iter()
358    ///     .map(|x| x.parse::<u32>().ok())
359    ///     .collect();
360    ///
361    /// // let's add some inspect() calls to investigate what's happening
362    /// // - log some events
363    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
364    /// let bag = ConcurrentBag::new();
365    ///
366    /// let sum = a
367    ///     .par()
368    ///     .cloned()
369    ///     .into_fallible_option()
370    ///     .inspect(|x| println!("about to filter: {x}"))
371    ///     .filter(|x| x % 2 == 0)
372    ///     .inspect(|x| {
373    ///         bag.push(*x);
374    ///         println!("made it through filter: {x}");
375    ///     })
376    ///     .sum();
377    /// assert_eq!(sum, Some(4 + 2));
378    ///
379    /// let mut values_made_through = bag.into_inner();
380    /// values_made_through.sort();
381    /// assert_eq!(values_made_through, [2, 4]);
382    ///
383    /// // at least one fails
384    /// let a: Vec<Option<u32>> = ["1", "4", "x", "3"]
385    ///     .into_iter()
386    ///     .map(|x| x.parse::<u32>().ok())
387    ///     .collect();
388    ///
389    /// // let's add some inspect() calls to investigate what's happening
390    /// // - log some events
391    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
392    /// let bag = ConcurrentBag::new();
393    ///
394    /// let sum = a
395    ///     .par()
396    ///     .cloned()
397    ///     .into_fallible_option()
398    ///     .inspect(|x| println!("about to filter: {x}"))
399    ///     .filter(|x| x % 2 == 0)
400    ///     .inspect(|x| {
401    ///         bag.push(*x);
402    ///         println!("made it through filter: {x}");
403    ///     })
404    ///     .sum();
405    /// assert_eq!(sum, None);
406    /// ```
407    fn inspect<Operation>(self, operation: Operation) -> impl ParIterOption<R, Item = Self::Item>
408    where
409        Self: Sized,
410        Operation: Fn(&Self::Item) + Sync + Clone,
411        Self::Item: Send;
412
413    // collect
414
415    /// Collects all the items from an iterator into a collection iff all elements are of Some variant.
416    /// Early exits and returns None if any of the elements is None.
417    ///
418    /// This is useful when you already have a collection and want to add the iterator items to it.
419    ///
420    /// The collection is passed in as owned value, and returned back with the additional elements.
421    ///
422    /// All collections implementing [`ParCollectInto`] can be used to collect into.
423    ///
424    /// [`ParCollectInto`]: crate::ParCollectInto
425    ///
426    /// # Examples
427    ///
428    /// ```
429    /// use orx_parallel::*;
430    ///
431    /// let vec: Vec<i32> = vec![0, 1];
432    ///
433    /// // all succeeds
434    /// let result = ["1", "2", "3"]
435    ///     .into_par()
436    ///     .map(|x| x.parse::<i32>().ok())
437    ///     .into_fallible_option()
438    ///     .map(|x| x * 10)
439    ///     .collect_into(vec);
440    /// assert_eq!(result, Some(vec![0, 1, 10, 20, 30]));
441    ///
442    /// let vec = result.unwrap();
443    ///
444    /// // at least one fails
445    ///
446    /// let result = ["1", "x!", "3"]
447    ///     .into_par()
448    ///     .map(|x| x.parse::<i32>().ok())
449    ///     .into_fallible_option()
450    ///     .map(|x| x * 10)
451    ///     .collect_into(vec);
452    /// assert_eq!(result, None);
453    /// ```
454    fn collect_into<C>(self, output: C) -> Option<C>
455    where
456        Self::Item: Send,
457        C: ParCollectInto<Self::Item>;
458
459    /// Transforms an iterator into a collection iff all elements are of Ok variant.
460    /// Early exits and returns the error if any of the elements is an Err.
461    ///
462    /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
463    /// the type of the result collection; or turbofish annotation can be used.
464    ///
465    /// All collections implementing [`ParCollectInto`] can be used to collect into.
466    ///
467    /// [`ParCollectInto`]: crate::ParCollectInto
468    ///
469    /// # Examples
470    ///
471    /// ```
472    /// use orx_parallel::*;
473    ///
474    /// // all succeeds
475    ///
476    /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
477    ///     .into_par()
478    ///     .map(|x| x.parse::<i32>().ok())
479    ///     .into_fallible_option()
480    ///     .map(|x| x * 2)
481    ///     .collect();
482    ///
483    /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
484    ///
485    /// // at least one fails
486    ///
487    /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
488    ///     .into_par()
489    ///     .map(|x| x.parse::<i32>().ok())
490    ///     .into_fallible_option()
491    ///     .map(|x| x * 2)
492    ///     .collect();
493    ///
494    /// assert_eq!(result_doubled, None);
495    /// ```
496    fn collect<C>(self) -> Option<C>
497    where
498        Self::Item: Send,
499        C: ParCollectInto<Self::Item>;
500
501    // reduce
502
503    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
504    /// Early exits and returns None if any of the elements is None.
505    ///
506    /// If the iterator is empty, returns `Some(None)`; otherwise, returns `Some` of result of the reduction.
507    ///
508    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
509    ///
510    /// # Example
511    ///
512    /// ```
513    /// use orx_parallel::*;
514    ///
515    /// // all succeeds
516    /// let reduced: Option<Option<u32>> = (1..10)
517    ///     .par()
518    ///     .map(|x| 100u32.checked_div(x as u32))
519    ///     .into_fallible_option()
520    ///     .reduce(|acc, e| acc + e);
521    /// assert_eq!(reduced, Some(Some(281)));
522    ///
523    /// // all succeeds - empty iterator
524    /// let reduced: Option<Option<u32>> = (1..1)
525    ///     .par()
526    ///     .map(|x| 100u32.checked_div(x as u32))
527    ///     .into_fallible_option()
528    ///     .reduce(|acc, e| acc + e);
529    /// assert_eq!(reduced, Some(None));
530    ///
531    /// // at least one fails
532    /// let reduced: Option<Option<u32>> = (0..10)
533    ///     .par()
534    ///     .map(|x| 100u32.checked_div(x as u32))
535    ///     .into_fallible_option()
536    ///     .reduce(|acc, e| acc + e);
537    /// assert_eq!(reduced, None);
538    /// ```
539    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Option<Self::Item>>
540    where
541        Self::Item: Send,
542        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
543
544    /// Tests if every element of the iterator matches a predicate.
545    /// Early exits and returns None if any of the elements is None.
546    ///
547    /// `all` takes a `predicate` that returns true or false.
548    /// It applies this closure to each Ok element of the iterator,
549    /// and if they all return true, then so does `all`.
550    /// If any of them returns false, it returns false.
551    ///
552    /// Note that `all` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a false,
553    /// given that no matter what else happens, the result will also be false.
554    ///
555    /// Therefore, in case the fallible iterator contains both a None element and a Some element which violates the `predicate`,
556    /// the result is **not deterministic**. It might be the `None` if it is observed first; or `Some(false)` if the violation is observed first.
557    ///
558    /// On the other hand, when it returns `Some(true)`, we are certain that all elements are of `Some` variant and all satisfy the `predicate`.
559    ///
560    /// An empty iterator returns `Some(true)`.
561    ///
562    /// # Examples
563    ///
564    /// ```
565    /// use orx_parallel::*;
566    ///
567    /// // all Some
568    /// let result = vec!["1", "2", "3"]
569    ///     .into_par()
570    ///     .map(|x| x.parse::<i32>().ok())
571    ///     .into_fallible_option()
572    ///     .all(|x| *x > 0);
573    /// assert_eq!(result, Some(true));
574    ///
575    /// let result = vec!["1", "2", "3"]
576    ///     .into_par()
577    ///     .map(|x| x.parse::<i32>().ok())
578    ///     .into_fallible_option()
579    ///     .all(|x| *x > 1);
580    /// assert_eq!(result, Some(false));
581    ///
582    /// let result = Vec::<&str>::new()
583    ///     .into_par()
584    ///     .map(|x| x.parse::<i32>().ok())
585    ///     .into_fallible_option()
586    ///     .all(|x| *x > 1);
587    /// assert_eq!(result, Some(true)); // empty iterator
588    ///
589    /// // at least one None
590    /// let result = vec!["1", "x!", "3"]
591    ///     .into_par()
592    ///     .map(|x| x.parse::<i32>().ok())
593    ///     .into_fallible_option()
594    ///     .all(|x| *x > 0);
595    /// assert_eq!(result, None);
596    /// ```
597    fn all<Predicate>(self, predicate: Predicate) -> Option<bool>
598    where
599        Self: Sized,
600        Self::Item: Send,
601        Predicate: Fn(&Self::Item) -> bool + Sync,
602    {
603        let violates = |x: &Self::Item| !predicate(x);
604        self.find(violates).map(|x| x.is_none())
605    }
606
607    /// Tests if any element of the iterator matches a predicate.
608    /// Early exits and returns None if any of the elements is None.
609    ///
610    /// `any` takes a `predicate` that returns true or false.
611    /// It applies this closure to each element of the iterator,
612    /// and if any of the elements returns true, then so does `any`.
613    /// If all of them return false, it returns false.
614    ///
615    /// Note that `any` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a true,
616    /// given that no matter what else happens, the result will also be true.
617    ///
618    /// Therefore, in case the fallible iterator contains both a None element and a Some element which satisfies the `predicate`,
619    /// the result is **not deterministic**. It might be the `None` if it is observed first; or `Some(true)` if element satisfying the predicate
620    /// is observed first.
621    ///
622    /// On the other hand, when it returns `Some(false)`, we are certain that all elements are of `Some` variant and none of them satisfies the `predicate`.
623    ///
624    /// An empty iterator returns `Some(false)`.
625    ///
626    /// # Examples
627    ///
628    /// ```
629    /// use orx_parallel::*;
630    ///
631    /// // all Some
632    /// let result = vec!["1", "2", "3"]
633    ///     .into_par()
634    ///     .map(|x| x.parse::<i32>().ok())
635    ///     .into_fallible_option()
636    ///     .any(|x| *x > 1);
637    /// assert_eq!(result, Some(true));
638    ///
639    /// let result = vec!["1", "2", "3"]
640    ///     .into_par()
641    ///     .map(|x| x.parse::<i32>().ok())
642    ///     .into_fallible_option()
643    ///     .any(|x| *x > 3);
644    /// assert_eq!(result, Some(false));
645    ///
646    /// let result = Vec::<&str>::new()
647    ///     .into_par()
648    ///     .map(|x| x.parse::<i32>().ok())
649    ///     .into_fallible_option()
650    ///     .any(|x| *x > 1);
651    /// assert_eq!(result, Some(false)); // empty iterator
652    ///
653    /// // at least one None
654    /// let result = vec!["1", "x!", "3"]
655    ///     .into_par()
656    ///     .map(|x| x.parse::<i32>().ok())
657    ///     .into_fallible_option()
658    ///     .any(|x| *x > 5);
659    /// assert_eq!(result, None);
660    /// ```
661    fn any<Predicate>(self, predicate: Predicate) -> Option<bool>
662    where
663        Self: Sized,
664        Self::Item: Send,
665        Predicate: Fn(&Self::Item) -> bool + Sync,
666    {
667        self.find(predicate).map(|x| x.is_some())
668    }
669
670    /// Consumes the iterator, counting the number of iterations and returning it.
671    /// Early exits and returns None if any of the elements is None.
672    ///
673    /// # Examples
674    ///
675    /// ```
676    /// use orx_parallel::*;
677    ///
678    /// // all Some
679    /// let result = vec!["1", "2", "3"]
680    ///     .into_par()
681    ///     .map(|x| x.parse::<i32>().ok())
682    ///     .into_fallible_option()
683    ///     .filter(|x| *x >= 2)
684    ///     .count();
685    /// assert_eq!(result, Some(2));
686    ///
687    /// // at least one None
688    /// let result = vec!["x!", "2", "3"]
689    ///     .into_par()
690    ///     .map(|x| x.parse::<i32>().ok())
691    ///     .into_fallible_option()
692    ///     .filter(|x| *x >= 2)
693    ///     .count();
694    /// assert_eq!(result, None);
695    /// ```
696    fn count(self) -> Option<usize>
697    where
698        Self: Sized,
699    {
700        self.map(map_count)
701            .reduce(reduce_sum)
702            .map(|x| x.unwrap_or(0))
703    }
704
705    /// Calls a closure on each element of an iterator, and returns `Ok(())` if all elements succeed.
706    /// Early exits and returns None if any of the elements is None.
707    ///
708    /// # Examples
709    ///
710    /// Basic usage:
711    ///
712    /// ```
713    /// use orx_parallel::*;
714    /// use std::sync::mpsc::channel;
715    ///
716    /// // all Some
717    /// let (tx, rx) = channel();
718    /// let result = vec!["0", "1", "2", "3", "4"]
719    ///     .into_par()
720    ///     .map(|x| x.parse::<i32>().ok())
721    ///     .into_fallible_option()
722    ///     .map(|x| x * 2 + 1)
723    ///     .for_each(move |x| tx.send(x).unwrap());
724    ///
725    /// assert_eq!(result, Some(()));
726    ///
727    /// let mut v: Vec<_> = rx.iter().collect();
728    /// v.sort(); // order can be mixed, since messages will be sent in parallel
729    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
730    ///
731    /// // at least one None
732    /// let (tx, _rx) = channel();
733    /// let result = vec!["0", "1", "2", "x!", "4"]
734    ///     .into_par()
735    ///     .map(|x| x.parse::<i32>().ok())
736    ///     .into_fallible_option()
737    ///     .map(|x| x * 2 + 1)
738    ///     .for_each(move |x| tx.send(x).unwrap());
739    ///
740    /// assert_eq!(result, None);
741    /// ```
742    fn for_each<Operation>(self, operation: Operation) -> Option<()>
743    where
744        Self: Sized,
745        Operation: Fn(Self::Item) + Sync,
746    {
747        let map = |x| operation(x);
748        self.map(map).reduce(reduce_unit).map(|_| ())
749    }
750
751    /// Returns Some of maximum element of an iterator if all elements succeed.
752    /// If the iterator is empty, `Some(None)` is returned.
753    /// Early exits and returns None if any of the elements is None.
754    ///
755    /// # Examples
756    ///
757    /// ```
758    /// use orx_parallel::*;
759    ///
760    /// let a = vec![Some(1), Some(2), Some(3)];
761    /// assert_eq!(a.par().copied().into_fallible_option().max(), Some(Some(3)));
762    ///
763    /// let b: Vec<Option<i32>> = vec![];
764    /// assert_eq!(b.par().copied().into_fallible_option().max(), Some(None));
765    ///
766    /// let c = vec![Some(1), Some(2), None];
767    /// assert_eq!(c.par().copied().into_fallible_option().max(), None);
768    /// ```
769    fn max(self) -> Option<Option<Self::Item>>
770    where
771        Self: Sized,
772        Self::Item: Ord + Send,
773    {
774        self.reduce(Ord::max)
775    }
776
777    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
778    /// If the iterator is empty, `Some(None)` is returned.
779    /// Early exits and returns None if any of the elements is None.
780    ///
781    /// ```
782    /// use orx_parallel::*;
783    ///
784    /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
785    /// assert_eq!(
786    ///     a.par()
787    ///         .copied()
788    ///         .into_fallible_option()
789    ///         .max_by(|a, b| a.cmp(b)),
790    ///     Some(Some(3))
791    /// );
792    ///
793    /// let b: Vec<Option<i32>> = vec![];
794    /// assert_eq!(
795    ///     b.par()
796    ///         .copied()
797    ///         .into_fallible_option()
798    ///         .max_by(|a, b| a.cmp(b)),
799    ///     Some(None)
800    /// );
801    ///
802    /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
803    /// assert_eq!(
804    ///     c.par()
805    ///         .copied()
806    ///         .into_fallible_option()
807    ///         .max_by(|a, b| a.cmp(b)),
808    ///     None
809    /// );
810    /// ```
811    fn max_by<Compare>(self, compare: Compare) -> Option<Option<Self::Item>>
812    where
813        Self: Sized,
814        Self::Item: Send,
815        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
816    {
817        let reduce = |x, y| match compare(&x, &y) {
818            Ordering::Greater | Ordering::Equal => x,
819            Ordering::Less => y,
820        };
821        self.reduce(reduce)
822    }
823
824    /// Returns the element that gives the maximum value from the specified function.
825    /// If the iterator is empty, `Some(None)` is returned.
826    /// Early exits and returns None if any of the elements is None.
827    ///
828    /// # Examples
829    ///
830    /// ```
831    /// use orx_parallel::*;
832    ///
833    /// let a: Vec<Option<i32>> = vec![Some(-1), Some(2), Some(-3)];
834    /// assert_eq!(
835    ///     a.par()
836    ///         .copied()
837    ///         .into_fallible_option()
838    ///         .max_by_key(|x| x.abs()),
839    ///     Some(Some(-3))
840    /// );
841    ///
842    /// let b: Vec<Option<i32>> = vec![];
843    /// assert_eq!(
844    ///     b.par()
845    ///         .copied()
846    ///         .into_fallible_option()
847    ///         .max_by_key(|x| x.abs()),
848    ///     Some(None)
849    /// );
850    ///
851    /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
852    /// assert_eq!(
853    ///     c.par()
854    ///         .copied()
855    ///         .into_fallible_option()
856    ///         .max_by_key(|x| x.abs()),
857    ///     None
858    /// );
859    /// ```
860    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Option<Self::Item>>
861    where
862        Self: Sized,
863        Self::Item: Send,
864        Key: Ord,
865        GetKey: Fn(&Self::Item) -> Key + Sync,
866    {
867        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
868            Ordering::Greater | Ordering::Equal => x,
869            Ordering::Less => y,
870        };
871        self.reduce(reduce)
872    }
873
874    /// Returns Some of minimum element of an iterator if all elements succeed.
875    /// If the iterator is empty, `Some(None)` is returned.
876    /// Early exits and returns None if any of the elements is None.
877    ///
878    /// # Examples
879    ///
880    /// ```
881    /// use orx_parallel::*;
882    ///
883    /// let a = vec![Some(1), Some(2), Some(3)];
884    /// assert_eq!(a.par().copied().into_fallible_option().min(), Some(Some(1)));
885    ///
886    /// let b: Vec<Option<i32>> = vec![];
887    /// assert_eq!(b.par().copied().into_fallible_option().min(), Some(None));
888    ///
889    /// let c = vec![Some(1), Some(2), None];
890    /// assert_eq!(c.par().copied().into_fallible_option().min(), None);
891    /// ```
892    fn min(self) -> Option<Option<Self::Item>>
893    where
894        Self: Sized,
895        Self::Item: Ord + Send,
896    {
897        self.reduce(Ord::min)
898    }
899
900    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
901    /// If the iterator is empty, `Some(None)` is returned.
902    /// Early exits and returns None if any of the elements is None.
903    ///
904    /// ```
905    /// use orx_parallel::*;
906    ///
907    /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
908    /// assert_eq!(
909    ///     a.par()
910    ///         .copied()
911    ///         .into_fallible_option()
912    ///         .min_by(|a, b| a.cmp(b)),
913    ///     Some(Some(1))
914    /// );
915    ///
916    /// let b: Vec<Option<i32>> = vec![];
917    /// assert_eq!(
918    ///     b.par()
919    ///         .copied()
920    ///         .into_fallible_option()
921    ///         .min_by(|a, b| a.cmp(b)),
922    ///     Some(None)
923    /// );
924    ///
925    /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
926    /// assert_eq!(
927    ///     c.par()
928    ///         .copied()
929    ///         .into_fallible_option()
930    ///         .min_by(|a, b| a.cmp(b)),
931    ///     None
932    /// );
933    /// ```
934    fn min_by<Compare>(self, compare: Compare) -> Option<Option<Self::Item>>
935    where
936        Self: Sized,
937        Self::Item: Send,
938        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
939    {
940        let reduce = |x, y| match compare(&x, &y) {
941            Ordering::Less | Ordering::Equal => x,
942            Ordering::Greater => y,
943        };
944        self.reduce(reduce)
945    }
946
947    /// Returns the element that gives the minimum value from the specified function.
948    /// If the iterator is empty, `Some(None)` is returned.
949    /// Early exits and returns None if any of the elements is None.
950    ///
951    /// # Examples
952    ///
953    /// ```
954    /// use orx_parallel::*;
955    ///
956    /// let a: Vec<Option<i32>> = vec![Some(-1), Some(2), Some(-3)];
957    /// assert_eq!(
958    ///     a.par()
959    ///         .copied()
960    ///         .into_fallible_option()
961    ///         .min_by_key(|x| x.abs()),
962    ///     Some(Some(-1))
963    /// );
964    ///
965    /// let b: Vec<Option<i32>> = vec![];
966    /// assert_eq!(
967    ///     b.par()
968    ///         .copied()
969    ///         .into_fallible_option()
970    ///         .min_by_key(|x| x.abs()),
971    ///     Some(None)
972    /// );
973    ///
974    /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
975    /// assert_eq!(
976    ///     c.par()
977    ///         .copied()
978    ///         .into_fallible_option()
979    ///         .min_by_key(|x| x.abs()),
980    ///     None
981    /// );
982    /// ```
983    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Option<Self::Item>>
984    where
985        Self: Sized,
986        Self::Item: Send,
987        Key: Ord,
988        GetKey: Fn(&Self::Item) -> Key + Sync,
989    {
990        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
991            Ordering::Less | Ordering::Equal => x,
992            Ordering::Greater => y,
993        };
994        self.reduce(reduce)
995    }
996
997    /// Sums the elements of an iterator.
998    /// Early exits and returns None if any of the elements is None.
999    ///
1000    /// If the iterator is empty, returns zero; otherwise, returns `Some` of the sum.
1001    ///
1002    /// # Example
1003    ///
1004    /// ```
1005    /// use orx_parallel::*;
1006    ///
1007    /// // all succeeds
1008    /// let reduced: Option<u32> = (1..10)
1009    ///     .par()
1010    ///     .map(|x| 100u32.checked_div(x as u32))
1011    ///     .into_fallible_option()
1012    ///     .sum();
1013    /// assert_eq!(reduced, Some(281));
1014    ///
1015    /// // all succeeds - empty iterator
1016    /// let reduced: Option<u32> = (1..1)
1017    ///     .par()
1018    ///     .map(|x| 100u32.checked_div(x as u32))
1019    ///     .into_fallible_option()
1020    ///     .sum();
1021    /// assert_eq!(reduced, Some(0));
1022    ///
1023    /// // at least one fails
1024    /// let reduced: Option<u32> = (0..10)
1025    ///     .par()
1026    ///     .map(|x| 100u32.checked_div(x as u32))
1027    ///     .into_fallible_option()
1028    ///     .sum();
1029    /// assert_eq!(reduced, None);
1030    /// ```
1031    fn sum<Out>(self) -> Option<Out>
1032    where
1033        Self: Sized,
1034        Self::Item: Sum<Out>,
1035        Out: Send,
1036    {
1037        self.map(Self::Item::map)
1038            .reduce(Self::Item::reduce)
1039            .map(|x| x.unwrap_or(Self::Item::zero()))
1040    }
1041
1042    // early exit
1043
1044    /// Returns the first (or any) element of the iterator.
1045    /// If the iterator is empty, `Some(None)` is returned.
1046    /// Early exits and returns None if a None element is observed first.
1047    ///
1048    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1049    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1050    ///
1051    /// Note that `find` itself is short-circuiting in addition to fallible computation.
1052    /// Therefore, in case the fallible iterator contains both a None and a Some element,
1053    /// the result is **not deterministic**:
1054    /// * it might be the `None` if it is observed first;
1055    /// * or `Some(element)` if the Some element is observed first.
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```
1060    /// use orx_parallel::*;
1061    ///
1062    /// let a: Vec<Option<i32>> = vec![];
1063    /// assert_eq!(a.par().copied().into_fallible_option().first(), Some(None));
1064    ///
1065    /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
1066    /// assert_eq!(
1067    ///     a.par().copied().into_fallible_option().first(),
1068    ///     Some(Some(1))
1069    /// );
1070    ///
1071    /// let a: Vec<Option<i32>> = vec![Some(1), None, Some(3)];
1072    /// let result = a.par().copied().into_fallible_option().first();
1073    /// // depends on whichever is observed first in parallel execution
1074    /// assert!(result == Some(Some(1)) || result == None);
1075    /// ```
1076    fn first(self) -> Option<Option<Self::Item>>
1077    where
1078        Self::Item: Send;
1079
1080    /// Returns the first (or any) element of the iterator that satisfies the `predicate`.
1081    /// If the iterator is empty, `Some(None)` is returned.
1082    /// Early exits and returns None if a None element is observed first.
1083    ///
1084    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1085    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1086    ///
1087    /// Note that `find` itself is short-circuiting in addition to fallible computation.
1088    /// Therefore, in case the fallible iterator contains both a None and a Some element,
1089    /// the result is **not deterministic**:
1090    /// * it might be the `None` if it is observed first;
1091    /// * or `Some(element)` if the Some element satisfying the predicate is observed first.
1092    ///
1093    /// # Examples
1094    ///
1095    /// ```
1096    /// use orx_parallel::*;
1097    ///
1098    /// let a: Vec<Option<i32>> = vec![];
1099    /// assert_eq!(
1100    ///     a.par().copied().into_fallible_option().find(|x| *x > 2),
1101    ///     Some(None)
1102    /// );
1103    ///
1104    /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
1105    /// assert_eq!(
1106    ///     a.par().copied().into_fallible_option().find(|x| *x > 2),
1107    ///     Some(Some(3))
1108    /// );
1109    ///
1110    /// let a: Vec<Option<i32>> = vec![Some(1), None, Some(3)];
1111    /// let result = a.par().copied().into_fallible_option().find(|x| *x > 2);
1112    /// // depends on whichever is observed first in parallel execution
1113    /// assert!(result == Some(Some(3)) || result == None);
1114    /// ```
1115    fn find<Predicate>(self, predicate: Predicate) -> Option<Option<Self::Item>>
1116    where
1117        Self: Sized,
1118        Self::Item: Send,
1119        Predicate: Fn(&Self::Item) -> bool + Sync,
1120    {
1121        self.filter(&predicate).first()
1122    }
1123}
1124
1125pub trait IntoOption<T> {
1126    fn into_option(self) -> Option<T>;
1127
1128    fn into_result_with_unit_err(self) -> Result<T, ()>;
1129}
1130
1131impl<T> IntoOption<T> for Option<T> {
1132    #[inline(always)]
1133    fn into_option(self) -> Option<T> {
1134        self
1135    }
1136
1137    #[inline(always)]
1138    fn into_result_with_unit_err(self) -> Result<T, ()> {
1139        match self {
1140            Some(x) => Ok(x),
1141            None => Err(()),
1142        }
1143    }
1144}
1145
1146pub(crate) trait ResultIntoOption<T> {
1147    fn into_option(self) -> Option<T>;
1148}
1149
1150impl<T> ResultIntoOption<T> for Result<T, ()> {
1151    #[inline(always)]
1152    fn into_option(self) -> Option<T> {
1153        self.ok()
1154    }
1155}