orx_parallel/
par_iter.rs

1use crate::{
2    Params,
3    collect_into::ParCollectInto,
4    computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
5    parameters::{ChunkSize, IterationOrder, NumThreads},
6    runner::{DefaultRunner, ParallelRunner},
7    special_type_sets::Sum,
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::cmp::Ordering;
11
12/// Parallel iterator.
13pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
14where
15    R: ParallelRunner,
16{
17    /// Element type of the parallel iterator.
18    type Item: Send + Sync;
19
20    /// Returns a reference to the input concurrent iterator.
21    fn con_iter(&self) -> &impl ConcurrentIter;
22
23    /// Parameters of the parallel iterator.
24    ///
25    /// # Examples
26    ///
27    /// ```
28    /// use orx_parallel::*;
29    /// use std::num::NonZero;
30    ///
31    /// let vec = vec![1, 2, 3, 4];
32    ///
33    /// assert_eq!(
34    ///     vec.par().params(),
35    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
36    /// );
37    ///
38    /// assert_eq!(
39    ///     vec.par().num_threads(0).chunk_size(0).params(),
40    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41    /// );
42    ///
43    /// assert_eq!(
44    ///     vec.par().num_threads(1).params(),
45    ///     &Params::new(
46    ///         NumThreads::Max(NonZero::new(1).unwrap()),
47    ///         ChunkSize::Auto,
48    ///         IterationOrder::Ordered
49    ///     )
50    /// );
51    ///
52    /// assert_eq!(
53    ///     vec.par().num_threads(4).chunk_size(64).params(),
54    ///     &Params::new(
55    ///         NumThreads::Max(NonZero::new(4).unwrap()),
56    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
57    ///         IterationOrder::Ordered
58    ///     )
59    /// );
60    ///
61    /// assert_eq!(
62    ///     vec.par()
63    ///         .num_threads(8)
64    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
65    ///         .iteration_order(IterationOrder::Arbitrary)
66    ///         .params(),
67    ///     &Params::new(
68    ///         NumThreads::Max(NonZero::new(8).unwrap()),
69    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
70    ///         IterationOrder::Arbitrary
71    ///     )
72    /// );
73    /// ```
74    fn params(&self) -> &Params;
75
76    // params transformations
77
78    /// Sets the number of threads to be used in the parallel execution.
79    /// Integers can be used as the argument with the following mapping:
80    ///
81    /// * `0` -> `NumThreads::Auto`
82    /// * `1` -> `NumThreads::sequential()`
83    /// * `n > 0` -> `NumThreads::Max(n)`
84    ///
85    /// See [`NumThreads`] for details.
86    ///
87    /// # Examples
88    ///
89    /// ```
90    /// use orx_parallel::*;
91    /// use std::num::NonZero;
92    ///
93    /// let vec = vec![1, 2, 3, 4];
94    ///
95    /// // all available threads can be used
96    ///
97    /// assert_eq!(
98    ///     vec.par().params(),
99    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
100    /// );
101    ///
102    /// assert_eq!(
103    ///     vec.par().num_threads(0).chunk_size(0).params(),
104    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105    /// );
106    ///
107    /// // computation will be executed sequentially on the main thread, no parallelization
108    ///
109    /// assert_eq!(
110    ///     vec.par().num_threads(1).params(),
111    ///     &Params::new(
112    ///         NumThreads::Max(NonZero::new(1).unwrap()),
113    ///         ChunkSize::Auto,
114    ///         IterationOrder::Ordered
115    ///     )
116    /// );
117    ///
118    /// // maximum 4 threads can be used
119    /// assert_eq!(
120    ///     vec.par().num_threads(4).chunk_size(64).params(),
121    ///     &Params::new(
122    ///         NumThreads::Max(NonZero::new(4).unwrap()),
123    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
124    ///         IterationOrder::Ordered
125    ///     )
126    /// );
127    ///
128    /// // maximum 8 threads can be used
129    /// assert_eq!(
130    ///     vec.par()
131    ///         .num_threads(8)
132    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
133    ///         .iteration_order(IterationOrder::Arbitrary)
134    ///         .params(),
135    ///     &Params::new(
136    ///         NumThreads::Max(NonZero::new(8).unwrap()),
137    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
138    ///         IterationOrder::Arbitrary
139    ///     )
140    /// );
141    /// ```
142    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
143
144    /// Sets the number of elements to be pulled from the concurrent iterator during the
145    /// parallel execution. When integers are used as argument, the following mapping applies:
146    ///
147    /// * `0` -> `ChunkSize::Auto`
148    /// * `n > 0` -> `ChunkSize::Exact(n)`
149    ///
150    /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
151    ///
152    /// See [`ChunkSize`] for details.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use orx_parallel::*;
158    /// use std::num::NonZero;
159    ///
160    /// let vec = vec![1, 2, 3, 4];
161    ///
162    /// // chunk sizes will be dynamically decided by the parallel runner
163    ///
164    /// assert_eq!(
165    ///     vec.par().params(),
166    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
167    /// );
168    ///
169    /// assert_eq!(
170    ///     vec.par().num_threads(0).chunk_size(0).params(),
171    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172    /// );
173    ///
174    /// assert_eq!(
175    ///     vec.par().num_threads(1).params(),
176    ///     &Params::new(
177    ///         NumThreads::Max(NonZero::new(1).unwrap()),
178    ///         ChunkSize::Auto,
179    ///         IterationOrder::Ordered
180    ///     )
181    /// );
182    ///
183    /// // chunk size will always be 64, parallel runner cannot change
184    ///
185    /// assert_eq!(
186    ///     vec.par().num_threads(4).chunk_size(64).params(),
187    ///     &Params::new(
188    ///         NumThreads::Max(NonZero::new(4).unwrap()),
189    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
190    ///         IterationOrder::Ordered
191    ///     )
192    /// );
193    ///
194    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
195    ///
196    /// assert_eq!(
197    ///     vec.par()
198    ///         .num_threads(8)
199    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
200    ///         .iteration_order(IterationOrder::Arbitrary)
201    ///         .params(),
202    ///     &Params::new(
203    ///         NumThreads::Max(NonZero::new(8).unwrap()),
204    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
205    ///         IterationOrder::Arbitrary
206    ///     )
207    /// );
208    /// ```
209    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
210
211    /// Sets the iteration order of the parallel computation.
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// use orx_parallel::*;
217    ///
218    /// let vec = vec![1, 2, 3, 4];
219    ///
220    /// // results are collected in order consistent to the input order,
221    /// // or find returns the first element satisfying the predicate
222    ///
223    /// assert_eq!(
224    ///     vec.par().params(),
225    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
226    /// );
227    ///
228    /// assert_eq!(
229    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
230    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
231    /// );
232    ///
233    /// // results might be collected in arbitrary order
234    /// // or find returns the any of the elements satisfying the predicate
235    ///
236    /// assert_eq!(
237    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
238    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
239    /// );
240    /// ```
241    fn iteration_order(self, collect: IterationOrder) -> Self;
242
243    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
244    ///
245    /// # Examples
246    ///
247    /// ```ignore
248    /// use orx_parallel::*;
249    ///
250    /// let inputs = vec![1, 2, 3, 4];
251    ///
252    /// // uses the default runner
253    /// let sum = inputs.par().sum();
254    ///
255    /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
256    /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
257    /// ```
258    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
259
260    // computation transformations
261
262    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
263    ///
264    /// # Examples
265    ///
266    /// ```
267    /// use orx_parallel::*;
268    ///
269    /// let a = [1, 2, 3];
270    ///
271    /// let iter = a.into_par().map(|x| 2 * x);
272    ///
273    /// let b: Vec<_> = iter.collect();
274    /// assert_eq!(b, &[2, 4, 6]);
275    /// ```
276    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
277    where
278        Out: Send + Sync,
279        Map: Fn(Self::Item) -> Out + Send + Sync + Clone;
280
281    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
282    ///
283    /// # Examples
284    ///
285    /// ```
286    /// use orx_parallel::*;
287    ///
288    /// let a = [1, 2, 3];
289    ///
290    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
291    ///
292    /// let b: Vec<_> = iter.collect();
293    /// assert_eq!(b, &[1, 3]);
294    /// ```
295    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
296    where
297        Filter: Fn(&Self::Item) -> bool + Send + Sync + Clone;
298
299    /// Creates an iterator that works like map, but flattens nested structure.
300    ///
301    /// # Examples
302    ///
303    /// ```
304    /// use orx_parallel::*;
305    ///
306    /// let words = ["alpha", "beta", "gamma"];
307    ///
308    /// // chars() returns an iterator
309    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
310    ///
311    /// let merged: String = all_chars.iter().collect();
312    /// assert_eq!(merged, "alphabetagamma");
313    /// ```
314    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
315    where
316        IOut: IntoIterator + Send + Sync,
317        IOut::IntoIter: Send + Sync,
318        IOut::Item: Send + Sync,
319        FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone;
320
321    /// Creates an iterator that both filters and maps.
322    ///
323    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
324    ///
325    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
326    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
327    ///
328    /// # Examples
329    ///
330    /// ```
331    /// use orx_parallel::*;
332    ///
333    /// let a = ["1", "two", "NaN", "four", "5"];
334    ///
335    /// let numbers: Vec<_> = a
336    ///     .into_par()
337    ///     .filter_map(|s| s.parse::<usize>().ok())
338    ///     .collect();
339    ///
340    /// assert_eq!(numbers, [1, 5]);
341    /// ```
342    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
343    where
344        Out: Send + Sync,
345        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone;
346
347    /// Does something with each element of an iterator, passing the value on.
348    ///
349    /// When using iterators, you’ll often chain several of them together.
350    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
351    /// To do that, insert a call to `inspect()`.
352    ///
353    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
354    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
355    ///
356    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
357    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
358    /// collect some intermediate values during parallel execution for further inspection.
359    /// The following example demonstrates such a use case.
360    ///
361    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use orx_parallel::*;
367    /// use orx_concurrent_bag::*;
368    ///
369    /// let a = vec![1, 4, 2, 3];
370    ///
371    /// // let's add some inspect() calls to investigate what's happening
372    /// // - log some events
373    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
374    /// let bag = ConcurrentBag::new();
375    ///
376    /// let sum = a
377    ///     .par()
378    ///     .copied()
379    ///     .inspect(|x| println!("about to filter: {x}"))
380    ///     .filter(|x| x % 2 == 0)
381    ///     .inspect(|x| {
382    ///         bag.push(*x);
383    ///         println!("made it through filter: {x}");
384    ///     })
385    ///     .sum();
386    /// println!("{sum}");
387    ///
388    /// let mut values_made_through = bag.into_inner();
389    /// values_made_through.sort();
390    /// assert_eq!(values_made_through, [2, 4]);
391    /// ```
392    ///
393    /// This will print:
394    ///
395    /// ```console
396    /// about to filter: 1
397    /// about to filter: 4
398    /// made it through filter: 4
399    /// about to filter: 2
400    /// made it through filter: 2
401    /// about to filter: 3
402    /// 6
403    /// ```
404    fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
405    where
406        Operation: Fn(&Self::Item) + Sync + Send + Clone,
407    {
408        let map = move |x| {
409            operation(&x);
410            x
411        };
412        self.map(map)
413    }
414
415    // special item transformations
416
417    /// Creates an iterator which copies all of its elements.
418    ///
419    /// # Examples
420    ///
421    /// ```
422    /// use orx_parallel::*;
423    ///
424    /// let a = vec![1, 2, 3];
425    ///
426    /// let v_copied: Vec<_> = a.par().copied().collect();
427    ///
428    /// // copied is the same as .map(|&x| x)
429    /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
430    ///
431    /// assert_eq!(v_copied, vec![1, 2, 3]);
432    /// assert_eq!(v_map, vec![1, 2, 3]);
433    /// ```
434    fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
435    where
436        T: 'a + Copy + Send + Sync,
437        Self: ParIter<R, Item = &'a T>,
438    {
439        self.map(map_copy)
440    }
441
442    /// Creates an iterator which clones all of its elements.
443    ///
444    /// # Examples
445    ///
446    /// ```
447    /// use orx_parallel::*;
448    ///
449    /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
450    ///
451    /// let v_cloned: Vec<_> = a.par().cloned().collect();
452    ///
453    /// // cloned is the same as .map(|x| x.clone())
454    /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
455    ///
456    /// assert_eq!(
457    ///     v_cloned,
458    ///     vec![String::from("1"), String::from("2"), String::from("3")]
459    /// );
460    /// assert_eq!(
461    ///     v_map,
462    ///     vec![String::from("1"), String::from("2"), String::from("3")]
463    /// );
464    /// ```
465    fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
466    where
467        T: 'a + Clone + Send + Sync,
468        Self: ParIter<R, Item = &'a T>,
469    {
470        self.map(map_clone)
471    }
472
473    /// Creates an iterator that flattens nested structure.
474    ///
475    /// This is useful when you have an iterator of iterators or an iterator of things that can be
476    /// turned into iterators and you want to remove one level of indirection.
477    ///
478    /// # Examples
479    ///
480    /// Basic usage.
481    ///
482    /// ```
483    /// use orx_parallel::*;
484    ///
485    /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
486    /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
487    /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
488    /// ```
489    ///
490    /// Mapping and then flattening:
491    ///
492    /// ```
493    /// use orx_parallel::*;
494    ///
495    /// let words = vec!["alpha", "beta", "gamma"];
496    ///
497    /// // chars() returns an iterator
498    /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
499    /// let merged: String = all_characters.into_iter().collect();
500    /// assert_eq!(merged, "alphabetagamma");
501    /// ```
502    ///
503    /// But actually, you can write this in terms of `flat_map`,
504    /// which is preferable in this case since it conveys intent more clearly:
505    ///
506    /// ```
507    /// use orx_parallel::*;
508    ///
509    /// let words = vec!["alpha", "beta", "gamma"];
510    ///
511    /// // chars() returns an iterator
512    /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
513    /// let merged: String = all_characters.into_iter().collect();
514    /// assert_eq!(merged, "alphabetagamma");
515    /// ```
516    fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
517    where
518        Self::Item: IntoIterator,
519        <Self::Item as IntoIterator>::IntoIter: Send + Sync,
520        <Self::Item as IntoIterator>::Item: Send + Sync,
521        R: Send + Sync,
522        Self: Send + Sync,
523    {
524        let map = |e: Self::Item| e.into_iter();
525        self.flat_map(map)
526    }
527
528    // collect
529
530    /// Collects all the items from an iterator into a collection.
531    ///
532    /// This is useful when you already have a collection and want to add the iterator items to it.
533    ///
534    /// The collection is passed in as owned value, and returned back with the additional elements.
535    ///
536    /// All collections implementing [`ParCollectInto`] can be used to collect into.
537    ///
538    /// [`ParCollectInto`]: crate::ParCollectInto
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// use orx_parallel::*;
544    ///
545    /// let a = vec![1, 2, 3];
546    ///
547    /// let vec: Vec<i32> = vec![0, 1];
548    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
549    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
550    ///
551    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
552    /// ```
553    fn collect_into<C>(self, output: C) -> C
554    where
555        C: ParCollectInto<Self::Item>;
556
557    /// Transforms an iterator into a collection.
558    ///
559    /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
560    /// the type of the result collection; or turbofish annotation can be used.
561    ///
562    /// All collections implementing [`ParCollectInto`] can be used to collect into.
563    ///
564    /// [`ParCollectInto`]: crate::ParCollectInto
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// use orx_parallel::*;
570    ///
571    /// let a = vec![1, 2, 3];
572    ///
573    /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
574    ///
575    /// assert_eq!(vec![2, 4, 6], doubled);
576    /// ```
577    fn collect<C>(self) -> C
578    where
579        C: ParCollectInto<Self::Item>,
580    {
581        let output = C::empty(self.con_iter().try_get_len());
582        self.collect_into(output)
583    }
584
585    // reduce
586
587    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
588    ///
589    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
590    ///
591    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
592    ///
593    /// # Example
594    ///
595    /// ```
596    /// use orx_parallel::*;
597    ///
598    /// let inputs = 1..10;
599    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
600    /// assert_eq!(reduced, 45);
601    /// ```
602    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
603    where
604        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync;
605
606    /// Tests if every element of the iterator matches a predicate.
607    ///
608    /// `all` takes a `predicate` that returns true or false.
609    /// It applies this closure to each element of the iterator,
610    /// and if they all return true, then so does `all`.
611    /// If any of them returns false, it returns false.
612    ///
613    /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
614    /// given that no matter what else happens, the result will also be false.
615    ///
616    /// An empty iterator returns true.
617    ///
618    /// # Examples
619    ///
620    /// ```
621    /// use orx_parallel::*;
622    ///
623    /// let mut a = vec![1, 2, 3];
624    /// assert!(a.par().all(|x| **x > 0));
625    /// assert!(!a.par().all(|x| **x > 2));
626    ///
627    /// a.clear();
628    /// assert!(a.par().all(|x| **x > 2)); // empty iterator
629    /// ```
630    fn all<Predicate>(self, predicate: Predicate) -> bool
631    where
632        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
633    {
634        let violates = |x: &Self::Item| !predicate(x);
635        self.find(violates).is_none()
636    }
637
638    /// Tests if any element of the iterator matches a predicate.
639    ///
640    /// `any` takes a `predicate` that returns true or false.
641    /// It applies this closure to each element of the iterator,
642    /// and if any of the elements returns true, then so does `any`.
643    /// If all of them return false, it returns false.
644    ///
645    /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
646    /// given that no matter what else happens, the result will also be true.
647    ///
648    /// An empty iterator returns false.
649    ///
650    /// # Examples
651    ///
652    /// ```
653    /// use orx_parallel::*;
654    ///
655    /// let mut a = vec![1, 2, 3];
656    /// assert!(a.par().any(|x| **x > 0));
657    /// assert!(!a.par().any(|x| **x > 5));
658    ///
659    /// a.clear();
660    /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
661    /// ```
662    fn any<Predicate>(self, predicate: Predicate) -> bool
663    where
664        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
665    {
666        self.find(predicate).is_some()
667    }
668
669    /// Consumes the iterator, counting the number of iterations and returning it.
670    ///
671    /// # Examples
672    ///
673    /// ```
674    /// use orx_parallel::*;
675    ///
676    /// let a = vec![1, 2, 3];
677    /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
678    /// ```
679    fn count(self) -> usize {
680        self.map(map_count).reduce(reduce_sum).unwrap_or(0)
681    }
682
683    /// Calls a closure on each element of an iterator.
684    ///
685    /// # Examples
686    ///
687    /// Basic usage:
688    ///
689    /// ```
690    /// use orx_parallel::*;
691    /// use std::sync::mpsc::channel;
692    ///
693    /// let (tx, rx) = channel();
694    /// (0..5)
695    ///     .par()
696    ///     .map(|x| x * 2 + 1)
697    ///     .for_each(move |x| tx.send(x).unwrap());
698    ///
699    /// let mut v: Vec<_> = rx.iter().collect();
700    /// v.sort(); // order can be mixed, since messages will be sent in parallel
701    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
702    /// ```
703    ///
704    /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
705    /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
706    /// In the following example, we log every element that satisfies a predicate in parallel.
707    ///
708    /// ```
709    /// use orx_parallel::*;
710    ///
711    /// (0..5)
712    ///     .par()
713    ///     .flat_map(|x| x * 100..x * 110)
714    ///     .filter(|&x| x % 3 == 0)
715    ///     .for_each(|x| println!("{x}"));
716    /// ```
717    fn for_each<Operation>(self, operation: Operation)
718    where
719        Operation: Fn(Self::Item) + Sync + Send,
720    {
721        let map = |x| operation(x);
722        let _ = self.map(map).reduce(reduce_unit);
723    }
724
725    /// Returns the maximum element of an iterator.
726    ///
727    /// If the iterator is empty, None is returned.
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// use orx_parallel::*;
733    ///
734    /// let a = vec![1, 2, 3];
735    /// let b: Vec<u32> = Vec::new();
736    ///
737    /// assert_eq!(a.par().max(), Some(&3));
738    /// assert_eq!(b.par().max(), None);
739    /// ```
740    fn max(self) -> Option<Self::Item>
741    where
742        Self::Item: Ord,
743    {
744        self.reduce(Ord::max)
745    }
746
747    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
748    ///
749    /// If the iterator is empty, None is returned.
750    ///
751    /// # Examples
752    ///
753    /// ```
754    /// use orx_parallel::*;
755    ///
756    /// let a = vec![-3_i32, 0, 1, 5, -10];
757    /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
758    /// ```
759    fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
760    where
761        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
762    {
763        let reduce = |x, y| match compare(&x, &y) {
764            Ordering::Greater | Ordering::Equal => x,
765            Ordering::Less => y,
766        };
767        self.reduce(reduce)
768    }
769
770    /// Returns the element that gives the maximum value from the specified function.
771    ///
772    /// If the iterator is empty, None is returned.
773    ///
774    /// # Examples
775    ///
776    /// ```
777    /// use orx_parallel::*;
778    ///
779    /// let a = vec![-3_i32, 0, 1, 5, -10];
780    /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
781    /// ```
782    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
783    where
784        Key: Ord,
785        GetKey: Fn(&Self::Item) -> Key + Sync,
786    {
787        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
788            Ordering::Greater | Ordering::Equal => x,
789            Ordering::Less => y,
790        };
791        self.reduce(reduce)
792    }
793
794    /// Returns the minimum element of an iterator.
795    ///
796    /// If the iterator is empty, None is returned.
797    ///
798    /// # Examples
799    ///
800    /// ```
801    /// use orx_parallel::*;
802    ///
803    /// let a = vec![1, 2, 3];
804    /// let b: Vec<u32> = Vec::new();
805    ///
806    /// assert_eq!(a.par().min(), Some(&1));
807    /// assert_eq!(b.par().min(), None);
808    /// ```
809    fn min(self) -> Option<Self::Item>
810    where
811        Self::Item: Ord,
812    {
813        self.reduce(Ord::min)
814    }
815
816    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
817    ///
818    /// If the iterator is empty, None is returned.
819    ///
820    /// # Examples
821    ///
822    /// ```
823    /// use orx_parallel::*;
824    ///
825    /// let a = vec![-3_i32, 0, 1, 5, -10];
826    /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
827    /// ```
828    fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
829    where
830        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
831    {
832        let reduce = |x, y| match compare(&x, &y) {
833            Ordering::Less | Ordering::Equal => x,
834            Ordering::Greater => y,
835        };
836        self.reduce(reduce)
837    }
838
839    /// Returns the element that gives the minimum value from the specified function.
840    ///
841    /// If the iterator is empty, None is returned.
842    ///
843    /// # Examples
844    ///
845    /// ```
846    /// use orx_parallel::*;
847    ///
848    /// let a = vec![-3_i32, 0, 1, 5, -10];
849    /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
850    /// ```
851    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
852    where
853        Key: Ord,
854        GetKey: Fn(&Self::Item) -> Key + Sync,
855    {
856        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
857            Ordering::Less | Ordering::Equal => x,
858            Ordering::Greater => y,
859        };
860        self.reduce(reduce)
861    }
862
863    /// Sums the elements of an iterator.
864    ///
865    /// Takes each element, adds them together, and returns the result.
866    ///
867    /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
868    ///
869    /// `sum` can be used to sum any type implementing [`Sum<Out>`].
870    ///
871    /// [`Sum<Out>`]: crate::Sum
872    ///
873    /// # Examples
874    ///
875    /// ```
876    /// use orx_parallel::*;
877    ///
878    /// let a = vec![1, 2, 3];
879    /// let sum: i32 = a.par().sum();
880    ///
881    /// assert_eq!(sum, 6);
882    /// ```
883    fn sum<Out>(self) -> Out
884    where
885        Self::Item: Sum<Out>,
886        Out: Send + Sync,
887    {
888        self.map(Self::Item::map)
889            .reduce(Self::Item::reduce)
890            .unwrap_or(Self::Item::zero())
891    }
892
893    // early exit
894
895    /// Returns the first (or any) element of the iterator; returns None if it is empty.
896    ///
897    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
898    /// * any element is returned if `IterationOrder::Arbitrary` is set.
899    ///
900    /// # Examples
901    ///
902    /// The following example demonstrates the usage of first with default `Ordered` iteration.
903    /// This guarantees that the first element with respect to position in the input sequence
904    /// is returned.
905    ///
906    /// ```
907    /// use orx_parallel::*;
908    ///
909    /// let a: Vec<usize> = vec![];
910    /// assert_eq!(a.par().copied().first(), None);
911    ///
912    /// let a = vec![1, 2, 3];
913    /// assert_eq!(a.par().copied().first(), Some(1));
914    ///
915    /// let a = 1..10_000;
916    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
917    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
918    ///
919    /// // or equivalently,
920    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
921    /// ```
922    ///
923    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
924    /// whichever is visited first depending on the parallel execution.
925    ///
926    /// ```
927    /// use orx_parallel::*;
928    ///
929    /// let a = 1..10_000;
930    ///
931    /// // might return either of 3421 or 2*3421
932    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
933    /// assert!([3421, 2 * 3421].contains(&any));
934    ///
935    /// // or equivalently,
936    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
937    /// assert!([3421, 2 * 3421].contains(&any));
938    fn first(self) -> Option<Self::Item>;
939
940    /// Searches for an element of an iterator that satisfies a `predicate`.
941    ///
942    /// Depending on the set iteration order of the parallel iterator, returns
943    ///
944    /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
945    /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
946    ///
947    /// `find` takes a closure that returns true or false.
948    /// It applies this closure to each element of the iterator,
949    /// and returns `Some(x)` where `x` is the first element that returns true.
950    /// If they all return false, it returns None.
951    ///
952    /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
953    ///
954    /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
955    ///
956    /// # Examples
957    ///
958    /// The following example demonstrates the usage of first with default `Ordered` iteration.
959    /// This guarantees that the first element with respect to position in the input sequence
960    /// is returned.
961    ///
962    /// ```
963    /// use orx_parallel::*;
964    ///
965    /// let a = 1..10_000;
966    /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
967    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
968    /// ```
969    ///
970    /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
971    /// whichever is found first depending on the parallel execution.
972    ///
973    /// ```
974    /// use orx_parallel::*;
975    ///
976    /// let a = 1..10_000;
977    ///
978    /// // might return either of 3421 or 2*3421
979    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
980    /// assert!([3421, 2 * 3421].contains(&any));
981    /// ```
982    fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
983    where
984        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
985    {
986        self.filter(predicate).first()
987    }
988}