orx_parallel/par_iter_option.rs
1use crate::default_fns::{map_count, reduce_sum, reduce_unit};
2use crate::runner::{DefaultRunner, ParallelRunner};
3use crate::{
4 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParThreadPool, RunnerWithPool, Sum,
5};
6use core::cmp::Ordering;
7
8/// A parallel iterator for which the computation either completely succeeds,
9/// or fails and **early exits** with None.
10///
11/// # Examples
12///
13/// To demonstrate the difference of fallible iterator's behavior, consider the following simple example.
14/// We parse a series of strings into integers.
15/// We try this twice:
16/// * in the first one, all inputs are good, hence, we obtain Some of parsed numbers,
17/// * in the second one, the value in the middle is faulty, we expect the computation to fail.
18///
19/// In the following, we try to achieve this both with a regular parallel iterator ([`ParIter`]) and a fallible
20/// parallel iterator, `ParIterOption` in this case.
21///
22/// You may notice the following differences:
23/// * In the regular iterator, it is not very convenient to keep both the resulting numbers and a potential error.
24/// Here, we make use of `filter_map`.
25/// * On the other hand, the `collect` method of the fallible iterator directly returns an `Option` of the computation
26/// which is either Some of all parsed numbers or None if any computation fails.
27/// * Also importantly note that the regular iterator will try to parse all the strings, regardless of how many times
28/// the parsing fails.
29/// * Fallible iterator, on the other hand, stops immediately after observing the first None and short circuits the
30/// computation.
31///
32/// ```
33/// use orx_parallel::*;
34///
35/// let expected_results = [Some((0..100).collect::<Vec<_>>()), None];
36///
37/// for expected in expected_results {
38/// let expected_some = expected.is_some();
39/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40/// if !expected_some {
41/// inputs.insert(50, "x".to_string()); // plant an error case
42/// }
43///
44/// // regular parallel iterator
45/// let results = inputs.par().map(|x| x.parse::<u32>().ok());
46/// let numbers: Vec<_> = results.filter_map(|x| x).collect();
47/// if expected_some {
48/// assert_eq!(&expected, &Some(numbers));
49/// } else {
50/// // otherwise, numbers contains some numbers, but we are not sure
51/// // if the computation completely succeeded or not
52/// }
53///
54/// // fallible parallel iterator
55/// let results = inputs.par().map(|x| x.parse::<u32>().ok());
56/// let result: Option<Vec<_>> = results.into_fallible_option().collect();
57/// assert_eq!(&expected, &result);
58/// }
59/// ```
60///
61/// These differences are not specific to `collect`; all fallible iterator methods return an option.
62 /// The following demonstrates reduction examples, where the result is either the reduced value if the entire computation
63/// succeeds, or None.
64///
65/// ```
66/// use orx_parallel::*;
67///
68/// for will_fail in [false, true] {
69/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
70/// if will_fail {
71/// inputs.insert(50, "x".to_string()); // plant an error case
72/// }
73///
74/// // sum
75/// let results = inputs.par().map(|x| x.parse::<u32>().ok());
76/// let result: Option<u32> = results.into_fallible_option().sum();
77/// match will_fail {
78/// true => assert_eq!(result, None),
79/// false => assert_eq!(result, Some(4950)),
80/// }
81///
82/// // max
83/// let results = inputs.par().map(|x| x.parse::<u32>().ok());
84/// let result: Option<Option<u32>> = results.into_fallible_option().max();
85/// match will_fail {
86/// true => assert_eq!(result, None),
87/// false => assert_eq!(result, Some(Some(99))),
88/// }
89/// }
90/// ```
91///
92 /// Finally, similar to regular iterators, a fallible parallel iterator can be transformed using iterator methods.
93/// However, the transformation is on the success path, the failure case of None always short circuits and returns None.
94///
95/// ```
96/// use orx_parallel::*;
97///
98/// for will_fail in [false, true] {
99/// let mut inputs: Vec<_> = (0..100).map(|x| x.to_string()).collect();
100/// if will_fail {
101/// inputs.insert(50, "x".to_string()); // plant an error case
102/// }
103///
104/// // fallible iter
105/// let results = inputs.par().map(|x| x.parse::<u32>().ok());
106/// let fallible = results.into_fallible_option();
107///
108/// // transformations
109///
110/// let result: Option<usize> = fallible
111/// .filter(|x| x % 2 == 1) // Item: u32
112/// .map(|x| 3 * x) // Item: u32
113/// .filter_map(|x| (x % 10 != 0).then_some(x)) // Item: u32
114/// .flat_map(|x| [x.to_string(), (10 * x).to_string()]) // Item: String
115/// .map(|x| x.len()) // Item: usize
116/// .sum();
117///
118/// match will_fail {
119/// true => assert_eq!(result, None),
120/// false => assert_eq!(result, Some(312)),
121/// }
122/// }
123/// ```
124///
125/// [`ParIter`]: crate::ParIter
126pub trait ParIterOption<R = DefaultRunner>
127where
128 R: ParallelRunner,
129{
130 /// Type of the success element, to be received as the Some variant iff the entire computation succeeds.
    ///
    /// This is the unwrapped type: closures passed to the transformation methods of this trait
    /// receive `Self::Item` (or `&Self::Item`), not the `Option` wrapper.
131 type Item;
132
133 // params transformations
134
135 /// Sets the number of threads to be used in the parallel execution.
136 /// Integers can be used as the argument with the following mapping:
137 ///
138 /// * `0` -> `NumThreads::Auto`
139 /// * `1` -> `NumThreads::sequential()`
140 /// * `n > 0` -> `NumThreads::Max(n)`
141 ///
    /// Any value implementing `Into<NumThreads>` is accepted as the argument.
    /// The iterator is consumed and handed back as the same type.
    ///
142 /// See [`NumThreads`] and [`crate::ParIter::num_threads`] for details.
143 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
144
145 /// Sets the number of elements to be pulled from the concurrent iterator during the
146 /// parallel execution. When integers are used as the argument, the following mapping applies:
147 ///
148 /// * `0` -> `ChunkSize::Auto`
149 /// * `n > 0` -> `ChunkSize::Exact(n)`
150 ///
151 /// Please use the default enum constructor for creating the `ChunkSize::Min` variant.
152 ///
    /// Any value implementing `Into<ChunkSize>` is accepted as the argument.
    /// The iterator is consumed and handed back as the same type.
    ///
153 /// See [`ChunkSize`] and [`crate::ParIter::chunk_size`] for details.
154 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
155
156 /// Sets the iteration order of the parallel computation.
157 ///
    /// The iterator is consumed and handed back as the same type.
    ///
158 /// See [`IterationOrder`] and [`crate::ParIter::iteration_order`] for details.
159 fn iteration_order(self, order: IterationOrder) -> Self;
160
161 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
162 ///
    /// The runner instance is passed in as the `orchestrator` argument. The returned fallible
    /// iterator is parameterized by `Q` and keeps the same `Item` type.
    ///
163 /// See [`ParIter::with_runner`] for details.
164 ///
165 /// [`DefaultRunner`]: crate::DefaultRunner
166 /// [`ParIter::with_runner`]: crate::ParIter::with_runner
167 fn with_runner<Q: ParallelRunner>(
168 self,
169 orchestrator: Q,
170 ) -> impl ParIterOption<Q, Item = Self::Item>;
171
172 /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
173 /// [`ParThreadPool`].
174 ///
175 /// See [`ParIter::with_pool`] for details.
176 ///
177 /// [`DefaultPool`]: crate::DefaultPool
178 /// [`ParIter::with_pool`]: crate::ParIter::with_pool
179 fn with_pool<P: ParThreadPool>(
180 self,
181 pool: P,
182 ) -> impl ParIterOption<RunnerWithPool<P, R::Executor>, Item = Self::Item>
183 where
184 Self: Sized,
185 {
186 let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
187 self.with_runner(runner)
188 }
189
190 // computation transformations
191
192 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
193 ///
194 /// Transformation is only for the success path where all elements are of the `Some` variant.
195 /// Any observation of a `None` case short-circuits the computation and immediately returns None.
196 ///
197 /// # Examples
198 ///
199 /// ```
200 /// use orx_parallel::*;
201 ///
202 /// // all succeeds
203 /// let a: Vec<Option<u32>> = vec![Some(1), Some(2), Some(3)];
204 /// let iter = a.into_par().into_fallible_option().map(|x| 2 * x);
205 ///
206 /// let b: Option<Vec<_>> = iter.collect();
207 /// assert_eq!(b, Some(vec![2, 4, 6]));
208 ///
209 /// // at least one fails
210 /// let a = vec![Some(1), None, Some(3)];
211 /// let iter = a.into_par().into_fallible_option().map(|x| 2 * x);
212 ///
213 /// let b: Option<Vec<_>> = iter.collect();
214 /// assert_eq!(b, None);
215 /// ```
216 fn map<Out, Map>(self, map: Map) -> impl ParIterOption<R, Item = Out>
217 where
218 Self: Sized,
    // `map` receives the unwrapped success value (`Self::Item`), not the `Option` wrapper;
    // its output type `Out` becomes the `Item` of the returned fallible iterator.
219 Map: Fn(Self::Item) -> Out + Sync + Clone,
220 Out: Send;
221
222 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
223 ///
224 /// Transformation is only for the success path where all elements are of the `Some` variant.
225 /// Any observation of a `None` case short-circuits the computation and immediately returns None.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// use orx_parallel::*;
231 ///
232 /// // all succeeds
233 /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
234 /// let iter = a.into_par().into_fallible_option().filter(|x| x % 2 == 1);
235 ///
236 /// let b = iter.sum();
237 /// assert_eq!(b, Some(1 + 3));
238 ///
239 /// // at least one fails
240 /// let a = vec![Some(1), None, Some(3)];
241 /// let iter = a.into_par().into_fallible_option().filter(|x| x % 2 == 1);
242 ///
243 /// let b = iter.sum();
244 /// assert_eq!(b, None);
245 /// ```
246 fn filter<Filter>(self, filter: Filter) -> impl ParIterOption<R, Item = Self::Item>
247 where
248 Self: Sized,
    // `filter` only borrows the unwrapped success value and decides whether it is kept;
    // the `Item` type of the returned fallible iterator is unchanged.
249 Filter: Fn(&Self::Item) -> bool + Sync + Clone,
250 Self::Item: Send;
251
252 /// Creates an iterator that works like map, but flattens nested structure.
253 ///
254 /// Transformation is only for the success path where all elements are of the `Some` variant.
255 /// Any observation of a `None` case short-circuits the computation and immediately returns None.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// use orx_parallel::*;
261 ///
262 /// // all succeeds
263 /// let words: Vec<Option<&str>> = vec![Some("alpha"), Some("beta"), Some("gamma")];
264 ///
265 /// let all_chars: Option<Vec<_>> = words
266 /// .into_par()
267 /// .into_fallible_option()
268 /// .flat_map(|s| s.chars()) // chars() returns an iterator
269 /// .collect();
270 ///
271 /// let merged: Option<String> = all_chars.map(|chars| chars.iter().collect());
272 /// assert_eq!(merged, Some("alphabetagamma".to_string()));
273 ///
274 /// // at least one fails
275 /// let words: Vec<Option<&str>> = vec![Some("alpha"), Some("beta"), None, Some("gamma")];
276 ///
277 /// let all_chars: Option<Vec<_>> = words
278 /// .into_par()
279 /// .into_fallible_option()
280 /// .flat_map(|s| s.chars()) // chars() returns an iterator
281 /// .collect();
282 ///
283 /// let merged: Option<String> = all_chars.map(|chars| chars.iter().collect());
284 /// assert_eq!(merged, None);
285 /// ```
286 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIterOption<R, Item = IOut::Item>
287 where
288 Self: Sized,
    // `IOut` is what `flat_map` returns for a single success value. It only has to be
    // `IntoIterator`; its items become the `Item` of the returned fallible iterator.
289 IOut: IntoIterator,
290 IOut::Item: Send,
291 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
292
293 /// Creates an iterator that both filters and maps.
294 ///
295 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
296 ///
297 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
298 /// The example below parses each string and keeps only the successfully parsed numbers with a single call to `filter_map`.
299 ///
300 /// # Examples
301 ///
302 /// ```
303 /// use orx_parallel::*;
304 ///
305 /// // all succeeds
306 /// let a: Vec<Option<&str>> = vec![Some("1"), Some("two"), Some("NaN"), Some("four"), Some("5")];
307 ///
308 /// let numbers: Option<Vec<_>> = a
309 /// .into_par()
310 /// .into_fallible_option()
311 /// .filter_map(|s| s.parse::<usize>().ok())
312 /// .collect();
313 ///
314 /// assert_eq!(numbers, Some(vec![1, 5]));
315 ///
316 /// // at least one fails
317 /// let a: Vec<Option<&str>> = vec![Some("1"), Some("two"), None, Some("four"), Some("5")];
318 ///
319 /// let numbers: Option<Vec<_>> = a
320 /// .into_par()
321 /// .into_fallible_option()
322 /// .filter_map(|s| s.parse::<usize>().ok())
323 /// .collect();
324 ///
325 /// assert_eq!(numbers, None);
326 /// ```
327 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIterOption<R, Item = Out>
328 where
329 Self: Sized,
    // Two different roles of `None` meet here: a `None` returned by `filter_map` merely
    // drops that element, whereas a `None` element of the underlying iterator makes the
    // whole computation return `None`.
330 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
331 Out: Send;
332
333 /// Does something with each successful element of an iterator, passing the value on, provided that all elements are of Some variant;
334 /// short-circuits and returns None otherwise.
335 ///
336 /// When using iterators, you’ll often chain several of them together.
337 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
338 /// To do that, insert a call to `inspect()`.
339 ///
340 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
341 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
342 ///
343 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
344 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
345 /// collect some intermediate values during parallel execution for further inspection.
346 /// The following example demonstrates such a use case.
347 ///
348 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
349 ///
350 /// ```
351 /// use orx_parallel::*;
352 /// use orx_concurrent_bag::*;
353 /// use std::num::ParseIntError;
354 ///
355 /// // all succeeds
356 /// let a: Vec<Option<u32>> = ["1", "4", "2", "3"]
357 /// .into_iter()
358 /// .map(|x| x.parse::<u32>().ok())
359 /// .collect();
360 ///
361 /// // let's add some inspect() calls to investigate what's happening
362 /// // - log some events
363 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
364 /// let bag = ConcurrentBag::new();
365 ///
366 /// let sum = a
367 /// .par()
368 /// .cloned()
369 /// .into_fallible_option()
370 /// .inspect(|x| println!("about to filter: {x}"))
371 /// .filter(|x| x % 2 == 0)
372 /// .inspect(|x| {
373 /// bag.push(*x);
374 /// println!("made it through filter: {x}");
375 /// })
376 /// .sum();
377 /// assert_eq!(sum, Some(4 + 2));
378 ///
379 /// let mut values_made_through = bag.into_inner();
380 /// values_made_through.sort();
381 /// assert_eq!(values_made_through, [2, 4]);
382 ///
383 /// // at least one fails
384 /// let a: Vec<Option<u32>> = ["1", "4", "x", "3"]
385 /// .into_iter()
386 /// .map(|x| x.parse::<u32>().ok())
387 /// .collect();
388 ///
389 /// // let's add some inspect() calls to investigate what's happening
390 /// // - log some events
391 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
392 /// let bag = ConcurrentBag::new();
393 ///
394 /// let sum = a
395 /// .par()
396 /// .cloned()
397 /// .into_fallible_option()
398 /// .inspect(|x| println!("about to filter: {x}"))
399 /// .filter(|x| x % 2 == 0)
400 /// .inspect(|x| {
401 /// bag.push(*x);
402 /// println!("made it through filter: {x}");
403 /// })
404 /// .sum();
405 /// assert_eq!(sum, None);
406 /// ```
407 fn inspect<Operation>(self, operation: Operation) -> impl ParIterOption<R, Item = Self::Item>
408 where
409 Self: Sized,
    // `operation` only gets a shared reference to each unwrapped success value and
    // returns nothing; the `Item` type of the returned fallible iterator is unchanged.
410 Operation: Fn(&Self::Item) + Sync + Clone,
411 Self::Item: Send;
412
413 // collect
414
415 /// Collects all the items from an iterator into a collection iff all elements are of Some variant.
416 /// Early exits and returns None if any of the elements is None.
417 ///
418 /// This is useful when you already have a collection and want to add the iterator items to it.
419 ///
420 /// The collection is passed in as owned value, and returned back with the additional elements.
421 ///
422 /// All collections implementing [`ParCollectInto`] can be used to collect into.
423 ///
424 /// [`ParCollectInto`]: crate::ParCollectInto
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// use orx_parallel::*;
430 ///
431 /// let vec: Vec<i32> = vec![0, 1];
432 ///
433 /// // all succeeds
434 /// let result = ["1", "2", "3"]
435 /// .into_par()
436 /// .map(|x| x.parse::<i32>().ok())
437 /// .into_fallible_option()
438 /// .map(|x| x * 10)
439 /// .collect_into(vec);
440 /// assert_eq!(result, Some(vec![0, 1, 10, 20, 30]));
441 ///
442 /// let vec = result.unwrap();
443 ///
444 /// // at least one fails
445 ///
446 /// let result = ["1", "x!", "3"]
447 /// .into_par()
448 /// .map(|x| x.parse::<i32>().ok())
449 /// .into_fallible_option()
450 /// .map(|x| x * 10)
451 /// .collect_into(vec);
452 /// assert_eq!(result, None);
453 /// ```
454 fn collect_into<C>(self, output: C) -> Option<C>
455 where
    // `output` is moved in: it is handed back inside `Some` on success and is not
    // returned to the caller when the result is `None`.
456 Self::Item: Send,
457 C: ParCollectInto<Self::Item>;
458
459 /// Transforms an iterator into a collection iff all elements are of Some variant.
460 /// Early exits and returns None if any of the elements is None.
461 ///
462 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
463 /// the type of the result collection; or turbofish annotation can be used.
464 ///
465 /// All collections implementing [`ParCollectInto`] can be used to collect into.
466 ///
467 /// [`ParCollectInto`]: crate::ParCollectInto
468 ///
469 /// # Examples
470 ///
471 /// ```
472 /// use orx_parallel::*;
473 ///
474 /// // all succeeds
475 ///
476 /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
477 /// .into_par()
478 /// .map(|x| x.parse::<i32>().ok())
479 /// .into_fallible_option()
480 /// .map(|x| x * 2)
481 /// .collect();
482 ///
483 /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
484 ///
485 /// // at least one fails
486 ///
487 /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
488 /// .into_par()
489 /// .map(|x| x.parse::<i32>().ok())
490 /// .into_fallible_option()
491 /// .map(|x| x * 2)
492 /// .collect();
493 ///
494 /// assert_eq!(result_doubled, None);
495 /// ```
496 fn collect<C>(self) -> Option<C>
497 where
    // `C` is the target collection type, chosen by the caller through a type annotation
    // or turbofish; it holds the unwrapped success values (`Self::Item`).
498 Self::Item: Send,
499 C: ParCollectInto<Self::Item>;
500
501 // reduce
502
503 /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
504 /// Early exits and returns None if any of the elements is None.
505 ///
506 /// If the iterator is empty, returns `Some(None)`; otherwise, returns `Some` of result of the reduction.
507 ///
508 /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
509 ///
510 /// # Example
511 ///
512 /// ```
513 /// use orx_parallel::*;
514 ///
515 /// // all succeeds
516 /// let reduced: Option<Option<u32>> = (1..10)
517 /// .par()
518 /// .map(|x| 100u32.checked_div(x as u32))
519 /// .into_fallible_option()
520 /// .reduce(|acc, e| acc + e);
521 /// assert_eq!(reduced, Some(Some(281)));
522 ///
523 /// // all succeeds - empty iterator
524 /// let reduced: Option<Option<u32>> = (1..1)
525 /// .par()
526 /// .map(|x| 100u32.checked_div(x as u32))
527 /// .into_fallible_option()
528 /// .reduce(|acc, e| acc + e);
529 /// assert_eq!(reduced, Some(None));
530 ///
531 /// // at least one fails
532 /// let reduced: Option<Option<u32>> = (0..10)
533 /// .par()
534 /// .map(|x| 100u32.checked_div(x as u32))
535 /// .into_fallible_option()
536 /// .reduce(|acc, e| acc + e);
537 /// assert_eq!(reduced, None);
538 /// ```
539 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Option<Self::Item>>
540 where
    // Reading the nested return type: the outer `Option` is `None` when any element was
    // `None`; the inner `Option` is `None` when all succeeded but the iterator was empty.
541 Self::Item: Send,
542 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
543
544 /// Tests if every element of the iterator matches a predicate.
545 /// Early exits and returns None if any of the elements is None.
546 ///
547 /// `all` takes a `predicate` that returns true or false.
548 /// It applies this closure to each Some element of the iterator,
549 /// and if they all return true, then so does `all`.
550 /// If any of them returns false, it returns false.
551 ///
552 /// Note that `all` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a false,
553 /// given that no matter what else happens, the result will also be false.
554 ///
555 /// Therefore, in case the fallible iterator contains both a None element and a Some element which violates the `predicate`,
556 /// the result is **not deterministic**. It might be the `None` if it is observed first; or `Some(false)` if the violation is observed first.
557 ///
558 /// On the other hand, when it returns `Some(true)`, we are certain that all elements are of `Some` variant and all satisfy the `predicate`.
559 ///
560 /// An empty iterator returns `Some(true)`.
561 ///
562 /// # Examples
563 ///
564 /// ```
565 /// use orx_parallel::*;
566 ///
567 /// // all Some
568 /// let result = vec!["1", "2", "3"]
569 /// .into_par()
570 /// .map(|x| x.parse::<i32>().ok())
571 /// .into_fallible_option()
572 /// .all(|x| *x > 0);
573 /// assert_eq!(result, Some(true));
574 ///
575 /// let result = vec!["1", "2", "3"]
576 /// .into_par()
577 /// .map(|x| x.parse::<i32>().ok())
578 /// .into_fallible_option()
579 /// .all(|x| *x > 1);
580 /// assert_eq!(result, Some(false));
581 ///
582 /// let result = Vec::<&str>::new()
583 /// .into_par()
584 /// .map(|x| x.parse::<i32>().ok())
585 /// .into_fallible_option()
586 /// .all(|x| *x > 1);
587 /// assert_eq!(result, Some(true)); // empty iterator
588 ///
589 /// // at least one None
590 /// let result = vec!["1", "x!", "3"]
591 /// .into_par()
592 /// .map(|x| x.parse::<i32>().ok())
593 /// .into_fallible_option()
594 /// .all(|x| *x > 0);
595 /// assert_eq!(result, None);
596 /// ```
597 fn all<Predicate>(self, predicate: Predicate) -> Option<bool>
598 where
599 Self: Sized,
600 Self::Item: Send,
601 Predicate: Fn(&Self::Item) -> bool + Sync,
602 {
603 let violates = |x: &Self::Item| !predicate(x);
604 self.find(violates).map(|x| x.is_none())
605 }
606
607 /// Tests if any element of the iterator matches a predicate.
608 /// Early exits and returns None if any of the elements is None.
609 ///
610 /// `any` takes a `predicate` that returns true or false.
611 /// It applies this closure to each element of the iterator,
612 /// and if any of the elements returns true, then so does `any`.
613 /// If all of them return false, it returns false.
614 ///
615 /// Note that `any` computation is itself also short-circuiting; in other words, it will stop processing as soon as it finds a true,
616 /// given that no matter what else happens, the result will also be true.
617 ///
618 /// Therefore, in case the fallible iterator contains both a None element and a Some element which satisfies the `predicate`,
619 /// the result is **not deterministic**. It might be the `None` if it is observed first; or `Some(true)` if element satisfying the predicate
620 /// is observed first.
621 ///
622 /// On the other hand, when it returns `Some(false)`, we are certain that all elements are of `Some` variant and none of them satisfies the `predicate`.
623 ///
624 /// An empty iterator returns `Some(false)`.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// use orx_parallel::*;
630 ///
631 /// // all Some
632 /// let result = vec!["1", "2", "3"]
633 /// .into_par()
634 /// .map(|x| x.parse::<i32>().ok())
635 /// .into_fallible_option()
636 /// .any(|x| *x > 1);
637 /// assert_eq!(result, Some(true));
638 ///
639 /// let result = vec!["1", "2", "3"]
640 /// .into_par()
641 /// .map(|x| x.parse::<i32>().ok())
642 /// .into_fallible_option()
643 /// .any(|x| *x > 3);
644 /// assert_eq!(result, Some(false));
645 ///
646 /// let result = Vec::<&str>::new()
647 /// .into_par()
648 /// .map(|x| x.parse::<i32>().ok())
649 /// .into_fallible_option()
650 /// .any(|x| *x > 1);
651 /// assert_eq!(result, Some(false)); // empty iterator
652 ///
653 /// // at least one None
654 /// let result = vec!["1", "x!", "3"]
655 /// .into_par()
656 /// .map(|x| x.parse::<i32>().ok())
657 /// .into_fallible_option()
658 /// .any(|x| *x > 5);
659 /// assert_eq!(result, None);
660 /// ```
661 fn any<Predicate>(self, predicate: Predicate) -> Option<bool>
662 where
663 Self: Sized,
664 Self::Item: Send,
665 Predicate: Fn(&Self::Item) -> bool + Sync,
666 {
667 self.find(predicate).map(|x| x.is_some())
668 }
669
670 /// Consumes the iterator, counting the number of iterations and returning it.
671 /// Early exits and returns None if any of the elements is None.
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// use orx_parallel::*;
677 ///
678 /// // all Some
679 /// let result = vec!["1", "2", "3"]
680 /// .into_par()
681 /// .map(|x| x.parse::<i32>().ok())
682 /// .into_fallible_option()
683 /// .filter(|x| *x >= 2)
684 /// .count();
685 /// assert_eq!(result, Some(2));
686 ///
687 /// // at least one None
688 /// let result = vec!["x!", "2", "3"]
689 /// .into_par()
690 /// .map(|x| x.parse::<i32>().ok())
691 /// .into_fallible_option()
692 /// .filter(|x| *x >= 2)
693 /// .count();
694 /// assert_eq!(result, None);
695 /// ```
696 fn count(self) -> Option<usize>
697 where
698 Self: Sized,
699 {
700 self.map(map_count)
701 .reduce(reduce_sum)
702 .map(|x| x.unwrap_or(0))
703 }
704
705 /// Calls a closure on each element of an iterator, and returns `Some(())` if all elements succeed.
706 /// Early exits and returns None if any of the elements is None.
707 ///
708 /// # Examples
709 ///
710 /// Basic usage:
711 ///
712 /// ```
713 /// use orx_parallel::*;
714 /// use std::sync::mpsc::channel;
715 ///
716 /// // all Some
717 /// let (tx, rx) = channel();
718 /// let result = vec!["0", "1", "2", "3", "4"]
719 /// .into_par()
720 /// .map(|x| x.parse::<i32>().ok())
721 /// .into_fallible_option()
722 /// .map(|x| x * 2 + 1)
723 /// .for_each(move |x| tx.send(x).unwrap());
724 ///
725 /// assert_eq!(result, Some(()));
726 ///
727 /// let mut v: Vec<_> = rx.iter().collect();
728 /// v.sort(); // order can be mixed, since messages will be sent in parallel
729 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
730 ///
731 /// // at least one None
732 /// let (tx, _rx) = channel();
733 /// let result = vec!["0", "1", "2", "x!", "4"]
734 /// .into_par()
735 /// .map(|x| x.parse::<i32>().ok())
736 /// .into_fallible_option()
737 /// .map(|x| x * 2 + 1)
738 /// .for_each(move |x| tx.send(x).unwrap());
739 ///
740 /// assert_eq!(result, None);
741 /// ```
742 fn for_each<Operation>(self, operation: Operation) -> Option<()>
743 where
744 Self: Sized,
745 Operation: Fn(Self::Item) + Sync,
746 {
747 let map = |x| operation(x);
748 self.map(map).reduce(reduce_unit).map(|_| ())
749 }
750
751 /// Returns Some of maximum element of an iterator if all elements succeed.
752 /// If the iterator is empty, `Some(None)` is returned.
753 /// Early exits and returns None if any of the elements is None.
754 ///
755 /// # Examples
756 ///
757 /// ```
758 /// use orx_parallel::*;
759 ///
760 /// let a = vec![Some(1), Some(2), Some(3)];
761 /// assert_eq!(a.par().copied().into_fallible_option().max(), Some(Some(3)));
762 ///
763 /// let b: Vec<Option<i32>> = vec![];
764 /// assert_eq!(b.par().copied().into_fallible_option().max(), Some(None));
765 ///
766 /// let c = vec![Some(1), Some(2), None];
767 /// assert_eq!(c.par().copied().into_fallible_option().max(), None);
768 /// ```
769 fn max(self) -> Option<Option<Self::Item>>
770 where
771 Self: Sized,
772 Self::Item: Ord + Send,
773 {
774 self.reduce(Ord::max)
775 }
776
777 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
778 /// If the iterator is empty, `Some(None)` is returned.
779 /// Early exits and returns None if any of the elements is None.
780 ///
781 /// ```
782 /// use orx_parallel::*;
783 ///
784 /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
785 /// assert_eq!(
786 /// a.par()
787 /// .copied()
788 /// .into_fallible_option()
789 /// .max_by(|a, b| a.cmp(b)),
790 /// Some(Some(3))
791 /// );
792 ///
793 /// let b: Vec<Option<i32>> = vec![];
794 /// assert_eq!(
795 /// b.par()
796 /// .copied()
797 /// .into_fallible_option()
798 /// .max_by(|a, b| a.cmp(b)),
799 /// Some(None)
800 /// );
801 ///
802 /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
803 /// assert_eq!(
804 /// c.par()
805 /// .copied()
806 /// .into_fallible_option()
807 /// .max_by(|a, b| a.cmp(b)),
808 /// None
809 /// );
810 /// ```
811 fn max_by<Compare>(self, compare: Compare) -> Option<Option<Self::Item>>
812 where
813 Self: Sized,
814 Self::Item: Send,
815 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
816 {
817 let reduce = |x, y| match compare(&x, &y) {
818 Ordering::Greater | Ordering::Equal => x,
819 Ordering::Less => y,
820 };
821 self.reduce(reduce)
822 }
823
824 /// Returns the element that gives the maximum value from the specified function.
825 /// If the iterator is empty, `Some(None)` is returned.
826 /// Early exits and returns None if any of the elements is None.
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// use orx_parallel::*;
832 ///
833 /// let a: Vec<Option<i32>> = vec![Some(-1), Some(2), Some(-3)];
834 /// assert_eq!(
835 /// a.par()
836 /// .copied()
837 /// .into_fallible_option()
838 /// .max_by_key(|x| x.abs()),
839 /// Some(Some(-3))
840 /// );
841 ///
842 /// let b: Vec<Option<i32>> = vec![];
843 /// assert_eq!(
844 /// b.par()
845 /// .copied()
846 /// .into_fallible_option()
847 /// .max_by_key(|x| x.abs()),
848 /// Some(None)
849 /// );
850 ///
851 /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
852 /// assert_eq!(
853 /// c.par()
854 /// .copied()
855 /// .into_fallible_option()
856 /// .max_by_key(|x| x.abs()),
857 /// None
858 /// );
859 /// ```
860 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Option<Self::Item>>
861 where
862 Self: Sized,
863 Self::Item: Send,
864 Key: Ord,
865 GetKey: Fn(&Self::Item) -> Key + Sync,
866 {
867 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
868 Ordering::Greater | Ordering::Equal => x,
869 Ordering::Less => y,
870 };
871 self.reduce(reduce)
872 }
873
874 /// Returns Some of minimum element of an iterator if all elements succeed.
875 /// If the iterator is empty, `Some(None)` is returned.
876 /// Early exits and returns None if any of the elements is None.
877 ///
878 /// # Examples
879 ///
880 /// ```
881 /// use orx_parallel::*;
882 ///
883 /// let a = vec![Some(1), Some(2), Some(3)];
884 /// assert_eq!(a.par().copied().into_fallible_option().min(), Some(Some(1)));
885 ///
886 /// let b: Vec<Option<i32>> = vec![];
887 /// assert_eq!(b.par().copied().into_fallible_option().min(), Some(None));
888 ///
889 /// let c = vec![Some(1), Some(2), None];
890 /// assert_eq!(c.par().copied().into_fallible_option().min(), None);
891 /// ```
892 fn min(self) -> Option<Option<Self::Item>>
893 where
894 Self: Sized,
895 Self::Item: Ord + Send,
896 {
897 self.reduce(Ord::min)
898 }
899
900 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
901 /// If the iterator is empty, `Some(None)` is returned.
902 /// Early exits and returns None if any of the elements is None.
903 ///
904 /// ```
905 /// use orx_parallel::*;
906 ///
907 /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
908 /// assert_eq!(
909 /// a.par()
910 /// .copied()
911 /// .into_fallible_option()
912 /// .min_by(|a, b| a.cmp(b)),
913 /// Some(Some(1))
914 /// );
915 ///
916 /// let b: Vec<Option<i32>> = vec![];
917 /// assert_eq!(
918 /// b.par()
919 /// .copied()
920 /// .into_fallible_option()
921 /// .min_by(|a, b| a.cmp(b)),
922 /// Some(None)
923 /// );
924 ///
925 /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
926 /// assert_eq!(
927 /// c.par()
928 /// .copied()
929 /// .into_fallible_option()
930 /// .min_by(|a, b| a.cmp(b)),
931 /// None
932 /// );
933 /// ```
934 fn min_by<Compare>(self, compare: Compare) -> Option<Option<Self::Item>>
935 where
936 Self: Sized,
937 Self::Item: Send,
938 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
939 {
940 let reduce = |x, y| match compare(&x, &y) {
941 Ordering::Less | Ordering::Equal => x,
942 Ordering::Greater => y,
943 };
944 self.reduce(reduce)
945 }
946
947 /// Returns the element that gives the minimum value from the specified function.
948 /// If the iterator is empty, `Some(None)` is returned.
949 /// Early exits and returns None if any of the elements is None.
950 ///
951 /// # Examples
952 ///
953 /// ```
954 /// use orx_parallel::*;
955 ///
956 /// let a: Vec<Option<i32>> = vec![Some(-1), Some(2), Some(-3)];
957 /// assert_eq!(
958 /// a.par()
959 /// .copied()
960 /// .into_fallible_option()
961 /// .min_by_key(|x| x.abs()),
962 /// Some(Some(-1))
963 /// );
964 ///
965 /// let b: Vec<Option<i32>> = vec![];
966 /// assert_eq!(
967 /// b.par()
968 /// .copied()
969 /// .into_fallible_option()
970 /// .min_by_key(|x| x.abs()),
971 /// Some(None)
972 /// );
973 ///
974 /// let c: Vec<Option<i32>> = vec![Some(1), Some(2), None];
975 /// assert_eq!(
976 /// c.par()
977 /// .copied()
978 /// .into_fallible_option()
979 /// .min_by_key(|x| x.abs()),
980 /// None
981 /// );
982 /// ```
983 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Option<Self::Item>>
984 where
985 Self: Sized,
986 Self::Item: Send,
987 Key: Ord,
988 GetKey: Fn(&Self::Item) -> Key + Sync,
989 {
990 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
991 Ordering::Less | Ordering::Equal => x,
992 Ordering::Greater => y,
993 };
994 self.reduce(reduce)
995 }
996
997 /// Sums the elements of an iterator.
998 /// Early exits and returns None if any of the elements is None.
999 ///
1000 /// If the iterator is empty, returns zero; otherwise, returns `Some` of the sum.
1001 ///
1002 /// # Example
1003 ///
1004 /// ```
1005 /// use orx_parallel::*;
1006 ///
1007 /// // all succeeds
1008 /// let reduced: Option<u32> = (1..10)
1009 /// .par()
1010 /// .map(|x| 100u32.checked_div(x as u32))
1011 /// .into_fallible_option()
1012 /// .sum();
1013 /// assert_eq!(reduced, Some(281));
1014 ///
1015 /// // all succeeds - empty iterator
1016 /// let reduced: Option<u32> = (1..1)
1017 /// .par()
1018 /// .map(|x| 100u32.checked_div(x as u32))
1019 /// .into_fallible_option()
1020 /// .sum();
1021 /// assert_eq!(reduced, Some(0));
1022 ///
1023 /// // at least one fails
1024 /// let reduced: Option<u32> = (0..10)
1025 /// .par()
1026 /// .map(|x| 100u32.checked_div(x as u32))
1027 /// .into_fallible_option()
1028 /// .sum();
1029 /// assert_eq!(reduced, None);
1030 /// ```
1031 fn sum<Out>(self) -> Option<Out>
1032 where
1033 Self: Sized,
1034 Self::Item: Sum<Out>,
1035 Out: Send,
1036 {
1037 self.map(Self::Item::map)
1038 .reduce(Self::Item::reduce)
1039 .map(|x| x.unwrap_or(Self::Item::zero()))
1040 }
1041
1042 // early exit
1043
/// Returns the first (or any) element of the iterator.
/// If the iterator is empty, `Some(None)` is returned.
/// Early exits and returns None if a None element is observed first.
///
/// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
/// * any element is returned if `IterationOrder::Arbitrary` is set.
///
/// Note that `first` itself is short-circuiting in addition to fallible computation.
/// Therefore, in case the fallible iterator contains both a None and a Some element,
/// the result is **not deterministic**:
/// * it might be the `None` if it is observed first;
/// * or `Some(element)` if the Some element is observed first.
///
/// This method has no default body; it must be provided by the implementor.
///
/// # Examples
///
/// ```
/// use orx_parallel::*;
///
/// let a: Vec<Option<i32>> = vec![];
/// assert_eq!(a.par().copied().into_fallible_option().first(), Some(None));
///
/// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
/// assert_eq!(
///     a.par().copied().into_fallible_option().first(),
///     Some(Some(1))
/// );
///
/// let a: Vec<Option<i32>> = vec![Some(1), None, Some(3)];
/// let result = a.par().copied().into_fallible_option().first();
/// // depends on whichever is observed first in parallel execution
/// assert!(result == Some(Some(1)) || result == None);
/// ```
fn first(self) -> Option<Option<Self::Item>>
where
    Self::Item: Send;
1079
1080 /// Returns the first (or any) element of the iterator that satisfies the `predicate`.
1081 /// If the iterator is empty, `Some(None)` is returned.
1082 /// Early exits and returns None if a None element is observed first.
1083 ///
1084 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1085 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1086 ///
1087 /// Note that `find` itself is short-circuiting in addition to fallible computation.
1088 /// Therefore, in case the fallible iterator contains both a None and a Some element,
1089 /// the result is **not deterministic**:
1090 /// * it might be the `None` if it is observed first;
1091 /// * or `Some(element)` if the Some element satisfying the predicate is observed first.
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use orx_parallel::*;
1097 ///
1098 /// let a: Vec<Option<i32>> = vec![];
1099 /// assert_eq!(
1100 /// a.par().copied().into_fallible_option().find(|x| *x > 2),
1101 /// Some(None)
1102 /// );
1103 ///
1104 /// let a: Vec<Option<i32>> = vec![Some(1), Some(2), Some(3)];
1105 /// assert_eq!(
1106 /// a.par().copied().into_fallible_option().find(|x| *x > 2),
1107 /// Some(Some(3))
1108 /// );
1109 ///
1110 /// let a: Vec<Option<i32>> = vec![Some(1), None, Some(3)];
1111 /// let result = a.par().copied().into_fallible_option().find(|x| *x > 2);
1112 /// // depends on whichever is observed first in parallel execution
1113 /// assert!(result == Some(Some(3)) || result == None);
1114 /// ```
1115 fn find<Predicate>(self, predicate: Predicate) -> Option<Option<Self::Item>>
1116 where
1117 Self: Sized,
1118 Self::Item: Send,
1119 Predicate: Fn(&Self::Item) -> bool + Sync,
1120 {
1121 self.filter(&predicate).first()
1122 }
1123}
1124
/// Conversion of an optional-like value of `T` into the standard `Option<T>`,
/// or into a `Result<T, ()>` whose error carries no information.
pub trait IntoOption<T> {
    /// Consumes `self` and returns it as an `Option<T>`.
    fn into_option(self) -> Option<T>;

    /// Consumes `self` and returns it as a `Result<T, ()>`.
    fn into_result_with_unit_err(self) -> Result<T, ()>;
}

impl<T> IntoOption<T> for Option<T> {
    /// Identity conversion: an `Option<T>` already is the target type.
    #[inline(always)]
    fn into_option(self) -> Option<T> {
        self
    }

    /// Maps `Some(x)` to `Ok(x)` and `None` to `Err(())`.
    #[inline(always)]
    fn into_result_with_unit_err(self) -> Result<T, ()> {
        self.ok_or(())
    }
}
1145
/// Crate-internal conversion of a result-like value of `T` into an `Option<T>`.
pub(crate) trait ResultIntoOption<T> {
    /// Consumes `self` and returns it as an `Option<T>`.
    fn into_option(self) -> Option<T>;
}

impl<T> ResultIntoOption<T> for Result<T, ()> {
    /// Maps `Ok(x)` to `Some(x)` and `Err(())` to `None`.
    #[inline(always)]
    fn into_option(self) -> Option<T> {
        Result::ok(self)
    }
}
1155}