orx_parallel/
par_iter_result.rs

1use crate::default_fns::{map_count, reduce_sum, reduce_unit};
2use crate::runner::{DefaultRunner, ParallelRunner};
3use crate::{ChunkSize, IterationOrder, NumThreads, ParThreadPool, RunnerWithPool, Sum};
4use crate::{ParCollectInto, ParIter, generic_values::fallible_iterators::ResultOfIter};
5use core::cmp::Ordering;
6
7/// A parallel iterator for which the computation either completely succeeds,
8/// or fails and **early exits** with an error.
9///
10/// # Examples
11///
12/// To demonstrate the difference of fallible iterator's behavior, consider the following simple example.
13/// We parse a series of strings into integers.
14/// We try this twice:
15/// * in the first one, all inputs are good, hence, we obtain Ok of parsed numbers,
16/// * in the second one, the value in the middle is faulty, we expect the computation to fail.
17///
18/// In the following, we try to achieve this both with a regular parallel iterator ([`ParIter`]) and a fallible
19/// parallel iterator, `ParIterResult` in this case.
20///
21/// You may notice the following differences:
22/// * In the regular iterator, it is not very convenient to keep both the resulting numbers and a potential error.
23///   Here, we make use of `filter_map` and simply ignore the error.
24/// * On the other hand, the `collect` method of the fallible iterator directly returns a `Result` of the computation
25///   which is either Ok of all parsed numbers or the error.
26/// * Also importantly note that the regular iterator will try to parse all the strings, regardless of how many times
27///   the parsing fails.
28/// * Fallible iterator, on the other hand, stops immediately after observing the first error and short circuits the
29///   computation.
30///
31/// ```
32/// use orx_parallel::*;
33/// use std::num::{IntErrorKind, ParseIntError};
34///
35/// let expected_results = [
36///     Ok((0..100).collect::<Vec<_>>()),
37///     Err(IntErrorKind::InvalidDigit),
38/// ];
39///
40/// for expected in expected_results {
41///     let expected_ok = expected.is_ok();
42///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
43///     if !expected_ok {
44///         inputs.insert(50, "x".to_string()); // plant an error case
45///     }
46///
47///     // regular parallel iterator
48///     let results = inputs.par().map(|x| x.parse::<u32>());
49///     let numbers: Vec<_> = results.filter_map(|x| x.ok()).collect();
50///     if expected_ok {
51///         assert_eq!(&expected, &Ok(numbers));
52///     } else {
53///         // we lost the error
54///     }
55///
56///     // fallible parallel iterator
57///     let results = inputs.par().map(|x| x.parse::<u32>());
58///     let result: Result<Vec<_>, ParseIntError> = results.into_fallible_result().collect();
59///     assert_eq!(&expected, &result.map_err(|x| x.kind().clone()));
60/// }
61/// ```
62///
63/// These differences are not specific to `collect`; all fallible iterator methods return a result.
64/// The following demonstrate reduction examples, where the result is either the reduced value if the entire computation
65/// succeeds, or the error.
66///
67/// ```
68/// use orx_parallel::*;
69/// use std::num::ParseIntError;
70///
71/// for will_fail in [false, true] {
72///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
73///     if will_fail {
74///         inputs.insert(50, "x".to_string()); // plant an error case
75///     }
76///
77///     // sum
78///     let results = inputs.par().map(|x| x.parse::<u32>());
79///     let result: Result<u32, ParseIntError> = results.into_fallible_result().sum();
80///     match will_fail {
81///         true => assert!(result.is_err()),
82///         false => assert_eq!(result, Ok(4950)),
83///     }
84///
85///     // max
86///     let results = inputs.par().map(|x| x.parse::<u32>());
87///     let result: Result<Option<u32>, ParseIntError> = results.into_fallible_result().max();
88///     match will_fail {
89///         true => assert!(result.is_err()),
90///         false => assert_eq!(result, Ok(Some(99))),
91///     }
92/// }
93/// ```
94///
95/// Finally, similar to regular iterators, a fallible parallel iterator can be tranformed using iterator methods.
96/// However, the transformation is on the success path, the error case always short circuits and returns the error.
97/// Notice in the following example that the success type keeps changing through transformations while the error type
98/// remains the same.
99///
100/// ```
101/// use orx_parallel::*;
102/// use std::num::ParseIntError;
103///
104/// for will_fail in [false, true] {
105///     let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
106///     if will_fail {
107///         inputs.insert(50, "x".to_string()); // plant an error case
108///     }
109///
110///     // fallible iter
111///     let results = inputs.par().map(|x| x.parse::<u32>());
112///     let fallible = results.into_fallible_result();              // Ok: u32, Error: ParseIntError
113///
114///     // transformations
115///
116///     let result: Result<usize, ParseIntError> = fallible
117///         .filter(|x| x % 2 == 1)                                 // Ok: u32, Error: ParseIntError
118///         .map(|x| 3 * x)                                         // Ok: u32, Error: ParseIntError
119///         .filter_map(|x| (x % 10 != 0).then_some(x))             // Ok: u32, Error: ParseIntError
120///         .flat_map(|x| [x.to_string(), (10 * x).to_string()])    // Ok: String, Error: ParseIntError
121///         .map(|x| x.len())                                       // Ok: usize, Error: ParseIntError
122///         .sum();
123///
124///     match will_fail {
125///         true => assert!(result.is_err()),
126///         false => assert_eq!(result, Ok(312)),
127///     }
128/// }
129/// ```
130///
131/// [`ParIter`]: crate::ParIter
132pub trait ParIterResult<R = DefaultRunner>
133where
134    R: ParallelRunner,
135{
136    /// Type of the Ok element, to be received as the Ok variant iff the entire computation succeeds.
137    type Item;
138
139    /// Type of the Err element, to be received if any of the computations fails.
140    type Err;
141
142    /// Element type of the regular parallel iterator this fallible iterator can be converted to, simply `Result<Self::Ok, Self::Err>`.
143    type RegularItem: IntoResult<Self::Item, Self::Err>;
144
145    /// Regular parallel iterator this fallible iterator can be converted into.
146    type RegularParIter: ParIter<R, Item = Self::RegularItem>;
147
148    /// Returns a reference to the input concurrent iterator.
149    fn con_iter_len(&self) -> Option<usize>;
150
151    /// Converts this fallible iterator into a regular parallel iterator; i.e., [`ParIter`], with `Item = Result<Self::Ok, Self::Err>`.
152    fn into_regular_par(self) -> Self::RegularParIter;
153
154    /// Converts the `regular_par` iterator with `Item = Result<Self::Ok, Self::Err>` into fallible result iterator.
155    fn from_regular_par(regular_par: Self::RegularParIter) -> Self;
156
157    // params transformations
158
159    /// Sets the number of threads to be used in the parallel execution.
160    /// Integers can be used as the argument with the following mapping:
161    ///
162    /// * `0` -> `NumThreads::Auto`
163    /// * `1` -> `NumThreads::sequential()`
164    /// * `n > 0` -> `NumThreads::Max(n)`
165    ///
166    /// See [`NumThreads`] and [`ParIter::num_threads`] for details.
167    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self
168    where
169        Self: Sized,
170    {
171        Self::from_regular_par(self.into_regular_par().num_threads(num_threads))
172    }
173
174    /// Sets the number of elements to be pulled from the concurrent iterator during the
175    /// parallel execution. When integers are used as argument, the following mapping applies:
176    ///
177    /// * `0` -> `ChunkSize::Auto`
178    /// * `n > 0` -> `ChunkSize::Exact(n)`
179    ///
180    /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
181    ///
182    /// See [`ChunkSize`] and [`ParIter::chunk_size`] for details.
183    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self
184    where
185        Self: Sized,
186    {
187        Self::from_regular_par(self.into_regular_par().chunk_size(chunk_size))
188    }
189
190    /// Sets the iteration order of the parallel computation.
191    ///
192    /// See [`IterationOrder`] and [`ParIter::iteration_order`] for details.
193    fn iteration_order(self, order: IterationOrder) -> Self
194    where
195        Self: Sized,
196    {
197        Self::from_regular_par(self.into_regular_par().iteration_order(order))
198    }
199
200    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
201    ///
202    /// See [`ParIter::with_runner`] for details.
203    ///
204    /// [`DefaultRunner`]: crate::DefaultRunner
205    /// [`ParIter::with_runner`]: crate::ParIter::with_runner
206    fn with_runner<Q: ParallelRunner>(
207        self,
208        orchestrator: Q,
209    ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err>;
210
211    /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
212    /// [`ParThreadPool`].
213    ///
214    /// See [`ParIter::with_pool`] for details.
215    ///
216    /// [`DefaultPool`]: crate::DefaultPool
217    /// [`ParIter::with_pool`]: crate::ParIter::with_pool
218    fn with_pool<P: ParThreadPool>(
219        self,
220        pool: P,
221    ) -> impl ParIterResult<RunnerWithPool<P, R::Executor>, Item = Self::Item, Err = Self::Err>
222    where
223        Self: Sized,
224    {
225        let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
226        self.with_runner(runner)
227    }
228
229    // computation transformations
230
231    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
232    ///
233    /// Transformation is only for the success path where all elements are of the `Ok` variant.
234    /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// use orx_parallel::*;
240    ///
241    /// // all succeeds
242    /// let a: Vec<Result<u32, char>> = vec![Ok(1), Ok(2), Ok(3)];
243    /// let iter = a.into_par().into_fallible_result().map(|x| 2 * x);
244    ///
245    /// let b: Result<Vec<_>, _> = iter.collect();
246    /// assert_eq!(b, Ok(vec![2, 4, 6]));
247    ///
248    /// // at least one fails
249    /// let a = vec![Ok(1), Err('x'), Ok(3)];
250    /// let iter = a.into_par().into_fallible_result().map(|x| 2 * x);
251    ///
252    /// let b: Result<Vec<_>, _> = iter.collect();
253    /// assert_eq!(b, Err('x'));
254    /// ```
255    fn map<Out, Map>(self, map: Map) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
256    where
257        Self: Sized,
258        Map: Fn(Self::Item) -> Out + Sync + Clone,
259        Out: Send,
260    {
261        let par = self.into_regular_par();
262        let map = par.map(move |x| x.into_result().map(map.clone()));
263        map.into_fallible_result()
264    }
265
266    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
267    ///
268    /// Transformation is only for the success path where all elements are of the `Ok` variant.
269    /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use orx_parallel::*;
275    ///
276    /// // all succeeds
277    /// let a: Vec<Result<u32, char>> = vec![Ok(1), Ok(2), Ok(3)];
278    /// let iter = a.into_par().into_fallible_result().filter(|x| x % 2 == 1);
279    ///
280    /// let b = iter.sum();
281    /// assert_eq!(b, Ok(1 + 3));
282    ///
283    /// // at least one fails
284    /// let a = vec![Ok(1), Err('x'), Ok(3)];
285    /// let iter = a.into_par().into_fallible_result().filter(|x| x % 2 == 1);
286    ///
287    /// let b = iter.sum();
288    /// assert_eq!(b, Err('x'));
289    /// ```
290    fn filter<Filter>(
291        self,
292        filter: Filter,
293    ) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
294    where
295        Self: Sized,
296        Filter: Fn(&Self::Item) -> bool + Sync + Clone,
297        Self::Item: Send,
298    {
299        let par = self.into_regular_par();
300        let filter_map = par.filter_map(move |x| match x.into_result() {
301            Ok(x) => match filter(&x) {
302                true => Some(Ok(x)),
303                false => None,
304            },
305            Err(e) => Some(Err(e)),
306        });
307        filter_map.into_fallible_result()
308    }
309
310    /// Creates an iterator that works like map, but flattens nested structure.
311    ///
312    /// Transformation is only for the success path where all elements are of the `Ok` variant.
313    /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
314    ///
315    /// # Examples
316    ///
317    /// ```
318    /// use orx_parallel::*;
319    ///
320    /// // all succeeds
321    /// let words: Vec<Result<&str, char>> = vec![Ok("alpha"), Ok("beta"), Ok("gamma")];
322    ///
323    /// let all_chars: Result<Vec<_>, _> = words
324    ///     .into_par()
325    ///     .into_fallible_result()
326    ///     .flat_map(|s| s.chars()) // chars() returns an iterator
327    ///     .collect();
328    ///
329    /// let merged: Result<String, _> = all_chars.map(|chars| chars.iter().collect());
330    /// assert_eq!(merged, Ok("alphabetagamma".to_string()));
331    ///
332    /// // at least one fails
333    /// let words: Vec<Result<&str, char>> = vec![Ok("alpha"), Ok("beta"), Err('x'), Ok("gamma")];
334    ///
335    /// let all_chars: Result<Vec<_>, _> = words
336    ///     .into_par()
337    ///     .into_fallible_result()
338    ///     .flat_map(|s| s.chars()) // chars() returns an iterator
339    ///     .collect();
340    ///
341    /// let merged: Result<String, _> = all_chars.map(|chars| chars.iter().collect());
342    /// assert_eq!(merged, Err('x'));
343    /// ```
344    fn flat_map<IOut, FlatMap>(
345        self,
346        flat_map: FlatMap,
347    ) -> impl ParIterResult<R, Item = IOut::Item, Err = Self::Err>
348    where
349        Self: Sized,
350        IOut: IntoIterator,
351        IOut::Item: Send,
352        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
353    {
354        let par = self.into_regular_par();
355        let map = par.flat_map(move |x| match x.into_result() {
356            Ok(x) => ResultOfIter::ok(flat_map(x).into_iter()),
357            Err(e) => ResultOfIter::err(e),
358        });
359        map.into_fallible_result()
360    }
361
362    /// Creates an iterator that both filters and maps.
363    ///
364    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
365    ///
366    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
367    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
368    ///
369    /// # Examples
370    ///
371    /// ```
372    /// use orx_parallel::*;
373    ///
374    /// // all succeeds
375    /// let a: Vec<Result<&str, char>> = vec![Ok("1"), Ok("two"), Ok("NaN"), Ok("four"), Ok("5")];
376    ///
377    /// let numbers: Result<Vec<_>, char> = a
378    ///     .into_par()
379    ///     .into_fallible_result()
380    ///     .filter_map(|s| s.parse::<usize>().ok())
381    ///     .collect();
382    ///
383    /// assert_eq!(numbers, Ok(vec![1, 5]));
384    ///
385    /// // at least one fails
386    /// let a: Vec<Result<&str, char>> = vec![Ok("1"), Ok("two"), Err('x'), Ok("four"), Ok("5")];
387    ///
388    /// let numbers: Result<Vec<_>, char> = a
389    ///     .into_par()
390    ///     .into_fallible_result()
391    ///     .filter_map(|s| s.parse::<usize>().ok())
392    ///     .collect();
393    ///
394    /// assert_eq!(numbers, Err('x'));
395    /// ```
396    fn filter_map<Out, FilterMap>(
397        self,
398        filter_map: FilterMap,
399    ) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
400    where
401        Self: Sized,
402        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
403        Out: Send,
404    {
405        let par = self.into_regular_par();
406        let filter_map = par.filter_map(move |x| match x.into_result() {
407            Ok(x) => filter_map(x).map(|x| Ok(x)),
408            Err(e) => Some(Err(e)),
409        });
410        filter_map.into_fallible_result()
411    }
412
413    /// Does something with each successful element of an iterator, passing the value on, provided that all elements are of Ok variant;
414    /// short-circuits and returns the error otherwise.
415    ///
416    /// When using iterators, you’ll often chain several of them together.
417    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
418    /// To do that, insert a call to `inspect()`.
419    ///
420    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
421    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
422    ///
423    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
424    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
425    /// collect some intermediate values during parallel execution for further inspection.
426    /// The following example demonstrates such a use case.
427    ///
428    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
429    ///
430    /// ```
431    /// use orx_parallel::*;
432    /// use orx_concurrent_bag::*;
433    /// use std::num::ParseIntError;
434    ///
435    /// // all succeeds
436    /// let a: Vec<Result<u32, ParseIntError>> = ["1", "4", "2", "3"]
437    ///     .into_iter()
438    ///     .map(|x| x.parse::<u32>())
439    ///     .collect();
440    ///
441    /// // let's add some inspect() calls to investigate what's happening
442    /// // - log some events
443    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
444    /// let bag = ConcurrentBag::new();
445    ///
446    /// let sum = a
447    ///     .par()
448    ///     .cloned()
449    ///     .into_fallible_result()
450    ///     .inspect(|x| println!("about to filter: {x}"))
451    ///     .filter(|x| x % 2 == 0)
452    ///     .inspect(|x| {
453    ///         bag.push(*x);
454    ///         println!("made it through filter: {x}");
455    ///     })
456    ///     .sum();
457    /// assert_eq!(sum, Ok(4 + 2));
458    ///
459    /// let mut values_made_through = bag.into_inner();
460    /// values_made_through.sort();
461    /// assert_eq!(values_made_through, [2, 4]);
462    ///
463    /// // at least one fails
464    /// let a: Vec<Result<u32, ParseIntError>> = ["1", "4", "x", "3"]
465    ///     .into_iter()
466    ///     .map(|x| x.parse::<u32>())
467    ///     .collect();
468    ///
469    /// // let's add some inspect() calls to investigate what's happening
470    /// // - log some events
471    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
472    /// let bag = ConcurrentBag::new();
473    ///
474    /// let sum = a
475    ///     .par()
476    ///     .cloned()
477    ///     .into_fallible_result()
478    ///     .inspect(|x| println!("about to filter: {x}"))
479    ///     .filter(|x| x % 2 == 0)
480    ///     .inspect(|x| {
481    ///         bag.push(*x);
482    ///         println!("made it through filter: {x}");
483    ///     })
484    ///     .sum();
485    /// assert!(sum.is_err());
486    /// ```
487    fn inspect<Operation>(
488        self,
489        operation: Operation,
490    ) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
491    where
492        Self: Sized,
493        Operation: Fn(&Self::Item) + Sync + Clone,
494        Self::Item: Send,
495    {
496        let map = move |x| {
497            operation(&x);
498            x
499        };
500        self.map(map)
501    }
502
503    // collect
504
505    /// Collects all the items from an iterator into a collection iff all elements are of Ok variant.
506    /// Early exits and returns the error if any of the elements is an Err.
507    ///
508    /// This is useful when you already have a collection and want to add the iterator items to it.
509    ///
510    /// The collection is passed in as owned value, and returned back with the additional elements.
511    ///
512    /// All collections implementing [`ParCollectInto`] can be used to collect into.
513    ///
514    /// [`ParCollectInto`]: crate::ParCollectInto
515    ///
516    /// # Examples
517    ///
518    /// ```
519    /// use orx_parallel::*;
520    ///
521    /// let vec: Vec<i32> = vec![0, 1];
522    ///
523    /// // all succeeds
524    /// let result = ["1", "2", "3"]
525    ///     .into_par()
526    ///     .map(|x| x.parse::<i32>())
527    ///     .into_fallible_result()
528    ///     .map(|x| x * 10)
529    ///     .collect_into(vec);
530    ///
531    /// assert!(result.is_ok());
532    /// let vec = result.unwrap();
533    /// assert_eq!(vec, vec![0, 1, 10, 20, 30]);
534    ///
535    /// // at least one fails
536    ///
537    /// let result = ["1", "x!", "3"]
538    ///     .into_par()
539    ///     .map(|x| x.parse::<i32>())
540    ///     .into_fallible_result()
541    ///     .map(|x| x * 10)
542    ///     .collect_into(vec);
543    /// assert!(result.is_err());
544    /// ```
545    fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
546    where
547        C: ParCollectInto<Self::Item>,
548        Self::Item: Send,
549        Self::Err: Send;
550
551    /// Transforms an iterator into a collection iff all elements are of Ok variant.
552    /// Early exits and returns the error if any of the elements is an Err.
553    ///
554    /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
555    /// the type of the result collection; or turbofish annotation can be used.
556    ///
557    /// All collections implementing [`ParCollectInto`] can be used to collect into.
558    ///
559    /// [`ParCollectInto`]: crate::ParCollectInto
560    ///
561    /// # Examples
562    ///
563    /// ```
564    /// use orx_parallel::*;
565    ///
566    /// // all succeeds
567    ///
568    /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
569    ///     .into_par()
570    ///     .map(|x| x.parse::<i32>())
571    ///     .into_fallible_result()
572    ///     .map(|x| x * 2)
573    ///     .collect();
574    ///
575    /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
576    ///
577    /// // at least one fails
578    ///
579    /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
580    ///     .into_par()
581    ///     .map(|x| x.parse::<i32>())
582    ///     .into_fallible_result()
583    ///     .map(|x| x * 2)
584    ///     .collect();
585    ///
586    /// assert!(result_doubled.is_err());
587    /// ```
588    fn collect<C>(self) -> Result<C, Self::Err>
589    where
590        Self: Sized,
591        Self::Item: Send,
592        Self::Err: Send,
593        C: ParCollectInto<Self::Item>,
594    {
595        let output = C::empty(self.con_iter_len());
596        self.collect_into(output)
597    }
598
599    // reduce
600
601    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
602    /// Early exits and returns the error if any of the elements is an Err.
603    ///
604    /// If the iterator is empty, returns `Ok(None)`; otherwise, returns `Ok` of result of the reduction.
605    ///
606    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
607    ///
608    /// # Example
609    ///
610    /// ```
611    /// use orx_parallel::*;
612    ///
613    /// fn safe_div(a: u32, b: u32) -> Result<u32, char> {
614    ///     match b {
615    ///         0 => Err('!'),
616    ///         b => Ok(a / b),
617    ///     }
618    /// }
619    ///
620    /// // all succeeds
621    /// let reduced: Result<Option<u32>, char> = (1..10)
622    ///     .par()
623    ///     .map(|x| safe_div(100, x as u32))
624    ///     .into_fallible_result()
625    ///     .reduce(|acc, e| acc + e);
626    /// assert_eq!(reduced, Ok(Some(281)));
627    ///
628    /// // all succeeds - empty iterator
629    /// let reduced: Result<Option<u32>, char> = (1..1)
630    ///     .par()
631    ///     .map(|x| safe_div(100, x as u32))
632    ///     .into_fallible_result()
633    ///     .reduce(|acc, e| acc + e);
634    /// assert_eq!(reduced, Ok(None));
635    ///
636    /// // at least one fails
637    /// let reduced: Result<Option<u32>, char> = (0..10)
638    ///     .par()
639    ///     .map(|x| safe_div(100, x as u32))
640    ///     .into_fallible_result()
641    ///     .reduce(|acc, e| acc + e);
642    /// assert_eq!(reduced, Err('!'));
643    /// ```
644    fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
645    where
646        Self::Item: Send,
647        Self::Err: Send,
648        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
649
650    /// Tests if every element of the iterator matches a predicate.
651    /// Early exits and returns the error if any of the elements is an Err.
652    ///
653    /// `all` takes a `predicate` that returns true or false.
654    /// It applies this closure to each Ok element of the iterator,
655    /// and if they all return true, then so does `all`.
656    /// If any of them returns false, it returns false.
657    ///
658    /// Note that `all` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a false,
659    /// given that no matter what else happens, the result will also be false.
660    ///
661    /// Therefore, in case the fallible iterator contains both an Err element and an Ok element which violates the `predicate`,
662    /// the result is **not deterministic**. It might be the `Err` if it is observed first; or `Ok(false)` if the violation is observed first.
663    ///
664    /// On the other hand, when it returns `Ok(true)`, we are certain that all elements are of `Ok` variant and all satisfy the `predicate`.
665    ///
666    /// An empty iterator returns `Ok(true)`.
667    ///
668    /// # Examples
669    ///
670    /// ```
671    /// use orx_parallel::*;
672    ///
673    /// // all Ok
674    /// let result = vec!["1", "2", "3"]
675    ///     .into_par()
676    ///     .map(|x| x.parse::<i32>())
677    ///     .into_fallible_result()
678    ///     .all(|x| *x > 0);
679    /// assert_eq!(result, Ok(true));
680    ///
681    /// let result = vec!["1", "2", "3"]
682    ///     .into_par()
683    ///     .map(|x| x.parse::<i32>())
684    ///     .into_fallible_result()
685    ///     .all(|x| *x > 1);
686    /// assert_eq!(result, Ok(false));
687    ///
688    /// let result = Vec::<&str>::new()
689    ///     .into_par()
690    ///     .map(|x| x.parse::<i32>())
691    ///     .into_fallible_result()
692    ///     .all(|x| *x > 1);
693    /// assert_eq!(result, Ok(true)); // empty iterator
694    ///
695    /// // at least one Err
696    /// let result = vec!["1", "x!", "3"]
697    ///     .into_par()
698    ///     .map(|x| x.parse::<i32>())
699    ///     .into_fallible_result()
700    ///     .all(|x| *x > 0);
701    /// assert!(result.is_err());
702    /// ```
703    fn all<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
704    where
705        Self: Sized,
706        Self::Item: Send,
707        Self::Err: Send,
708        Predicate: Fn(&Self::Item) -> bool + Sync,
709    {
710        let violates = |x: &Self::Item| !predicate(x);
711        self.find(violates).map(|x| x.is_none())
712    }
713
714    /// Tests if any element of the iterator matches a predicate.
715    /// Early exits and returns the error if any of the elements is an Err.
716    ///
717    /// `any` takes a `predicate` that returns true or false.
718    /// It applies this closure to each element of the iterator,
719    /// and if any of the elements returns true, then so does `any`.
720    /// If all of them return false, it returns false.
721    ///
722    /// Note that `any` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a true,
723    /// given that no matter what else happens, the result will also be true.
724    ///
725    /// Therefore, in case the fallible iterator contains both an Err element and an Ok element which satisfies the `predicate`,
726    /// the result is **not deterministic**. It might be the `Err` if it is observed first; or `Ok(true)` if element satisfying the predicate
727    /// is observed first.
728    ///
729    /// On the other hand, when it returns `Ok(false)`, we are certain that all elements are of `Ok` variant and none of them satisfies the `predicate`.
730    ///
731    /// An empty iterator returns `Ok(false)`.
732    ///
733    /// # Examples
734    ///
735    /// ```
736    /// use orx_parallel::*;
737    ///
738    /// // all Ok
739    /// let result = vec!["1", "2", "3"]
740    ///     .into_par()
741    ///     .map(|x| x.parse::<i32>())
742    ///     .into_fallible_result()
743    ///     .any(|x| *x > 1);
744    /// assert_eq!(result, Ok(true));
745    ///
746    /// let result = vec!["1", "2", "3"]
747    ///     .into_par()
748    ///     .map(|x| x.parse::<i32>())
749    ///     .into_fallible_result()
750    ///     .any(|x| *x > 3);
751    /// assert_eq!(result, Ok(false));
752    ///
753    /// let result = Vec::<&str>::new()
754    ///     .into_par()
755    ///     .map(|x| x.parse::<i32>())
756    ///     .into_fallible_result()
757    ///     .any(|x| *x > 1);
758    /// assert_eq!(result, Ok(false)); // empty iterator
759    ///
760    /// // at least one Err
761    /// let result = vec!["1", "x!", "3"]
762    ///     .into_par()
763    ///     .map(|x| x.parse::<i32>())
764    ///     .into_fallible_result()
765    ///     .any(|x| *x > 5);
766    /// assert!(result.is_err());
767    /// ```
768    fn any<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
769    where
770        Self: Sized,
771        Self::Item: Send,
772        Self::Err: Send,
773        Predicate: Fn(&Self::Item) -> bool + Sync,
774    {
775        self.find(predicate).map(|x| x.is_some())
776    }
777
778    /// Consumes the iterator, counting the number of iterations and returning it.
779    /// Early exits and returns the error if any of the elements is an Err.
780    ///
781    /// # Examples
782    ///
783    /// ```
784    /// use orx_parallel::*;
785    ///
786    /// // all Ok
787    /// let result = vec!["1", "2", "3"]
788    ///     .into_par()
789    ///     .map(|x| x.parse::<i32>())
790    ///     .into_fallible_result()
791    ///     .filter(|x| *x >= 2)
792    ///     .count();
793    /// assert_eq!(result, Ok(2));
794    ///
795    /// // at least one Err
796    /// let result = vec!["x!", "2", "3"]
797    ///     .into_par()
798    ///     .map(|x| x.parse::<i32>())
799    ///     .into_fallible_result()
800    ///     .filter(|x| *x >= 2)
801    ///     .count();
802    /// assert!(result.is_err());
803    /// ```
804    fn count(self) -> Result<usize, Self::Err>
805    where
806        Self: Sized,
807        Self::Err: Send,
808    {
809        self.map(map_count)
810            .reduce(reduce_sum)
811            .map(|x| x.unwrap_or(0))
812    }
813
814    /// Calls a closure on each element of an iterator, and returns `Ok(())` if all elements succeed.
815    /// Early exits and returns the error if any of the elements is an Err.
816    ///
817    /// # Examples
818    ///
819    /// Basic usage:
820    ///
821    /// ```
822    /// use orx_parallel::*;
823    /// use std::sync::mpsc::channel;
824    ///
825    /// // all Ok
826    /// let (tx, rx) = channel();
827    /// let result = vec!["0", "1", "2", "3", "4"]
828    ///     .into_par()
829    ///     .map(|x| x.parse::<i32>())
830    ///     .into_fallible_result()
831    ///     .map(|x| x * 2 + 1)
832    ///     .for_each(move |x| tx.send(x).unwrap());
833    ///
834    /// assert_eq!(result, Ok(()));
835    ///
836    /// let mut v: Vec<_> = rx.iter().collect();
837    /// v.sort(); // order can be mixed, since messages will be sent in parallel
838    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
839    ///
840    /// // at least one Err
841    /// let (tx, _rx) = channel();
842    /// let result = vec!["0", "1", "2", "x!", "4"]
843    ///     .into_par()
844    ///     .map(|x| x.parse::<i32>())
845    ///     .into_fallible_result()
846    ///     .map(|x| x * 2 + 1)
847    ///     .for_each(move |x| tx.send(x).unwrap());
848    ///
849    /// assert!(result.is_err());
850    /// ```
851    fn for_each<Operation>(self, operation: Operation) -> Result<(), Self::Err>
852    where
853        Self: Sized,
854        Self::Err: Send,
855        Operation: Fn(Self::Item) + Sync,
856    {
857        let map = |x| operation(x);
858        self.map(map).reduce(reduce_unit).map(|_| ())
859    }
860
861    /// Returns Ok of maximum element of an iterator if all elements succeed.
862    /// If the iterator is empty, `Ok(None)` is returned.
863    /// Early exits and returns the error if any of the elements is an Err.
864    ///
865    /// # Examples
866    ///
867    /// ```
868    /// use orx_parallel::*;
869    ///
870    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
871    /// assert_eq!(a.par().copied().into_fallible_result().max(), Ok(Some(3)));
872    ///
873    /// let b: Vec<Result<i32, char>> = vec![];
874    /// assert_eq!(b.par().copied().into_fallible_result().max(), Ok(None));
875    ///
876    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
877    /// assert_eq!(c.par().copied().into_fallible_result().max(), Err('x'));
878    /// ```
879    fn max(self) -> Result<Option<Self::Item>, Self::Err>
880    where
881        Self: Sized,
882        Self::Err: Send,
883        Self::Item: Ord + Send,
884    {
885        self.reduce(Ord::max)
886    }
887
888    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
889    /// If the iterator is empty, `Ok(None)` is returned.
890    /// Early exits and returns the error if any of the elements is an Err.
891    ///
892    /// ```
893    /// use orx_parallel::*;
894    ///
895    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
896    /// assert_eq!(
897    ///     a.par()
898    ///         .copied()
899    ///         .into_fallible_result()
900    ///         .max_by(|a, b| a.cmp(b)),
901    ///     Ok(Some(3))
902    /// );
903    ///
904    /// let b: Vec<Result<i32, char>> = vec![];
905    /// assert_eq!(
906    ///     b.par()
907    ///         .copied()
908    ///         .into_fallible_result()
909    ///         .max_by(|a, b| a.cmp(b)),
910    ///     Ok(None)
911    /// );
912    ///
913    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
914    /// assert_eq!(
915    ///     c.par()
916    ///         .copied()
917    ///         .into_fallible_result()
918    ///         .max_by(|a, b| a.cmp(b)),
919    ///     Err('x')
920    /// );
921    /// ```
922    fn max_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
923    where
924        Self: Sized,
925        Self::Item: Send,
926        Self::Err: Send,
927        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
928    {
929        let reduce = |x, y| match compare(&x, &y) {
930            Ordering::Greater | Ordering::Equal => x,
931            Ordering::Less => y,
932        };
933        self.reduce(reduce)
934    }
935
936    /// Returns the element that gives the maximum value from the specified function.
937    /// If the iterator is empty, `Ok(None)` is returned.
938    /// Early exits and returns the error if any of the elements is an Err.
939    ///
940    /// # Examples
941    ///
942    /// ```
943    /// use orx_parallel::*;
944    ///
945    /// let a: Vec<Result<i32, char>> = vec![Ok(-1), Ok(2), Ok(-3)];
946    /// assert_eq!(
947    ///     a.par()
948    ///         .copied()
949    ///         .into_fallible_result()
950    ///         .max_by_key(|x| x.abs()),
951    ///     Ok(Some(-3))
952    /// );
953    ///
954    /// let b: Vec<Result<i32, char>> = vec![];
955    /// assert_eq!(
956    ///     b.par()
957    ///         .copied()
958    ///         .into_fallible_result()
959    ///         .max_by_key(|x| x.abs()),
960    ///     Ok(None)
961    /// );
962    ///
963    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
964    /// assert_eq!(
965    ///     c.par()
966    ///         .copied()
967    ///         .into_fallible_result()
968    ///         .max_by_key(|x| x.abs()),
969    ///     Err('x')
970    /// );
971    /// ```
972    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Result<Option<Self::Item>, Self::Err>
973    where
974        Self: Sized,
975        Self::Item: Send,
976        Self::Err: Send,
977        Key: Ord,
978        GetKey: Fn(&Self::Item) -> Key + Sync,
979    {
980        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
981            Ordering::Greater | Ordering::Equal => x,
982            Ordering::Less => y,
983        };
984        self.reduce(reduce)
985    }
986
987    /// Returns Ok of minimum element of an iterator if all elements succeed.
988    /// If the iterator is empty, `Ok(None)` is returned.
989    /// Early exits and returns the error if any of the elements is an Err.
990    ///
991    /// # Examples
992    ///
993    /// ```
994    /// use orx_parallel::*;
995    ///
996    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
997    /// assert_eq!(a.par().copied().into_fallible_result().min(), Ok(Some(1)));
998    ///
999    /// let b: Vec<Result<i32, char>> = vec![];
1000    /// assert_eq!(b.par().copied().into_fallible_result().min(), Ok(None));
1001    ///
1002    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1003    /// assert_eq!(c.par().copied().into_fallible_result().min(), Err('x'));
1004    /// ```
1005    fn min(self) -> Result<Option<Self::Item>, Self::Err>
1006    where
1007        Self: Sized,
1008        Self::Item: Ord + Send,
1009        Self::Err: Send,
1010    {
1011        self.reduce(Ord::min)
1012    }
1013
1014    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1015    /// If the iterator is empty, `Ok(None)` is returned.
1016    /// Early exits and returns the error if any of the elements is an Err.
1017    ///
1018    /// ```
1019    /// use orx_parallel::*;
1020    ///
1021    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1022    /// assert_eq!(
1023    ///     a.par()
1024    ///         .copied()
1025    ///         .into_fallible_result()
1026    ///         .min_by(|a, b| a.cmp(b)),
1027    ///     Ok(Some(1))
1028    /// );
1029    ///
1030    /// let b: Vec<Result<i32, char>> = vec![];
1031    /// assert_eq!(
1032    ///     b.par()
1033    ///         .copied()
1034    ///         .into_fallible_result()
1035    ///         .min_by(|a, b| a.cmp(b)),
1036    ///     Ok(None)
1037    /// );
1038    ///
1039    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1040    /// assert_eq!(
1041    ///     c.par()
1042    ///         .copied()
1043    ///         .into_fallible_result()
1044    ///         .min_by(|a, b| a.cmp(b)),
1045    ///     Err('x')
1046    /// );
1047    /// ```
1048    fn min_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
1049    where
1050        Self: Sized,
1051        Self::Item: Send,
1052        Self::Err: Send,
1053        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1054    {
1055        let reduce = |x, y| match compare(&x, &y) {
1056            Ordering::Less | Ordering::Equal => x,
1057            Ordering::Greater => y,
1058        };
1059        self.reduce(reduce)
1060    }
1061
1062    /// Returns the element that gives the minimum value from the specified function.
1063    /// If the iterator is empty, `Ok(None)` is returned.
1064    /// Early exits and returns the error if any of the elements is an Err.
1065    ///
1066    /// # Examples
1067    ///
1068    /// ```
1069    /// use orx_parallel::*;
1070    ///
1071    /// let a: Vec<Result<i32, char>> = vec![Ok(-1), Ok(2), Ok(-3)];
1072    /// assert_eq!(
1073    ///     a.par()
1074    ///         .copied()
1075    ///         .into_fallible_result()
1076    ///         .min_by_key(|x| x.abs()),
1077    ///     Ok(Some(-1))
1078    /// );
1079    ///
1080    /// let b: Vec<Result<i32, char>> = vec![];
1081    /// assert_eq!(
1082    ///     b.par()
1083    ///         .copied()
1084    ///         .into_fallible_result()
1085    ///         .min_by_key(|x| x.abs()),
1086    ///     Ok(None)
1087    /// );
1088    ///
1089    /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1090    /// assert_eq!(
1091    ///     c.par()
1092    ///         .copied()
1093    ///         .into_fallible_result()
1094    ///         .min_by_key(|x| x.abs()),
1095    ///     Err('x')
1096    /// );
1097    /// ```
1098    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Result<Option<Self::Item>, Self::Err>
1099    where
1100        Self: Sized,
1101        Self::Item: Send,
1102        Self::Err: Send,
1103        Key: Ord,
1104        GetKey: Fn(&Self::Item) -> Key + Sync,
1105    {
1106        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1107            Ordering::Less | Ordering::Equal => x,
1108            Ordering::Greater => y,
1109        };
1110        self.reduce(reduce)
1111    }
1112
1113    /// Sums the elements of an iterator.
1114    /// Early exits and returns the error if any of the elements is an Err.
1115    ///
1116    /// If the iterator is empty, returns zero; otherwise, returns `Ok` of the sum.
1117    ///
1118    /// # Example
1119    ///
1120    /// ```
1121    /// use orx_parallel::*;
1122    ///
1123    /// fn safe_div(a: u32, b: u32) -> Result<u32, char> {
1124    ///     match b {
1125    ///         0 => Err('!'),
1126    ///         b => Ok(a / b),
1127    ///     }
1128    /// }
1129    ///
1130    /// // all succeeds
1131    /// let reduced: Result<u32, char> = (1..10)
1132    ///     .par()
1133    ///     .map(|x| safe_div(100, x as u32))
1134    ///     .into_fallible_result()
1135    ///     .sum();
1136    /// assert_eq!(reduced, Ok(281));
1137    ///
1138    /// // all succeeds - empty iterator
1139    /// let reduced: Result<u32, char> = (1..1)
1140    ///     .par()
1141    ///     .map(|x| safe_div(100, x as u32))
1142    ///     .into_fallible_result()
1143    ///     .sum();
1144    /// assert_eq!(reduced, Ok(0));
1145    ///
1146    /// // at least one fails
1147    /// let reduced: Result<u32, char> = (0..10)
1148    ///     .par()
1149    ///     .map(|x| safe_div(100, x as u32))
1150    ///     .into_fallible_result()
1151    ///     .sum();
1152    /// assert_eq!(reduced, Err('!'));
1153    /// ```
1154    fn sum<Out>(self) -> Result<Out, Self::Err>
1155    where
1156        Self: Sized,
1157        Self::Item: Sum<Out>,
1158        Self::Err: Send,
1159        Out: Send,
1160    {
1161        self.map(Self::Item::map)
1162            .reduce(Self::Item::reduce)
1163            .map(|x| x.unwrap_or(Self::Item::zero()))
1164    }
1165
1166    // early exit
1167
1168    /// Returns the first (or any) element of the iterator.
1169    /// If the iterator is empty, `Ok(None)` is returned.
1170    /// Early exits and returns the error if an Err element is observed first.
1171    ///
1172    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1173    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1174    ///
1175    /// Note that `find` itself is short-circuiting in addition to fallible computation.
1176    /// Therefore, in case the fallible iterator contains both an Err and an Ok element,
1177    /// the result is **not deterministic**:
1178    /// * it might be the `Err` if it is observed first;
1179    /// * or `Ok(element)` if the Ok element is observed first.
1180    ///
1181    /// # Examples
1182    ///
1183    /// ```
1184    /// use orx_parallel::*;
1185    ///
1186    /// let a: Vec<Result<i32, char>> = vec![];
1187    /// assert_eq!(a.par().copied().into_fallible_result().first(), Ok(None));
1188    ///
1189    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1190    /// assert_eq!(a.par().copied().into_fallible_result().first(), Ok(Some(1)));
1191    ///
1192    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Err('x'), Ok(3)];
1193    /// let result = a.par().copied().into_fallible_result().first();
1194    /// // depends on whichever is observed first in parallel execution
1195    /// assert!(result == Ok(Some(1)) || result == Err('x'));
1196    /// ```
1197    fn first(self) -> Result<Option<Self::Item>, Self::Err>
1198    where
1199        Self::Item: Send,
1200        Self::Err: Send;
1201
1202    /// Returns the first (or any) element of the iterator that satisfies the `predicate`.
1203    /// If the iterator is empty, `Ok(None)` is returned.
1204    /// Early exits and returns the error if an Err element is observed first.
1205    ///
1206    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1207    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1208    ///
1209    /// Note that `find` itself is short-circuiting in addition to fallible computation.
1210    /// Therefore, in case the fallible iterator contains both an Err and an Ok element,
1211    /// the result is **not deterministic**:
1212    /// * it might be the `Err` if it is observed first;
1213    /// * or `Ok(element)` if the Ok element satisfying the predicate is observed first.
1214    ///
1215    /// # Examples
1216    ///
1217    /// ```
1218    /// use orx_parallel::*;
1219    ///
1220    /// let a: Vec<Result<i32, char>> = vec![];
1221    /// assert_eq!(
1222    ///     a.par().copied().into_fallible_result().find(|x| *x > 2),
1223    ///     Ok(None)
1224    /// );
1225    ///
1226    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1227    /// assert_eq!(
1228    ///     a.par().copied().into_fallible_result().find(|x| *x > 2),
1229    ///     Ok(Some(3))
1230    /// );
1231    ///
1232    /// let a: Vec<Result<i32, char>> = vec![Ok(1), Err('x'), Ok(3)];
1233    /// let result = a.par().copied().into_fallible_result().find(|x| *x > 2);
1234    /// // depends on whichever is observed first in parallel execution
1235    /// assert!(result == Ok(Some(3)) || result == Err('x'));
1236    /// ```
1237    fn find<Predicate>(self, predicate: Predicate) -> Result<Option<Self::Item>, Self::Err>
1238    where
1239        Self: Sized,
1240        Self::Item: Send,
1241        Self::Err: Send,
1242        Predicate: Fn(&Self::Item) -> bool + Sync,
1243    {
1244        self.filter(&predicate).first()
1245    }
1246}
1247
1248pub trait IntoResult<T, E> {
1249    fn into_result(self) -> Result<T, E>;
1250}
1251
1252impl<T, E> IntoResult<T, E> for Result<T, E> {
1253    #[inline(always)]
1254    fn into_result(self) -> Result<T, E> {
1255        self
1256    }
1257}