orx_parallel/par_iter_result.rs
1use crate::default_fns::{map_count, reduce_sum, reduce_unit};
2use crate::runner::{DefaultRunner, ParallelRunner};
3use crate::{ChunkSize, IterationOrder, NumThreads, ParThreadPool, RunnerWithPool, Sum};
4use crate::{ParCollectInto, ParIter, generic_values::fallible_iterators::ResultOfIter};
5use core::cmp::Ordering;
6
7/// A parallel iterator for which the computation either completely succeeds,
8/// or fails and **early exits** with an error.
9///
10/// # Examples
11///
12/// To demonstrate the difference of fallible iterator's behavior, consider the following simple example.
13/// We parse a series of strings into integers.
14/// We try this twice:
15/// * in the first one, all inputs are good, hence, we obtain Ok of parsed numbers,
16/// * in the second one, the value in the middle is faulty, we expect the computation to fail.
17///
18/// In the following, we try to achieve this both with a regular parallel iterator ([`ParIter`]) and a fallible
19/// parallel iterator, `ParIterResult` in this case.
20///
21/// You may notice the following differences:
22/// * In the regular iterator, it is not very convenient to keep both the resulting numbers and a potential error.
23/// Here, we make use of `filter_map` and simply ignore the error.
24/// * On the other hand, the `collect` method of the fallible iterator directly returns a `Result` of the computation
25/// which is either Ok of all parsed numbers or the error.
26/// * Also importantly note that the regular iterator will try to parse all the strings, regardless of how many times
27/// the parsing fails.
28/// * Fallible iterator, on the other hand, stops immediately after observing the first error and short circuits the
29/// computation.
30///
31/// ```
32/// use orx_parallel::*;
33/// use std::num::{IntErrorKind, ParseIntError};
34///
35/// let expected_results = [
36/// Ok((0..100).collect::<Vec<_>>()),
37/// Err(IntErrorKind::InvalidDigit),
38/// ];
39///
40/// for expected in expected_results {
41/// let expected_ok = expected.is_ok();
42/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
43/// if !expected_ok {
44/// inputs.insert(50, "x".to_string()); // plant an error case
45/// }
46///
47/// // regular parallel iterator
48/// let results = inputs.par().map(|x| x.parse::<u32>());
49/// let numbers: Vec<_> = results.filter_map(|x| x.ok()).collect();
50/// if expected_ok {
51/// assert_eq!(&expected, &Ok(numbers));
52/// } else {
53/// // we lost the error
54/// }
55///
56/// // fallible parallel iterator
57/// let results = inputs.par().map(|x| x.parse::<u32>());
58/// let result: Result<Vec<_>, ParseIntError> = results.into_fallible_result().collect();
59/// assert_eq!(&expected, &result.map_err(|x| x.kind().clone()));
60/// }
61/// ```
62///
63/// These differences are not specific to `collect`; all fallible iterator methods return a result.
64/// The following demonstrate reduction examples, where the result is either the reduced value if the entire computation
65/// succeeds, or the error.
66///
67/// ```
68/// use orx_parallel::*;
69/// use std::num::ParseIntError;
70///
71/// for will_fail in [false, true] {
72/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
73/// if will_fail {
74/// inputs.insert(50, "x".to_string()); // plant an error case
75/// }
76///
77/// // sum
78/// let results = inputs.par().map(|x| x.parse::<u32>());
79/// let result: Result<u32, ParseIntError> = results.into_fallible_result().sum();
80/// match will_fail {
81/// true => assert!(result.is_err()),
82/// false => assert_eq!(result, Ok(4950)),
83/// }
84///
85/// // max
86/// let results = inputs.par().map(|x| x.parse::<u32>());
87/// let result: Result<Option<u32>, ParseIntError> = results.into_fallible_result().max();
88/// match will_fail {
89/// true => assert!(result.is_err()),
90/// false => assert_eq!(result, Ok(Some(99))),
91/// }
92/// }
93/// ```
94///
95/// Finally, similar to regular iterators, a fallible parallel iterator can be tranformed using iterator methods.
96/// However, the transformation is on the success path, the error case always short circuits and returns the error.
97/// Notice in the following example that the success type keeps changing through transformations while the error type
98/// remains the same.
99///
100/// ```
101/// use orx_parallel::*;
102/// use std::num::ParseIntError;
103///
104/// for will_fail in [false, true] {
105/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
106/// if will_fail {
107/// inputs.insert(50, "x".to_string()); // plant an error case
108/// }
109///
110/// // fallible iter
111/// let results = inputs.par().map(|x| x.parse::<u32>());
112/// let fallible = results.into_fallible_result(); // Ok: u32, Error: ParseIntError
113///
114/// // transformations
115///
116/// let result: Result<usize, ParseIntError> = fallible
117/// .filter(|x| x % 2 == 1) // Ok: u32, Error: ParseIntError
118/// .map(|x| 3 * x) // Ok: u32, Error: ParseIntError
119/// .filter_map(|x| (x % 10 != 0).then_some(x)) // Ok: u32, Error: ParseIntError
120/// .flat_map(|x| [x.to_string(), (10 * x).to_string()]) // Ok: String, Error: ParseIntError
121/// .map(|x| x.len()) // Ok: usize, Error: ParseIntError
122/// .sum();
123///
124/// match will_fail {
125/// true => assert!(result.is_err()),
126/// false => assert_eq!(result, Ok(312)),
127/// }
128/// }
129/// ```
130///
131/// [`ParIter`]: crate::ParIter
132pub trait ParIterResult<R = DefaultRunner>
133where
134 R: ParallelRunner,
135{
136 /// Type of the Ok element, to be received as the Ok variant iff the entire computation succeeds.
137 type Item;
138
139 /// Type of the Err element, to be received if any of the computations fails.
140 type Err;
141
142 /// Element type of the regular parallel iterator this fallible iterator can be converted to, simply `Result<Self::Ok, Self::Err>`.
143 type RegularItem: IntoResult<Self::Item, Self::Err>;
144
145 /// Regular parallel iterator this fallible iterator can be converted into.
146 type RegularParIter: ParIter<R, Item = Self::RegularItem>;
147
148 /// Returns a reference to the input concurrent iterator.
149 fn con_iter_len(&self) -> Option<usize>;
150
151 /// Converts this fallible iterator into a regular parallel iterator; i.e., [`ParIter`], with `Item = Result<Self::Ok, Self::Err>`.
152 fn into_regular_par(self) -> Self::RegularParIter;
153
154 /// Converts the `regular_par` iterator with `Item = Result<Self::Ok, Self::Err>` into fallible result iterator.
155 fn from_regular_par(regular_par: Self::RegularParIter) -> Self;
156
157 // params transformations
158
159 /// Sets the number of threads to be used in the parallel execution.
160 /// Integers can be used as the argument with the following mapping:
161 ///
162 /// * `0` -> `NumThreads::Auto`
163 /// * `1` -> `NumThreads::sequential()`
164 /// * `n > 0` -> `NumThreads::Max(n)`
165 ///
166 /// See [`NumThreads`] and [`ParIter::num_threads`] for details.
167 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self
168 where
169 Self: Sized,
170 {
171 Self::from_regular_par(self.into_regular_par().num_threads(num_threads))
172 }
173
174 /// Sets the number of elements to be pulled from the concurrent iterator during the
175 /// parallel execution. When integers are used as argument, the following mapping applies:
176 ///
177 /// * `0` -> `ChunkSize::Auto`
178 /// * `n > 0` -> `ChunkSize::Exact(n)`
179 ///
180 /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
181 ///
182 /// See [`ChunkSize`] and [`ParIter::chunk_size`] for details.
183 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self
184 where
185 Self: Sized,
186 {
187 Self::from_regular_par(self.into_regular_par().chunk_size(chunk_size))
188 }
189
190 /// Sets the iteration order of the parallel computation.
191 ///
192 /// See [`IterationOrder`] and [`ParIter::iteration_order`] for details.
193 fn iteration_order(self, order: IterationOrder) -> Self
194 where
195 Self: Sized,
196 {
197 Self::from_regular_par(self.into_regular_par().iteration_order(order))
198 }
199
200 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
201 ///
202 /// See [`ParIter::with_runner`] for details.
203 ///
204 /// [`DefaultRunner`]: crate::DefaultRunner
205 /// [`ParIter::with_runner`]: crate::ParIter::with_runner
206 fn with_runner<Q: ParallelRunner>(
207 self,
208 orchestrator: Q,
209 ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err>;
210
211 /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
212 /// [`ParThreadPool`].
213 ///
214 /// See [`ParIter::with_pool`] for details.
215 ///
216 /// [`DefaultPool`]: crate::DefaultPool
217 /// [`ParIter::with_pool`]: crate::ParIter::with_pool
218 fn with_pool<P: ParThreadPool>(
219 self,
220 pool: P,
221 ) -> impl ParIterResult<RunnerWithPool<P, R::Executor>, Item = Self::Item, Err = Self::Err>
222 where
223 Self: Sized,
224 {
225 let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
226 self.with_runner(runner)
227 }
228
229 // computation transformations
230
231 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
232 ///
233 /// Transformation is only for the success path where all elements are of the `Ok` variant.
234 /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// use orx_parallel::*;
240 ///
241 /// // all succeeds
242 /// let a: Vec<Result<u32, char>> = vec![Ok(1), Ok(2), Ok(3)];
243 /// let iter = a.into_par().into_fallible_result().map(|x| 2 * x);
244 ///
245 /// let b: Result<Vec<_>, _> = iter.collect();
246 /// assert_eq!(b, Ok(vec![2, 4, 6]));
247 ///
248 /// // at least one fails
249 /// let a = vec![Ok(1), Err('x'), Ok(3)];
250 /// let iter = a.into_par().into_fallible_result().map(|x| 2 * x);
251 ///
252 /// let b: Result<Vec<_>, _> = iter.collect();
253 /// assert_eq!(b, Err('x'));
254 /// ```
255 fn map<Out, Map>(self, map: Map) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
256 where
257 Self: Sized,
258 Map: Fn(Self::Item) -> Out + Sync + Clone,
259 Out: Send,
260 {
261 let par = self.into_regular_par();
262 let map = par.map(move |x| x.into_result().map(map.clone()));
263 map.into_fallible_result()
264 }
265
266 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
267 ///
268 /// Transformation is only for the success path where all elements are of the `Ok` variant.
269 /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use orx_parallel::*;
275 ///
276 /// // all succeeds
277 /// let a: Vec<Result<u32, char>> = vec![Ok(1), Ok(2), Ok(3)];
278 /// let iter = a.into_par().into_fallible_result().filter(|x| x % 2 == 1);
279 ///
280 /// let b = iter.sum();
281 /// assert_eq!(b, Ok(1 + 3));
282 ///
283 /// // at least one fails
284 /// let a = vec![Ok(1), Err('x'), Ok(3)];
285 /// let iter = a.into_par().into_fallible_result().filter(|x| x % 2 == 1);
286 ///
287 /// let b = iter.sum();
288 /// assert_eq!(b, Err('x'));
289 /// ```
290 fn filter<Filter>(
291 self,
292 filter: Filter,
293 ) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
294 where
295 Self: Sized,
296 Filter: Fn(&Self::Item) -> bool + Sync + Clone,
297 Self::Item: Send,
298 {
299 let par = self.into_regular_par();
300 let filter_map = par.filter_map(move |x| match x.into_result() {
301 Ok(x) => match filter(&x) {
302 true => Some(Ok(x)),
303 false => None,
304 },
305 Err(e) => Some(Err(e)),
306 });
307 filter_map.into_fallible_result()
308 }
309
310 /// Creates an iterator that works like map, but flattens nested structure.
311 ///
312 /// Transformation is only for the success path where all elements are of the `Ok` variant.
313 /// Any observation of an `Err` case short-circuits the computation and immediately returns the observed error.
314 ///
315 /// # Examples
316 ///
317 /// ```
318 /// use orx_parallel::*;
319 ///
320 /// // all succeeds
321 /// let words: Vec<Result<&str, char>> = vec![Ok("alpha"), Ok("beta"), Ok("gamma")];
322 ///
323 /// let all_chars: Result<Vec<_>, _> = words
324 /// .into_par()
325 /// .into_fallible_result()
326 /// .flat_map(|s| s.chars()) // chars() returns an iterator
327 /// .collect();
328 ///
329 /// let merged: Result<String, _> = all_chars.map(|chars| chars.iter().collect());
330 /// assert_eq!(merged, Ok("alphabetagamma".to_string()));
331 ///
332 /// // at least one fails
333 /// let words: Vec<Result<&str, char>> = vec![Ok("alpha"), Ok("beta"), Err('x'), Ok("gamma")];
334 ///
335 /// let all_chars: Result<Vec<_>, _> = words
336 /// .into_par()
337 /// .into_fallible_result()
338 /// .flat_map(|s| s.chars()) // chars() returns an iterator
339 /// .collect();
340 ///
341 /// let merged: Result<String, _> = all_chars.map(|chars| chars.iter().collect());
342 /// assert_eq!(merged, Err('x'));
343 /// ```
344 fn flat_map<IOut, FlatMap>(
345 self,
346 flat_map: FlatMap,
347 ) -> impl ParIterResult<R, Item = IOut::Item, Err = Self::Err>
348 where
349 Self: Sized,
350 IOut: IntoIterator,
351 IOut::Item: Send,
352 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
353 {
354 let par = self.into_regular_par();
355 let map = par.flat_map(move |x| match x.into_result() {
356 Ok(x) => ResultOfIter::ok(flat_map(x).into_iter()),
357 Err(e) => ResultOfIter::err(e),
358 });
359 map.into_fallible_result()
360 }
361
362 /// Creates an iterator that both filters and maps.
363 ///
364 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
365 ///
366 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
367 /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// use orx_parallel::*;
373 ///
374 /// // all succeeds
375 /// let a: Vec<Result<&str, char>> = vec![Ok("1"), Ok("two"), Ok("NaN"), Ok("four"), Ok("5")];
376 ///
377 /// let numbers: Result<Vec<_>, char> = a
378 /// .into_par()
379 /// .into_fallible_result()
380 /// .filter_map(|s| s.parse::<usize>().ok())
381 /// .collect();
382 ///
383 /// assert_eq!(numbers, Ok(vec![1, 5]));
384 ///
385 /// // at least one fails
386 /// let a: Vec<Result<&str, char>> = vec![Ok("1"), Ok("two"), Err('x'), Ok("four"), Ok("5")];
387 ///
388 /// let numbers: Result<Vec<_>, char> = a
389 /// .into_par()
390 /// .into_fallible_result()
391 /// .filter_map(|s| s.parse::<usize>().ok())
392 /// .collect();
393 ///
394 /// assert_eq!(numbers, Err('x'));
395 /// ```
396 fn filter_map<Out, FilterMap>(
397 self,
398 filter_map: FilterMap,
399 ) -> impl ParIterResult<R, Item = Out, Err = Self::Err>
400 where
401 Self: Sized,
402 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
403 Out: Send,
404 {
405 let par = self.into_regular_par();
406 let filter_map = par.filter_map(move |x| match x.into_result() {
407 Ok(x) => filter_map(x).map(|x| Ok(x)),
408 Err(e) => Some(Err(e)),
409 });
410 filter_map.into_fallible_result()
411 }
412
413 /// Does something with each successful element of an iterator, passing the value on, provided that all elements are of Ok variant;
414 /// short-circuits and returns the error otherwise.
415 ///
416 /// When using iterators, you’ll often chain several of them together.
417 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
418 /// To do that, insert a call to `inspect()`.
419 ///
420 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
421 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
422 ///
423 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
424 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
425 /// collect some intermediate values during parallel execution for further inspection.
426 /// The following example demonstrates such a use case.
427 ///
428 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
429 ///
430 /// ```
431 /// use orx_parallel::*;
432 /// use orx_concurrent_bag::*;
433 /// use std::num::ParseIntError;
434 ///
435 /// // all succeeds
436 /// let a: Vec<Result<u32, ParseIntError>> = ["1", "4", "2", "3"]
437 /// .into_iter()
438 /// .map(|x| x.parse::<u32>())
439 /// .collect();
440 ///
441 /// // let's add some inspect() calls to investigate what's happening
442 /// // - log some events
443 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
444 /// let bag = ConcurrentBag::new();
445 ///
446 /// let sum = a
447 /// .par()
448 /// .cloned()
449 /// .into_fallible_result()
450 /// .inspect(|x| println!("about to filter: {x}"))
451 /// .filter(|x| x % 2 == 0)
452 /// .inspect(|x| {
453 /// bag.push(*x);
454 /// println!("made it through filter: {x}");
455 /// })
456 /// .sum();
457 /// assert_eq!(sum, Ok(4 + 2));
458 ///
459 /// let mut values_made_through = bag.into_inner();
460 /// values_made_through.sort();
461 /// assert_eq!(values_made_through, [2, 4]);
462 ///
463 /// // at least one fails
464 /// let a: Vec<Result<u32, ParseIntError>> = ["1", "4", "x", "3"]
465 /// .into_iter()
466 /// .map(|x| x.parse::<u32>())
467 /// .collect();
468 ///
469 /// // let's add some inspect() calls to investigate what's happening
470 /// // - log some events
471 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
472 /// let bag = ConcurrentBag::new();
473 ///
474 /// let sum = a
475 /// .par()
476 /// .cloned()
477 /// .into_fallible_result()
478 /// .inspect(|x| println!("about to filter: {x}"))
479 /// .filter(|x| x % 2 == 0)
480 /// .inspect(|x| {
481 /// bag.push(*x);
482 /// println!("made it through filter: {x}");
483 /// })
484 /// .sum();
485 /// assert!(sum.is_err());
486 /// ```
487 fn inspect<Operation>(
488 self,
489 operation: Operation,
490 ) -> impl ParIterResult<R, Item = Self::Item, Err = Self::Err>
491 where
492 Self: Sized,
493 Operation: Fn(&Self::Item) + Sync + Clone,
494 Self::Item: Send,
495 {
496 let map = move |x| {
497 operation(&x);
498 x
499 };
500 self.map(map)
501 }
502
503 // collect
504
505 /// Collects all the items from an iterator into a collection iff all elements are of Ok variant.
506 /// Early exits and returns the error if any of the elements is an Err.
507 ///
508 /// This is useful when you already have a collection and want to add the iterator items to it.
509 ///
510 /// The collection is passed in as owned value, and returned back with the additional elements.
511 ///
512 /// All collections implementing [`ParCollectInto`] can be used to collect into.
513 ///
514 /// [`ParCollectInto`]: crate::ParCollectInto
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// use orx_parallel::*;
520 ///
521 /// let vec: Vec<i32> = vec![0, 1];
522 ///
523 /// // all succeeds
524 /// let result = ["1", "2", "3"]
525 /// .into_par()
526 /// .map(|x| x.parse::<i32>())
527 /// .into_fallible_result()
528 /// .map(|x| x * 10)
529 /// .collect_into(vec);
530 ///
531 /// assert!(result.is_ok());
532 /// let vec = result.unwrap();
533 /// assert_eq!(vec, vec![0, 1, 10, 20, 30]);
534 ///
535 /// // at least one fails
536 ///
537 /// let result = ["1", "x!", "3"]
538 /// .into_par()
539 /// .map(|x| x.parse::<i32>())
540 /// .into_fallible_result()
541 /// .map(|x| x * 10)
542 /// .collect_into(vec);
543 /// assert!(result.is_err());
544 /// ```
545 fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
546 where
547 C: ParCollectInto<Self::Item>,
548 Self::Item: Send,
549 Self::Err: Send;
550
551 /// Transforms an iterator into a collection iff all elements are of Ok variant.
552 /// Early exits and returns the error if any of the elements is an Err.
553 ///
554 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
555 /// the type of the result collection; or turbofish annotation can be used.
556 ///
557 /// All collections implementing [`ParCollectInto`] can be used to collect into.
558 ///
559 /// [`ParCollectInto`]: crate::ParCollectInto
560 ///
561 /// # Examples
562 ///
563 /// ```
564 /// use orx_parallel::*;
565 ///
566 /// // all succeeds
567 ///
568 /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
569 /// .into_par()
570 /// .map(|x| x.parse::<i32>())
571 /// .into_fallible_result()
572 /// .map(|x| x * 2)
573 /// .collect();
574 ///
575 /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
576 ///
577 /// // at least one fails
578 ///
579 /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
580 /// .into_par()
581 /// .map(|x| x.parse::<i32>())
582 /// .into_fallible_result()
583 /// .map(|x| x * 2)
584 /// .collect();
585 ///
586 /// assert!(result_doubled.is_err());
587 /// ```
588 fn collect<C>(self) -> Result<C, Self::Err>
589 where
590 Self: Sized,
591 Self::Item: Send,
592 Self::Err: Send,
593 C: ParCollectInto<Self::Item>,
594 {
595 let output = C::empty(self.con_iter_len());
596 self.collect_into(output)
597 }
598
599 // reduce
600
601 /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
602 /// Early exits and returns the error if any of the elements is an Err.
603 ///
604 /// If the iterator is empty, returns `Ok(None)`; otherwise, returns `Ok` of result of the reduction.
605 ///
606 /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
607 ///
608 /// # Example
609 ///
610 /// ```
611 /// use orx_parallel::*;
612 ///
613 /// fn safe_div(a: u32, b: u32) -> Result<u32, char> {
614 /// match b {
615 /// 0 => Err('!'),
616 /// b => Ok(a / b),
617 /// }
618 /// }
619 ///
620 /// // all succeeds
621 /// let reduced: Result<Option<u32>, char> = (1..10)
622 /// .par()
623 /// .map(|x| safe_div(100, x as u32))
624 /// .into_fallible_result()
625 /// .reduce(|acc, e| acc + e);
626 /// assert_eq!(reduced, Ok(Some(281)));
627 ///
628 /// // all succeeds - empty iterator
629 /// let reduced: Result<Option<u32>, char> = (1..1)
630 /// .par()
631 /// .map(|x| safe_div(100, x as u32))
632 /// .into_fallible_result()
633 /// .reduce(|acc, e| acc + e);
634 /// assert_eq!(reduced, Ok(None));
635 ///
636 /// // at least one fails
637 /// let reduced: Result<Option<u32>, char> = (0..10)
638 /// .par()
639 /// .map(|x| safe_div(100, x as u32))
640 /// .into_fallible_result()
641 /// .reduce(|acc, e| acc + e);
642 /// assert_eq!(reduced, Err('!'));
643 /// ```
644 fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
645 where
646 Self::Item: Send,
647 Self::Err: Send,
648 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
649
650 /// Tests if every element of the iterator matches a predicate.
651 /// Early exits and returns the error if any of the elements is an Err.
652 ///
653 /// `all` takes a `predicate` that returns true or false.
654 /// It applies this closure to each Ok element of the iterator,
655 /// and if they all return true, then so does `all`.
656 /// If any of them returns false, it returns false.
657 ///
658 /// Note that `all` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a false,
659 /// given that no matter what else happens, the result will also be false.
660 ///
661 /// Therefore, in case the fallible iterator contains both an Err element and an Ok element which violates the `predicate`,
662 /// the result is **not deterministic**. It might be the `Err` if it is observed first; or `Ok(false)` if the violation is observed first.
663 ///
664 /// On the other hand, when it returns `Ok(true)`, we are certain that all elements are of `Ok` variant and all satisfy the `predicate`.
665 ///
666 /// An empty iterator returns `Ok(true)`.
667 ///
668 /// # Examples
669 ///
670 /// ```
671 /// use orx_parallel::*;
672 ///
673 /// // all Ok
674 /// let result = vec!["1", "2", "3"]
675 /// .into_par()
676 /// .map(|x| x.parse::<i32>())
677 /// .into_fallible_result()
678 /// .all(|x| *x > 0);
679 /// assert_eq!(result, Ok(true));
680 ///
681 /// let result = vec!["1", "2", "3"]
682 /// .into_par()
683 /// .map(|x| x.parse::<i32>())
684 /// .into_fallible_result()
685 /// .all(|x| *x > 1);
686 /// assert_eq!(result, Ok(false));
687 ///
688 /// let result = Vec::<&str>::new()
689 /// .into_par()
690 /// .map(|x| x.parse::<i32>())
691 /// .into_fallible_result()
692 /// .all(|x| *x > 1);
693 /// assert_eq!(result, Ok(true)); // empty iterator
694 ///
695 /// // at least one Err
696 /// let result = vec!["1", "x!", "3"]
697 /// .into_par()
698 /// .map(|x| x.parse::<i32>())
699 /// .into_fallible_result()
700 /// .all(|x| *x > 0);
701 /// assert!(result.is_err());
702 /// ```
703 fn all<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
704 where
705 Self: Sized,
706 Self::Item: Send,
707 Self::Err: Send,
708 Predicate: Fn(&Self::Item) -> bool + Sync,
709 {
710 let violates = |x: &Self::Item| !predicate(x);
711 self.find(violates).map(|x| x.is_none())
712 }
713
714 /// Tests if any element of the iterator matches a predicate.
715 /// Early exits and returns the error if any of the elements is an Err.
716 ///
717 /// `any` takes a `predicate` that returns true or false.
718 /// It applies this closure to each element of the iterator,
719 /// and if any of the elements returns true, then so does `any`.
720 /// If all of them return false, it returns false.
721 ///
722 /// Note that `any` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a true,
723 /// given that no matter what else happens, the result will also be true.
724 ///
725 /// Therefore, in case the fallible iterator contains both an Err element and an Ok element which satisfies the `predicate`,
726 /// the result is **not deterministic**. It might be the `Err` if it is observed first; or `Ok(true)` if element satisfying the predicate
727 /// is observed first.
728 ///
729 /// On the other hand, when it returns `Ok(false)`, we are certain that all elements are of `Ok` variant and none of them satisfies the `predicate`.
730 ///
731 /// An empty iterator returns `Ok(false)`.
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// use orx_parallel::*;
737 ///
738 /// // all Ok
739 /// let result = vec!["1", "2", "3"]
740 /// .into_par()
741 /// .map(|x| x.parse::<i32>())
742 /// .into_fallible_result()
743 /// .any(|x| *x > 1);
744 /// assert_eq!(result, Ok(true));
745 ///
746 /// let result = vec!["1", "2", "3"]
747 /// .into_par()
748 /// .map(|x| x.parse::<i32>())
749 /// .into_fallible_result()
750 /// .any(|x| *x > 3);
751 /// assert_eq!(result, Ok(false));
752 ///
753 /// let result = Vec::<&str>::new()
754 /// .into_par()
755 /// .map(|x| x.parse::<i32>())
756 /// .into_fallible_result()
757 /// .any(|x| *x > 1);
758 /// assert_eq!(result, Ok(false)); // empty iterator
759 ///
760 /// // at least one Err
761 /// let result = vec!["1", "x!", "3"]
762 /// .into_par()
763 /// .map(|x| x.parse::<i32>())
764 /// .into_fallible_result()
765 /// .any(|x| *x > 5);
766 /// assert!(result.is_err());
767 /// ```
768 fn any<Predicate>(self, predicate: Predicate) -> Result<bool, Self::Err>
769 where
770 Self: Sized,
771 Self::Item: Send,
772 Self::Err: Send,
773 Predicate: Fn(&Self::Item) -> bool + Sync,
774 {
775 self.find(predicate).map(|x| x.is_some())
776 }
777
778 /// Consumes the iterator, counting the number of iterations and returning it.
779 /// Early exits and returns the error if any of the elements is an Err.
780 ///
781 /// # Examples
782 ///
783 /// ```
784 /// use orx_parallel::*;
785 ///
786 /// // all Ok
787 /// let result = vec!["1", "2", "3"]
788 /// .into_par()
789 /// .map(|x| x.parse::<i32>())
790 /// .into_fallible_result()
791 /// .filter(|x| *x >= 2)
792 /// .count();
793 /// assert_eq!(result, Ok(2));
794 ///
795 /// // at least one Err
796 /// let result = vec!["x!", "2", "3"]
797 /// .into_par()
798 /// .map(|x| x.parse::<i32>())
799 /// .into_fallible_result()
800 /// .filter(|x| *x >= 2)
801 /// .count();
802 /// assert!(result.is_err());
803 /// ```
804 fn count(self) -> Result<usize, Self::Err>
805 where
806 Self: Sized,
807 Self::Err: Send,
808 {
809 self.map(map_count)
810 .reduce(reduce_sum)
811 .map(|x| x.unwrap_or(0))
812 }
813
814 /// Calls a closure on each element of an iterator, and returns `Ok(())` if all elements succeed.
815 /// Early exits and returns the error if any of the elements is an Err.
816 ///
817 /// # Examples
818 ///
819 /// Basic usage:
820 ///
821 /// ```
822 /// use orx_parallel::*;
823 /// use std::sync::mpsc::channel;
824 ///
825 /// // all Ok
826 /// let (tx, rx) = channel();
827 /// let result = vec!["0", "1", "2", "3", "4"]
828 /// .into_par()
829 /// .map(|x| x.parse::<i32>())
830 /// .into_fallible_result()
831 /// .map(|x| x * 2 + 1)
832 /// .for_each(move |x| tx.send(x).unwrap());
833 ///
834 /// assert_eq!(result, Ok(()));
835 ///
836 /// let mut v: Vec<_> = rx.iter().collect();
837 /// v.sort(); // order can be mixed, since messages will be sent in parallel
838 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
839 ///
840 /// // at least one Err
841 /// let (tx, _rx) = channel();
842 /// let result = vec!["0", "1", "2", "x!", "4"]
843 /// .into_par()
844 /// .map(|x| x.parse::<i32>())
845 /// .into_fallible_result()
846 /// .map(|x| x * 2 + 1)
847 /// .for_each(move |x| tx.send(x).unwrap());
848 ///
849 /// assert!(result.is_err());
850 /// ```
851 fn for_each<Operation>(self, operation: Operation) -> Result<(), Self::Err>
852 where
853 Self: Sized,
854 Self::Err: Send,
855 Operation: Fn(Self::Item) + Sync,
856 {
857 let map = |x| operation(x);
858 self.map(map).reduce(reduce_unit).map(|_| ())
859 }
860
861 /// Returns Ok of maximum element of an iterator if all elements succeed.
862 /// If the iterator is empty, `Ok(None)` is returned.
863 /// Early exits and returns the error if any of the elements is an Err.
864 ///
865 /// # Examples
866 ///
867 /// ```
868 /// use orx_parallel::*;
869 ///
870 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
871 /// assert_eq!(a.par().copied().into_fallible_result().max(), Ok(Some(3)));
872 ///
873 /// let b: Vec<Result<i32, char>> = vec![];
874 /// assert_eq!(b.par().copied().into_fallible_result().max(), Ok(None));
875 ///
876 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
877 /// assert_eq!(c.par().copied().into_fallible_result().max(), Err('x'));
878 /// ```
879 fn max(self) -> Result<Option<Self::Item>, Self::Err>
880 where
881 Self: Sized,
882 Self::Err: Send,
883 Self::Item: Ord + Send,
884 {
885 self.reduce(Ord::max)
886 }
887
888 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
889 /// If the iterator is empty, `Ok(None)` is returned.
890 /// Early exits and returns the error if any of the elements is an Err.
891 ///
892 /// ```
893 /// use orx_parallel::*;
894 ///
895 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
896 /// assert_eq!(
897 /// a.par()
898 /// .copied()
899 /// .into_fallible_result()
900 /// .max_by(|a, b| a.cmp(b)),
901 /// Ok(Some(3))
902 /// );
903 ///
904 /// let b: Vec<Result<i32, char>> = vec![];
905 /// assert_eq!(
906 /// b.par()
907 /// .copied()
908 /// .into_fallible_result()
909 /// .max_by(|a, b| a.cmp(b)),
910 /// Ok(None)
911 /// );
912 ///
913 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
914 /// assert_eq!(
915 /// c.par()
916 /// .copied()
917 /// .into_fallible_result()
918 /// .max_by(|a, b| a.cmp(b)),
919 /// Err('x')
920 /// );
921 /// ```
922 fn max_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
923 where
924 Self: Sized,
925 Self::Item: Send,
926 Self::Err: Send,
927 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
928 {
929 let reduce = |x, y| match compare(&x, &y) {
930 Ordering::Greater | Ordering::Equal => x,
931 Ordering::Less => y,
932 };
933 self.reduce(reduce)
934 }
935
936 /// Returns the element that gives the maximum value from the specified function.
937 /// If the iterator is empty, `Ok(None)` is returned.
938 /// Early exits and returns the error if any of the elements is an Err.
939 ///
940 /// # Examples
941 ///
942 /// ```
943 /// use orx_parallel::*;
944 ///
945 /// let a: Vec<Result<i32, char>> = vec![Ok(-1), Ok(2), Ok(-3)];
946 /// assert_eq!(
947 /// a.par()
948 /// .copied()
949 /// .into_fallible_result()
950 /// .max_by_key(|x| x.abs()),
951 /// Ok(Some(-3))
952 /// );
953 ///
954 /// let b: Vec<Result<i32, char>> = vec![];
955 /// assert_eq!(
956 /// b.par()
957 /// .copied()
958 /// .into_fallible_result()
959 /// .max_by_key(|x| x.abs()),
960 /// Ok(None)
961 /// );
962 ///
963 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
964 /// assert_eq!(
965 /// c.par()
966 /// .copied()
967 /// .into_fallible_result()
968 /// .max_by_key(|x| x.abs()),
969 /// Err('x')
970 /// );
971 /// ```
972 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Result<Option<Self::Item>, Self::Err>
973 where
974 Self: Sized,
975 Self::Item: Send,
976 Self::Err: Send,
977 Key: Ord,
978 GetKey: Fn(&Self::Item) -> Key + Sync,
979 {
980 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
981 Ordering::Greater | Ordering::Equal => x,
982 Ordering::Less => y,
983 };
984 self.reduce(reduce)
985 }
986
987 /// Returns Ok of minimum element of an iterator if all elements succeed.
988 /// If the iterator is empty, `Ok(None)` is returned.
989 /// Early exits and returns the error if any of the elements is an Err.
990 ///
991 /// # Examples
992 ///
993 /// ```
994 /// use orx_parallel::*;
995 ///
996 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
997 /// assert_eq!(a.par().copied().into_fallible_result().min(), Ok(Some(1)));
998 ///
999 /// let b: Vec<Result<i32, char>> = vec![];
1000 /// assert_eq!(b.par().copied().into_fallible_result().min(), Ok(None));
1001 ///
1002 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1003 /// assert_eq!(c.par().copied().into_fallible_result().min(), Err('x'));
1004 /// ```
1005 fn min(self) -> Result<Option<Self::Item>, Self::Err>
1006 where
1007 Self: Sized,
1008 Self::Item: Ord + Send,
1009 Self::Err: Send,
1010 {
1011 self.reduce(Ord::min)
1012 }
1013
1014 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1015 /// If the iterator is empty, `Ok(None)` is returned.
1016 /// Early exits and returns the error if any of the elements is an Err.
1017 ///
1018 /// ```
1019 /// use orx_parallel::*;
1020 ///
1021 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1022 /// assert_eq!(
1023 /// a.par()
1024 /// .copied()
1025 /// .into_fallible_result()
1026 /// .min_by(|a, b| a.cmp(b)),
1027 /// Ok(Some(1))
1028 /// );
1029 ///
1030 /// let b: Vec<Result<i32, char>> = vec![];
1031 /// assert_eq!(
1032 /// b.par()
1033 /// .copied()
1034 /// .into_fallible_result()
1035 /// .min_by(|a, b| a.cmp(b)),
1036 /// Ok(None)
1037 /// );
1038 ///
1039 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1040 /// assert_eq!(
1041 /// c.par()
1042 /// .copied()
1043 /// .into_fallible_result()
1044 /// .min_by(|a, b| a.cmp(b)),
1045 /// Err('x')
1046 /// );
1047 /// ```
1048 fn min_by<Compare>(self, compare: Compare) -> Result<Option<Self::Item>, Self::Err>
1049 where
1050 Self: Sized,
1051 Self::Item: Send,
1052 Self::Err: Send,
1053 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1054 {
1055 let reduce = |x, y| match compare(&x, &y) {
1056 Ordering::Less | Ordering::Equal => x,
1057 Ordering::Greater => y,
1058 };
1059 self.reduce(reduce)
1060 }
1061
1062 /// Returns the element that gives the minimum value from the specified function.
1063 /// If the iterator is empty, `Ok(None)` is returned.
1064 /// Early exits and returns the error if any of the elements is an Err.
1065 ///
1066 /// # Examples
1067 ///
1068 /// ```
1069 /// use orx_parallel::*;
1070 ///
1071 /// let a: Vec<Result<i32, char>> = vec![Ok(-1), Ok(2), Ok(-3)];
1072 /// assert_eq!(
1073 /// a.par()
1074 /// .copied()
1075 /// .into_fallible_result()
1076 /// .min_by_key(|x| x.abs()),
1077 /// Ok(Some(-1))
1078 /// );
1079 ///
1080 /// let b: Vec<Result<i32, char>> = vec![];
1081 /// assert_eq!(
1082 /// b.par()
1083 /// .copied()
1084 /// .into_fallible_result()
1085 /// .min_by_key(|x| x.abs()),
1086 /// Ok(None)
1087 /// );
1088 ///
1089 /// let c: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Err('x')];
1090 /// assert_eq!(
1091 /// c.par()
1092 /// .copied()
1093 /// .into_fallible_result()
1094 /// .min_by_key(|x| x.abs()),
1095 /// Err('x')
1096 /// );
1097 /// ```
1098 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Result<Option<Self::Item>, Self::Err>
1099 where
1100 Self: Sized,
1101 Self::Item: Send,
1102 Self::Err: Send,
1103 Key: Ord,
1104 GetKey: Fn(&Self::Item) -> Key + Sync,
1105 {
1106 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1107 Ordering::Less | Ordering::Equal => x,
1108 Ordering::Greater => y,
1109 };
1110 self.reduce(reduce)
1111 }
1112
1113 /// Sums the elements of an iterator.
1114 /// Early exits and returns the error if any of the elements is an Err.
1115 ///
1116 /// If the iterator is empty, returns zero; otherwise, returns `Ok` of the sum.
1117 ///
1118 /// # Example
1119 ///
1120 /// ```
1121 /// use orx_parallel::*;
1122 ///
1123 /// fn safe_div(a: u32, b: u32) -> Result<u32, char> {
1124 /// match b {
1125 /// 0 => Err('!'),
1126 /// b => Ok(a / b),
1127 /// }
1128 /// }
1129 ///
1130 /// // all succeeds
1131 /// let reduced: Result<u32, char> = (1..10)
1132 /// .par()
1133 /// .map(|x| safe_div(100, x as u32))
1134 /// .into_fallible_result()
1135 /// .sum();
1136 /// assert_eq!(reduced, Ok(281));
1137 ///
1138 /// // all succeeds - empty iterator
1139 /// let reduced: Result<u32, char> = (1..1)
1140 /// .par()
1141 /// .map(|x| safe_div(100, x as u32))
1142 /// .into_fallible_result()
1143 /// .sum();
1144 /// assert_eq!(reduced, Ok(0));
1145 ///
1146 /// // at least one fails
1147 /// let reduced: Result<u32, char> = (0..10)
1148 /// .par()
1149 /// .map(|x| safe_div(100, x as u32))
1150 /// .into_fallible_result()
1151 /// .sum();
1152 /// assert_eq!(reduced, Err('!'));
1153 /// ```
1154 fn sum<Out>(self) -> Result<Out, Self::Err>
1155 where
1156 Self: Sized,
1157 Self::Item: Sum<Out>,
1158 Self::Err: Send,
1159 Out: Send,
1160 {
1161 self.map(Self::Item::map)
1162 .reduce(Self::Item::reduce)
1163 .map(|x| x.unwrap_or(Self::Item::zero()))
1164 }
1165
1166 // early exit
1167
1168 /// Returns the first (or any) element of the iterator.
1169 /// If the iterator is empty, `Ok(None)` is returned.
1170 /// Early exits and returns the error if an Err element is observed first.
1171 ///
1172 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1173 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1174 ///
1175 /// Note that `find` itself is short-circuiting in addition to fallible computation.
1176 /// Therefore, in case the fallible iterator contains both an Err and an Ok element,
1177 /// the result is **not deterministic**:
1178 /// * it might be the `Err` if it is observed first;
1179 /// * or `Ok(element)` if the Ok element is observed first.
1180 ///
1181 /// # Examples
1182 ///
1183 /// ```
1184 /// use orx_parallel::*;
1185 ///
1186 /// let a: Vec<Result<i32, char>> = vec![];
1187 /// assert_eq!(a.par().copied().into_fallible_result().first(), Ok(None));
1188 ///
1189 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1190 /// assert_eq!(a.par().copied().into_fallible_result().first(), Ok(Some(1)));
1191 ///
1192 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Err('x'), Ok(3)];
1193 /// let result = a.par().copied().into_fallible_result().first();
1194 /// // depends on whichever is observed first in parallel execution
1195 /// assert!(result == Ok(Some(1)) || result == Err('x'));
1196 /// ```
1197 fn first(self) -> Result<Option<Self::Item>, Self::Err>
1198 where
1199 Self::Item: Send,
1200 Self::Err: Send;
1201
1202 /// Returns the first (or any) element of the iterator that satisfies the `predicate`.
1203 /// If the iterator is empty, `Ok(None)` is returned.
1204 /// Early exits and returns the error if an Err element is observed first.
1205 ///
1206 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1207 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1208 ///
1209 /// Note that `find` itself is short-circuiting in addition to fallible computation.
1210 /// Therefore, in case the fallible iterator contains both an Err and an Ok element,
1211 /// the result is **not deterministic**:
1212 /// * it might be the `Err` if it is observed first;
1213 /// * or `Ok(element)` if the Ok element satisfying the predicate is observed first.
1214 ///
1215 /// # Examples
1216 ///
1217 /// ```
1218 /// use orx_parallel::*;
1219 ///
1220 /// let a: Vec<Result<i32, char>> = vec![];
1221 /// assert_eq!(
1222 /// a.par().copied().into_fallible_result().find(|x| *x > 2),
1223 /// Ok(None)
1224 /// );
1225 ///
1226 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Ok(2), Ok(3)];
1227 /// assert_eq!(
1228 /// a.par().copied().into_fallible_result().find(|x| *x > 2),
1229 /// Ok(Some(3))
1230 /// );
1231 ///
1232 /// let a: Vec<Result<i32, char>> = vec![Ok(1), Err('x'), Ok(3)];
1233 /// let result = a.par().copied().into_fallible_result().find(|x| *x > 2);
1234 /// // depends on whichever is observed first in parallel execution
1235 /// assert!(result == Ok(Some(3)) || result == Err('x'));
1236 /// ```
1237 fn find<Predicate>(self, predicate: Predicate) -> Result<Option<Self::Item>, Self::Err>
1238 where
1239 Self: Sized,
1240 Self::Item: Send,
1241 Self::Err: Send,
1242 Predicate: Fn(&Self::Item) -> bool + Sync,
1243 {
1244 self.filter(&predicate).first()
1245 }
1246}
1247
1248pub trait IntoResult<T, E> {
1249 fn into_result(self) -> Result<T, E>;
1250}
1251
1252impl<T, E> IntoResult<T, E> for Result<T, E> {
1253 #[inline(always)]
1254 fn into_result(self) -> Result<T, E> {
1255 self
1256 }
1257}