orx_parallel/
par_iter.rs

1use crate::computational_variants::fallible_option::ParOption;
2use crate::par_iter_option::{IntoOption, ParIterOption};
3use crate::par_iter_result::IntoResult;
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::using::{UsingClone, UsingFun};
6use crate::{ParIterResult, ParThreadPool, RunnerWithPool};
7use crate::{
8    ParIterUsing, Params,
9    collect_into::ParCollectInto,
10    default_fns::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
11    parameters::{ChunkSize, IterationOrder, NumThreads},
12    special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
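///
/// Transformations such as `map`, `filter` and `flat_map` are chained as with a regular [`Iterator`],
/// and the computation is executed in parallel by the final consuming call such as `collect`, `reduce` or `sum`.
///
/// A minimal usage sketch (a hypothetical computation, relying only on methods of this trait and the
/// `par()` entry point used throughout the examples below):
///
/// ```
/// use orx_parallel::*;
///
/// let inputs = vec![1, 2, 3, 4, 5];
///
/// // transformations are chained as with a sequential iterator;
/// // the consuming `sum` call executes the computation in parallel
/// let sum_of_even_squares: i32 = inputs
///     .par()
///     .copied()
///     .map(|x| x * x)
///     .filter(|x| x % 2 == 0)
///     .sum();
///
/// assert_eq!(sum_of_even_squares, 4 + 16);
/// ```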
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20    R: ParallelRunner,
21{
22    /// Element type of the parallel iterator.
23    type Item;
24
25    /// Returns a reference to the input concurrent iterator.
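    ///
    /// This is mostly useful internally; for instance, the default [`collect`] implementation uses it to
    /// query the known length of the input. A brief sketch (assuming, as in the default `collect`
    /// implementation below, that `try_get_len` of the concurrent iterator reports the exact remaining
    /// length of an exact-sized input such as a vector):
    ///
    /// ```
    /// use orx_parallel::*;
    /// use orx_concurrent_iter::ConcurrentIter;
    ///
    /// let vec = vec![1, 2, 3, 4];
    /// let par = vec.par();
    ///
    /// // the exact length of a vector input is known up front
    /// assert_eq!(par.con_iter().try_get_len(), Some(4));
    /// ```
    ///
    /// [`collect`]: crate::ParIter::collect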
26    fn con_iter(&self) -> &impl ConcurrentIter;
27
28    /// Parameters of the parallel iterator.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use orx_parallel::*;
34    /// use std::num::NonZero;
35    ///
36    /// let vec = vec![1, 2, 3, 4];
37    ///
38    /// assert_eq!(
39    ///     vec.par().params(),
40    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41    /// );
42    ///
43    /// assert_eq!(
44    ///     vec.par().num_threads(0).chunk_size(0).params(),
45    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46    /// );
47    ///
48    /// assert_eq!(
49    ///     vec.par().num_threads(1).params(),
50    ///     Params::new(
51    ///         NumThreads::Max(NonZero::new(1).unwrap()),
52    ///         ChunkSize::Auto,
53    ///         IterationOrder::Ordered
54    ///     )
55    /// );
56    ///
57    /// assert_eq!(
58    ///     vec.par().num_threads(4).chunk_size(64).params(),
59    ///     Params::new(
60    ///         NumThreads::Max(NonZero::new(4).unwrap()),
61    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
62    ///         IterationOrder::Ordered
63    ///     )
64    /// );
65    ///
66    /// assert_eq!(
67    ///     vec.par()
68    ///         .num_threads(8)
69    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70    ///         .iteration_order(IterationOrder::Arbitrary)
71    ///         .params(),
72    ///     Params::new(
73    ///         NumThreads::Max(NonZero::new(8).unwrap()),
74    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
75    ///         IterationOrder::Arbitrary
76    ///     )
77    /// );
78    /// ```
79    fn params(&self) -> Params;
80
81    // params transformations
82
83    /// Sets the number of threads to be used in the parallel execution.
84    /// Integers can be used as the argument with the following mapping:
85    ///
86    /// * `0` -> `NumThreads::Auto`
87    /// * `1` -> `NumThreads::sequential()`
88    /// * `n > 1` -> `NumThreads::Max(n)`
89    ///
90    /// See [`NumThreads`] for details.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use orx_parallel::*;
96    /// use std::num::NonZero;
97    ///
98    /// let vec = vec![1, 2, 3, 4];
99    ///
100    /// // all available threads can be used
101    ///
102    /// assert_eq!(
103    ///     vec.par().params(),
104    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105    /// );
106    ///
107    /// assert_eq!(
108    ///     vec.par().num_threads(0).chunk_size(0).params(),
109    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110    /// );
111    ///
112    /// // computation will be executed sequentially on the main thread, no parallelization
113    ///
114    /// assert_eq!(
115    ///     vec.par().num_threads(1).params(),
116    ///     Params::new(
117    ///         NumThreads::Max(NonZero::new(1).unwrap()),
118    ///         ChunkSize::Auto,
119    ///         IterationOrder::Ordered
120    ///     )
121    /// );
122    ///
123    /// // maximum 4 threads can be used
124    /// assert_eq!(
125    ///     vec.par().num_threads(4).chunk_size(64).params(),
126    ///     Params::new(
127    ///         NumThreads::Max(NonZero::new(4).unwrap()),
128    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
129    ///         IterationOrder::Ordered
130    ///     )
131    /// );
132    ///
133    /// // maximum 8 threads can be used
134    /// assert_eq!(
135    ///     vec.par()
136    ///         .num_threads(8)
137    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138    ///         .iteration_order(IterationOrder::Arbitrary)
139    ///         .params(),
140    ///     Params::new(
141    ///         NumThreads::Max(NonZero::new(8).unwrap()),
142    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
143    ///         IterationOrder::Arbitrary
144    ///     )
145    /// );
146    /// ```
147    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149    /// Sets the number of elements to be pulled from the concurrent iterator each time a thread requests
150    /// elements during the parallel execution. When integers are used as the argument, the following mapping applies:
151    ///
152    /// * `0` -> `ChunkSize::Auto`
153    /// * `n > 0` -> `ChunkSize::Exact(n)`
154    ///
155    /// Please use the enum constructor directly to create the `ChunkSize::Min` variant.
156    ///
157    /// See [`ChunkSize`] for details.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use orx_parallel::*;
163    /// use std::num::NonZero;
164    ///
165    /// let vec = vec![1, 2, 3, 4];
166    ///
167    /// // chunk sizes will be dynamically decided by the parallel runner
168    ///
169    /// assert_eq!(
170    ///     vec.par().params(),
171    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172    /// );
173    ///
174    /// assert_eq!(
175    ///     vec.par().num_threads(0).chunk_size(0).params(),
176    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177    /// );
178    ///
179    /// assert_eq!(
180    ///     vec.par().num_threads(1).params(),
181    ///     Params::new(
182    ///         NumThreads::Max(NonZero::new(1).unwrap()),
183    ///         ChunkSize::Auto,
184    ///         IterationOrder::Ordered
185    ///     )
186    /// );
187    ///
188    /// // chunk size will always be 64; the parallel runner cannot change it
189    ///
190    /// assert_eq!(
191    ///     vec.par().num_threads(4).chunk_size(64).params(),
192    ///     Params::new(
193    ///         NumThreads::Max(NonZero::new(4).unwrap()),
194    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
195    ///         IterationOrder::Ordered
196    ///     )
197    /// );
198    ///
199    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200    ///
201    /// assert_eq!(
202    ///     vec.par()
203    ///         .num_threads(8)
204    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205    ///         .iteration_order(IterationOrder::Arbitrary)
206    ///         .params(),
207    ///     Params::new(
208    ///         NumThreads::Max(NonZero::new(8).unwrap()),
209    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
210    ///         IterationOrder::Arbitrary
211    ///     )
212    /// );
213    /// ```
214    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216    /// Sets the iteration order of the parallel computation.
217    ///
218    /// See [`IterationOrder`] for details.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use orx_parallel::*;
224    ///
225    /// let vec = vec![1, 2, 3, 4];
226    ///
227    /// // results are collected in an order consistent with the input order,
228    /// // or find returns the first element satisfying the predicate
229    ///
230    /// assert_eq!(
231    ///     vec.par().params(),
232    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233    /// );
234    ///
235    /// assert_eq!(
236    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
237    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238    /// );
239    ///
240    /// // results might be collected in arbitrary order,
241    /// // or find returns any of the elements satisfying the predicate
242    ///
243    /// assert_eq!(
244    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246    /// );
247    /// ```
248    fn iteration_order(self, order: IterationOrder) -> Self;
249
250    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251    ///
252    /// See also [`with_pool`].
253    ///
254    /// The parallel runner of each computation can be specified independently using `with_runner`.
255    ///
256    /// When not specified, the default runner is used, which is:
257    /// * [`RunnerWithPool`] with [`StdDefaultPool`] when the "std" feature is enabled,
258    /// * [`RunnerWithPool`] with [`SequentialPool`] when the "std" feature is disabled.
259    ///
260    /// Note that [`StdDefaultPool`] uses standard native threads.
261    ///
262    /// When working in a no-std environment, the default runner falls back to sequential execution.
263    /// In this case, a `RunnerWithPool` wrapping a particular thread pool must be passed in via the `with_runner`
264    /// transformation to achieve parallel computation.
265    ///
266    /// [`RunnerWithPool`]: crate::RunnerWithPool
267    /// [`StdDefaultPool`]: crate::StdDefaultPool
268    /// [`SequentialPool`]: crate::SequentialPool
269    /// [`with_pool`]: crate::ParIter::with_pool
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use orx_parallel::*;
275    ///
276    /// let inputs: Vec<_> = (0..42).collect();
277    ///
278    /// // uses the DefaultRunner
279    /// // assuming "std" enabled, RunnerWithPool<StdDefaultPool> will be used; i.e., native threads
280    /// let sum = inputs.par().sum();
281    ///
282    /// // equivalent to:
283    /// #[cfg(feature = "std")]
284    /// {
285    ///     let sum2 = inputs.par().with_runner(RunnerWithPool::from(StdDefaultPool::default())).sum();
286    ///     assert_eq!(sum, sum2);
287    /// }
288    ///
289    /// #[cfg(not(miri))]
290    /// #[cfg(feature = "scoped_threadpool")]
291    /// {
292    ///     let mut pool = scoped_threadpool::Pool::new(8);
293    ///
294    ///     // uses the scoped_threadpool::Pool created with 8 threads
295    ///     let sum2 = inputs.par().with_runner(RunnerWithPool::from(&mut pool)).sum();
296    ///     assert_eq!(sum, sum2);
297    /// }
298    ///
299    /// #[cfg(not(miri))]
300    /// #[cfg(feature = "rayon-core")]
301    /// {
302    ///     let pool = rayon_core::ThreadPoolBuilder::new()
303    ///         .num_threads(8)
304    ///         .build()
305    ///         .unwrap();
306    ///
307    ///     // uses the rayon-core::ThreadPool created with 8 threads
308    ///     let sum2 = inputs.par().with_runner(RunnerWithPool::from(&pool)).sum();
309    ///     assert_eq!(sum, sum2);
310    /// }
311    /// ```
312    fn with_runner<Q: ParallelRunner>(self, runner: Q) -> impl ParIter<Q, Item = Self::Item>;
313
314    /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
315    /// [`ParThreadPool`].
316    ///
317    /// See also [`with_runner`].
318    ///
319    /// The thread pool of each computation can be specified independently using `with_pool`.
320    ///
321    /// When not specified, the default pool is used, which is:
322    /// * [`StdDefaultPool`] when the "std" feature is enabled,
323    /// * [`SequentialPool`] when the "std" feature is disabled.
324    ///
325    /// Note that [`StdDefaultPool`] uses standard native threads.
326    ///
327    /// When working in a no-std environment, the default pool falls back to sequential execution.
328    /// In this case, a thread pool must be passed in via the `with_pool` transformation to achieve parallel computation.
329    ///
330    /// Note that if a thread pool, say `pool`, is of a type that implements [`ParThreadPool`], then:
331    /// * `with_pool` can be called with the owned value `with_pool(pool)` for all implementors; but also,
332    /// * with a shared reference `with_pool(&pool)` for most of the implementations (e.g., rayon-core, yastl), and
333    /// * with a mutable reference `with_pool(&mut pool)` for others (e.g., scoped_threadpool).
334    ///
335    /// [`DefaultPool`]: crate::DefaultPool
336    /// [`RunnerWithPool`]: crate::RunnerWithPool
337    /// [`StdDefaultPool`]: crate::StdDefaultPool
338    /// [`SequentialPool`]: crate::SequentialPool
339    /// [`with_runner`]: crate::ParIter::with_runner
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use orx_parallel::*;
345    ///
346    /// let inputs: Vec<_> = (0..42).collect();
347    ///
348    /// // uses the DefaultPool
349    /// // assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
350    /// let sum = inputs.par().sum();
351    ///
352    /// // equivalent to:
353    /// #[cfg(feature = "std")]
354    /// {
355    ///     let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
356    ///     assert_eq!(sum, sum2);
357    /// }
358    ///
359    /// #[cfg(not(miri))]
360    /// #[cfg(feature = "scoped_threadpool")]
361    /// {
362    ///     let mut pool = scoped_threadpool::Pool::new(8);
363    ///
364    ///     // uses the scoped_threadpool::Pool created with 8 threads
365    ///     let sum2 = inputs.par().with_pool(&mut pool).sum();
366    ///     assert_eq!(sum, sum2);
367    /// }
368    ///
369    /// #[cfg(not(miri))]
370    /// #[cfg(feature = "rayon-core")]
371    /// {
372    ///     let pool = rayon_core::ThreadPoolBuilder::new()
373    ///         .num_threads(8)
374    ///         .build()
375    ///         .unwrap();
376    ///
377    ///     // uses the rayon-core::ThreadPool created with 8 threads
378    ///     let sum2 = inputs.par().with_pool(&pool).sum();
379    ///     assert_eq!(sum, sum2);
380    /// }
381    /// ```
382    fn with_pool<P: ParThreadPool>(
383        self,
384        pool: P,
385    ) -> impl ParIter<RunnerWithPool<P, R::Executor>, Item = Self::Item> {
386        let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
387        self.with_runner(runner)
388    }
389
390    // using transformations
391
392    /// Converts the [`ParIter`] into [`ParIterUsing`], which will have access to a mutable reference to the
393    /// used variable throughout the computation.
394    ///
395    /// Note that each used thread will obtain exactly one instance of the variable.
396    ///
397    /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
398    /// `U` with respect to the `thread_idx`. The `thread_idx` is the index of the spawned thread; i.e.,
399    /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
400    ///
401    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
402    ///
403    /// # Examples
404    ///
405    /// ## Example 1: Channels
406    ///
407    /// The following example is taken from rayon's `for_each_with` documentation and converted to the using transformation:
408    ///
409    /// ```ignore
410    /// use orx_parallel::*;
411    /// use std::sync::mpsc::channel;
412    ///
413    /// let (sender, receiver) = channel();
414    ///
415    /// (0..5)
416    ///     .into_par()
417    ///     .using(|_thread_idx| sender.clone())
418    ///     .for_each(|s, x| s.send(x).unwrap());
419    ///
420    /// let mut res: Vec<_> = receiver.iter().collect();
421    ///
422    /// res.sort();
423    ///
424    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
425    /// ```
426    ///
427    /// ## Example 2: Random Number Generator
428    ///
429    /// Random number generation is a common use case that is important for a certain class of algorithms.
430    ///
431    /// The following example demonstrates how to safely generate random numbers through mutable references within
432    /// a parallel computation.
433    ///
434    /// Notice the differences between sequential and parallel computation.
435    /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
436    ///   explicitly define that we will be `using` a random number generator.
437    /// * The parallel iterator does not mutably capture any variable from the scope; instead, the using transformation
438    ///   converts the `ParIter` into a `ParIterUsing`, which allows mutable access to the used variable within all iterator methods.
439    ///
440    /// ```
441    /// use orx_parallel::*;
442    /// use rand::{Rng, SeedableRng};
443    /// use rand_chacha::ChaCha20Rng;
444    ///
445    /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
446    ///     (0..num_steps).fold(position, |p, _| random_step(rng, p))
447    /// }
448    ///
449    /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
450    ///     match rng.random_bool(0.5) {
451    ///         true => position + 1,  // to right
452    ///         false => position - 1, // to left
453    ///     }
454    /// }
455    ///
456    /// fn input_positions() -> Vec<i64> {
457    ///     (-100..=100).collect()
458    /// }
459    ///
460    /// fn sequential() {
461    ///     let positions = input_positions();
462    ///
463    ///     let mut rng = ChaCha20Rng::seed_from_u64(42);
464    ///     let final_positions: Vec<_> = positions
465    ///         .iter()
466    ///         .copied()
467    ///         .map(|position| random_walk(&mut rng, position, 10))
468    ///         .collect();
469    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
470    /// }
471    ///
472    /// fn parallel() {
473    ///     let positions = input_positions();
474    ///
475    ///     let final_positions: Vec<_> = positions
476    ///         .par()
477    ///         .copied()
478    ///         .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
479    ///         .map(|rng, position| random_walk(rng, position, 10))
480    ///         .collect();
481    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
482    /// }
483    ///
484    /// sequential();
485    /// parallel();
486    /// ```
487    ///
488    /// ## Example 3: Metrics Collection
489    ///
490    /// The following example demonstrates how to collect metrics about a parallel computation with the `using` transformation and
491    /// some `unsafe` help with interior mutability.
492    ///
493    /// ```
494    /// use orx_parallel::*;
495    /// use std::cell::UnsafeCell;
496    ///
497    /// const N: u64 = 1_000;
498    /// const MAX_NUM_THREADS: usize = 4;
499    ///
500    /// // just some work
501    /// fn fibonacci(n: u64) -> u64 {
502    ///     let mut a = 0;
503    ///     let mut b = 1;
504    ///     for _ in 0..n {
505    ///         let c = a + b;
506    ///         a = b;
507    ///         b = c;
508    ///     }
509    ///     a
510    /// }
511    ///
512    /// #[derive(Default, Debug)]
513    /// struct ThreadMetrics {
514    ///     thread_idx: usize,
515    ///     num_items_handled: usize,
516    ///     handled_42: bool,
517    ///     num_filtered_out: usize,
518    /// }
519    ///
520    /// struct ThreadMetricsWriter<'a> {
521    ///     metrics_ref: &'a mut ThreadMetrics,
522    /// }
523    ///
524    /// struct ComputationMetrics {
525    ///     thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
526    /// }
527    /// unsafe impl Sync for ComputationMetrics {}
528    /// impl ComputationMetrics {
529    ///     fn new() -> Self {
530    ///         let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
531    ///         for i in 0..MAX_NUM_THREADS {
532    ///             thread_metrics[i].thread_idx = i;
533    ///         }
534    ///         Self {
535    ///             thread_metrics: UnsafeCell::new(thread_metrics),
536    ///         }
537    ///     }
538    /// }
539    ///
540    /// impl ComputationMetrics {
541    ///     unsafe fn create_for_thread<'a>(&self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
542    ///         // SAFETY: here we create a mutable reference to the thread_idx-th metrics
543    ///         // * If we call this method multiple times with the same index,
544    ///         //   we create multiple mutable references to the same ThreadMetrics,
545    ///         //   which would be undefined behavior and could lead to a data race.
546    ///         // * We must make sure that `create_for_thread` is called only once per thread.
547    ///         // * If we use `create_for_thread` within the `using` call to create mutable values
548    ///         //   used by the threads, we are certain that the parallel computation
549    ///         //   will only call this method once per thread; hence, it will not
550    ///         //   cause the race condition.
551    ///         // * On the other hand, we must ensure that we do not call this method
552    ///         //   externally.
553    ///         let array = unsafe { &mut *self.thread_metrics.get() };
554    ///         ThreadMetricsWriter {
555    ///             metrics_ref: &mut array[thread_idx],
556    ///         }
557    ///     }
558    /// }
559    ///
560    /// let mut metrics = ComputationMetrics::new();
561    ///
562    /// let input: Vec<u64> = (0..N).collect();
563    ///
564    /// let sum = input
565    ///     .par()
566    ///     // SAFETY: we do not call `create_for_thread` externally;
567    ///     // it is safe if it is called only by the parallel computation.
568    ///     // Since we unsafely implement Sync for ComputationMetrics,
569    ///     // we must ensure that ComputationMetrics is not used elsewhere.
570    ///     .using(|t| unsafe { metrics.create_for_thread(t) })
571    ///     .map(|m: &mut ThreadMetricsWriter<'_>, i| {
572    ///         // collect some useful metrics
573    ///         m.metrics_ref.num_items_handled += 1;
574    ///         m.metrics_ref.handled_42 |= *i == 42;
575    ///
576    ///         // actual work
577    ///         fibonacci((*i % 20) + 1) % 100
578    ///     })
579    ///     .filter(|m, i| {
580    ///         let is_even = i % 2 == 0;
581    ///
582    ///         if !is_even {
583    ///             m.metrics_ref.num_filtered_out += 1;
584    ///         }
585    ///
586    ///         is_even
587    ///     })
588    ///     .num_threads(MAX_NUM_THREADS)
589    ///     .sum();
590    ///
591    /// assert_eq!(sum, 9100);
592    ///
593    /// let total_by_metrics: usize = metrics
594    ///     .thread_metrics
595    ///     .get_mut()
596    ///     .iter()
597    ///     .map(|x| x.num_items_handled)
598    ///     .sum();
599    ///
600    /// assert_eq!(N as usize, total_by_metrics);
601    /// ```
602    ///
603    fn using<U, F>(
604        self,
605        using: F,
606    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
607    where
608        U: 'static,
609        F: Fn(usize) -> U + Sync;
610
611    /// Converts the [`ParIter`] into [`ParIterUsing`], which will have access to a mutable reference to the
612    /// used variable throughout the computation.
613    ///
614    /// Note that each used thread will obtain exactly one instance of the variable.
615    ///
616    /// Each used thread receives a clone of the provided `value`.
617    /// Note that `using_clone(value)` can be considered a shorthand for `using(|_thread_idx| value.clone())`.
618    ///
619    /// Please see [`using`] for further examples.
620    ///
621    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
622    ///
623    /// [`using`]: crate::ParIter::using
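    ///
    /// # Examples
    ///
    /// As a brief sketch, the channel example of [`using`] can equivalently be written with
    /// `using_clone`, since all that is needed per thread is a clone of the sender:
    ///
    /// ```ignore
    /// use orx_parallel::*;
    /// use std::sync::mpsc::channel;
    ///
    /// let (sender, receiver) = channel();
    ///
    /// (0..5)
    ///     .into_par()
    ///     .using_clone(sender)
    ///     .for_each(|s, x| s.send(x).unwrap());
    ///
    /// let mut res: Vec<_> = receiver.iter().collect();
    ///
    /// res.sort();
    ///
    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
    /// ```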
624    fn using_clone<U>(
625        self,
626        value: U,
627    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
628    where
629        U: Clone + 'static;
630
631    // transformations into fallible computations
632
633    /// Transforms a parallel iterator where elements are of the result type; i.e., `ParIter<R, Item = Result<T, E>>`,
634    /// into a fallible parallel iterator with item type `T` and error type `E`; i.e., into `ParIterResult<R, Item = T, Err = E>`.
635    ///
636    /// `ParIterResult` is also a parallel iterator, but with methods specialized for handling fallible computations
637    /// as follows:
638    ///
639    /// * All of its methods are based on the success path with item type of `T`.
640    /// * However, computations short-circuit and immediately return the observed error if any of the items
641    ///   is of the `Err` variant of the result enum.
642    ///
643    /// See [`ParIterResult`] for details.
644    ///
645    /// # Examples
646    ///
647    /// ```
648    /// use orx_parallel::*;
649    ///
650    /// // all succeed
651    ///
652    /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
653    ///     .into_par()
654    ///     .map(|x| x.parse::<i32>())  // ParIter with Item=Result<i32, ParseIntError>
655    ///     .into_fallible_result()     // ParIterResult with Item=i32 and Err=ParseIntError
656    ///     .map(|x| x * 2)             // methods focus on the success path with Item=i32
657    ///     .collect();                 // methods return Result<_, Err>
658    ///                                 // where the Ok variant depends on the computation
659    ///
660    /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
661    ///
662    /// // at least one fails
663    ///
664    /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
665    ///     .into_par()
666    ///     .map(|x| x.parse::<i32>())
667    ///     .into_fallible_result()
668    ///     .map(|x| x * 2)
669    ///     .collect();
670    ///
671    /// assert!(result_doubled.is_err());
672    /// ```
673    fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
674    where
675        Self::Item: IntoResult<T, E>;
676
677    /// Transforms a parallel iterator where elements are of the option type; i.e., `ParIter<R, Item = Option<T>>`,
678    /// into a fallible parallel iterator with item type `T`; i.e., into `ParIterOption<R, Item = T>`.
679    ///
680    /// `ParIterOption` is also a parallel iterator, but with methods specialized for handling fallible computations
681    /// as follows:
682    ///
683    /// * All of its methods are based on the success path with item type of `T`.
684    /// * However, computations short-circuit and immediately return None if any of the items
685    ///   is of the `None` variant of the option enum.
686    ///
687    /// See [`ParIterOption`] for details.
688    ///
689    /// # Examples
690    ///
691    /// ```
692    /// use orx_parallel::*;
693    ///
694    /// // all succeed
695    ///
696    /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
697    ///     .into_par()
698    ///     .map(|x| x.parse::<i32>().ok())     // ParIter with Item=Option<i32>
699    ///     .into_fallible_option()             // ParIterOption with Item=i32
700    ///     .map(|x| x * 2)                     // methods focus on the success path with Item=i32
701    ///     .collect();                         // methods return Option<T>
702    ///                                         // where T depends on the computation
703    ///
704    /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
705    ///
706    /// // at least one fails
707    ///
708    /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
709    ///     .into_par()
710    ///     .map(|x| x.parse::<i32>().ok())
711    ///     .into_fallible_option()
712    ///     .map(|x| x * 2)
713    ///     .collect();
714    ///
715    /// assert_eq!(result_doubled, None);
716    /// ```
717    fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
718    where
719        Self::Item: IntoOption<T>,
720    {
721        ParOption::new(
722            self.map(|x| x.into_result_with_unit_err())
723                .into_fallible_result(),
724        )
725    }
726
727    // computation transformations
728
729    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
730    ///
731    /// # Examples
732    ///
733    /// ```
734    /// use orx_parallel::*;
735    ///
736    /// let a = [1, 2, 3];
737    ///
738    /// let iter = a.into_par().map(|x| 2 * x);
739    ///
740    /// let b: Vec<_> = iter.collect();
741    /// assert_eq!(b, &[2, 4, 6]);
742    /// ```
743    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
744    where
745        Map: Fn(Self::Item) -> Out + Sync + Clone;
746
747    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
748    ///
749    /// # Examples
750    ///
751    /// ```
752    /// use orx_parallel::*;
753    ///
754    /// let a = [1, 2, 3];
755    ///
756    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
757    ///
758    /// let b: Vec<_> = iter.collect();
759    /// assert_eq!(b, &[1, 3]);
760    /// ```
761    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
762    where
763        Filter: Fn(&Self::Item) -> bool + Sync + Clone;
764
765    /// Creates an iterator that works like map, but flattens nested structure.
766    ///
767    /// # Examples
768    ///
769    /// ```
770    /// use orx_parallel::*;
771    ///
772    /// let words = ["alpha", "beta", "gamma"];
773    ///
774    /// // chars() returns an iterator
775    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
776    ///
777    /// let merged: String = all_chars.iter().collect();
778    /// assert_eq!(merged, "alphabetagamma");
779    /// ```
780    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
781    where
782        IOut: IntoIterator,
783        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
784
785    /// Creates an iterator that both filters and maps.
786    ///
787    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
788    ///
789    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
790    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
791    ///
792    /// # Examples
793    ///
794    /// ```
795    /// use orx_parallel::*;
796    ///
797    /// let a = ["1", "two", "NaN", "four", "5"];
798    ///
799    /// let numbers: Vec<_> = a
800    ///     .into_par()
801    ///     .filter_map(|s| s.parse::<usize>().ok())
802    ///     .collect();
803    ///
804    /// assert_eq!(numbers, [1, 5]);
805    /// ```
806    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
807    where
808        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
809
810    /// Does something with each element of an iterator, passing the value on.
811    ///
812    /// When using iterators, you’ll often chain several of them together.
813    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
814    /// To do that, insert a call to `inspect()`.
815    ///
816    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
817    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
818    ///
819    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
820    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
821    /// collect some intermediate values during parallel execution for further inspection.
822    /// The following example demonstrates such a use case.
823    ///
824    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
825    ///
826    /// # Examples
827    ///
828    /// ```
829    /// use orx_parallel::*;
830    /// use orx_concurrent_bag::*;
831    ///
832    /// let a = vec![1, 4, 2, 3];
833    ///
834    /// // let's add some inspect() calls to investigate what's happening
835    /// // - log some events
836    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
837    /// let bag = ConcurrentBag::new();
838    ///
839    /// let sum = a
840    ///     .par()
841    ///     .copied()
842    ///     .inspect(|x| println!("about to filter: {x}"))
843    ///     .filter(|x| x % 2 == 0)
844    ///     .inspect(|x| {
845    ///         bag.push(*x);
846    ///         println!("made it through filter: {x}");
847    ///     })
848    ///     .sum();
849    /// println!("{sum}");
850    ///
851    /// let mut values_made_through = bag.into_inner();
852    /// values_made_through.sort();
853    /// assert_eq!(values_made_through, [2, 4]);
854    /// ```
855    ///
856    /// This will print:
857    ///
858    /// ```console
859    /// about to filter: 1
860    /// about to filter: 4
861    /// made it through filter: 4
862    /// about to filter: 2
863    /// made it through filter: 2
864    /// about to filter: 3
865    /// 6
866    /// ```
867    fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
868    where
869        Operation: Fn(&Self::Item) + Sync + Clone,
870    {
871        let map = move |x| {
872            operation(&x);
873            x
874        };
875        self.map(map)
876    }
877
878    // while transformations
879
880    /// Creates an iterator that yields elements based on the predicate `take_while`.
881    ///
882    /// It will call this closure on each element of the iterator, and yield elements while it returns true.
883    ///
884    /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
885    ///
886    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
887    ///
888    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
889    /// elements before the predicate first returns false are collected in the correct order; hence, the result
890    /// is deterministic.
891    ///
892    /// # Examples
893    ///
894    /// ```
895    /// use orx_parallel::*;
896    ///
897    /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
898    /// let b: Vec<_> = iter.collect();
899    ///
900    /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
901    /// ```
902    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
903    where
904        While: Fn(&Self::Item) -> bool + Sync + Clone;
905
906    /// Creates an iterator that both yields elements based on a predicate and maps, using the closure `map_while`.
907    ///
908    /// `map_while` takes a closure as an argument.
909    /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
910    ///
911    /// After None is returned, map_while's job is over, and the rest of the elements are ignored.
912    ///
913    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
914    ///
915    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
916    /// elements before the closure first returns None are collected in the correct order; hence, the result
917    /// is deterministic.
918    ///
919    /// ```
920    /// use orx_parallel::*;
921    ///
922    /// let iter = (0..10_000)
923    ///     .par()
924    ///     .map(|x| x as i32 - 5_000) // -5_000..10_000
925    ///     .map_while(|x| 16i32.checked_div(x));
926    /// let b: Vec<_> = iter.collect();
927    ///
928    /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
929    /// ```
930    fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
931    where
932        MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
933    {
934        self.map(map_while).take_while(|x| x.is_some()).map(|x| {
935            // SAFETY: x passed the take_while(is_some) check above; hence, it is Some and unwrap_unchecked is safe
936            unsafe { x.unwrap_unchecked() }
937        })
938    }
939
940    // special item transformations
941
942    /// Creates an iterator which copies all of its elements.
943    ///
944    /// # Examples
945    ///
946    /// ```
947    /// use orx_parallel::*;
948    ///
949    /// let a = vec![1, 2, 3];
950    ///
951    /// let v_copied: Vec<_> = a.par().copied().collect();
952    ///
953    /// // copied is the same as .map(|&x| x)
954    /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
955    ///
956    /// assert_eq!(v_copied, vec![1, 2, 3]);
957    /// assert_eq!(v_map, vec![1, 2, 3]);
958    /// ```
959    fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
960    where
961        T: 'a + Copy,
962        Self: ParIter<R, Item = &'a T>,
963    {
964        self.map(map_copy)
965    }
966
967    /// Creates an iterator which clones all of its elements.
968    ///
969    /// # Examples
970    ///
971    /// ```
972    /// use orx_parallel::*;
973    ///
974    /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
975    ///
976    /// let v_cloned: Vec<_> = a.par().cloned().collect();
977    ///
978    /// // cloned is the same as .map(|x| x.clone())
979    /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
980    ///
981    /// assert_eq!(
982    ///     v_cloned,
983    ///     vec![String::from("1"), String::from("2"), String::from("3")]
984    /// );
985    /// assert_eq!(
986    ///     v_map,
987    ///     vec![String::from("1"), String::from("2"), String::from("3")]
988    /// );
989    /// ```
990    fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
991    where
992        T: 'a + Clone,
993        Self: ParIter<R, Item = &'a T>,
994    {
995        self.map(map_clone)
996    }
997
998    /// Creates an iterator that flattens nested structure.
999    ///
1000    /// This is useful when you have an iterator of iterators or an iterator of things that can be
1001    /// turned into iterators and you want to remove one level of indirection.
1002    ///
1003    /// # Examples
1004    ///
1005    /// Basic usage.
1006    ///
1007    /// ```
1008    /// use orx_parallel::*;
1009    ///
1010    /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
1011    /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
1012    /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
1013    /// ```
1014    ///
1015    /// Mapping and then flattening:
1016    ///
1017    /// ```
1018    /// use orx_parallel::*;
1019    ///
1020    /// let words = vec!["alpha", "beta", "gamma"];
1021    ///
1022    /// // chars() returns an iterator
1023    /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
1024    /// let merged: String = all_characters.into_iter().collect();
1025    /// assert_eq!(merged, "alphabetagamma");
1026    /// ```
1027    ///
1028    /// But actually, you can write this in terms of `flat_map`,
1029    /// which is preferable in this case since it conveys intent more clearly:
1030    ///
1031    /// ```
1032    /// use orx_parallel::*;
1033    ///
1034    /// let words = vec!["alpha", "beta", "gamma"];
1035    ///
1036    /// // chars() returns an iterator
1037    /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
1038    /// let merged: String = all_characters.into_iter().collect();
1039    /// assert_eq!(merged, "alphabetagamma");
1040    /// ```
1041    fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
1042    where
1043        Self::Item: IntoIterator,
1044    {
1045        let map = |e: Self::Item| e.into_iter();
1046        self.flat_map(map)
1047    }
1048
1049    // collect
1050
1051    /// Collects all the items from an iterator into a collection.
1052    ///
1053    /// This is useful when you already have a collection and want to add the iterator items to it.
1054    ///
1055    /// The collection is passed in as an owned value and returned back with the additional elements.
1056    ///
1057    /// All collections implementing [`ParCollectInto`] can be used to collect into.
1058    ///
1059    /// [`ParCollectInto`]: crate::ParCollectInto
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```
1064    /// use orx_parallel::*;
1065    ///
1066    /// let a = vec![1, 2, 3];
1067    ///
1068    /// let vec: Vec<i32> = vec![0, 1];
1069    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
1070    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
1071    ///
1072    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
1073    /// ```
1074    fn collect_into<C>(self, output: C) -> C
1075    where
1076        C: ParCollectInto<Self::Item>;
1077
1078    /// Transforms an iterator into a collection.
1079    ///
1080    /// Similar to [`Iterator::collect`], the type annotation on the left-hand side determines
1081    /// the type of the result collection; or turbofish annotation can be used.
1082    ///
1083    /// All collections implementing [`ParCollectInto`] can be used to collect into.
1084    ///
1085    /// [`ParCollectInto`]: crate::ParCollectInto
1086    ///
1087    /// # Examples
1088    ///
1089    /// ```
1090    /// use orx_parallel::*;
1091    ///
1092    /// let a = vec![1, 2, 3];
1093    ///
1094    /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
1095    ///
1096    /// assert_eq!(vec![2, 4, 6], doubled);
1097    /// ```
1098    fn collect<C>(self) -> C
1099    where
1100        C: ParCollectInto<Self::Item>,
1101    {
1102        let output = C::empty(self.con_iter().try_get_len());
1103        self.collect_into(output)
1104    }
1105
1106    // reduce
1107
1108    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
1109    ///
1110    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
1111    ///
1112    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
1113    ///
1114    /// # Example
1115    ///
1116    /// ```
1117    /// use orx_parallel::*;
1118    ///
1119    /// let inputs = 1..10;
1120    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
1121    /// assert_eq!(reduced, 45);
1122    /// ```
1123    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
1124    where
1125        Self::Item: Send,
1126        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
1127
1128    /// Tests if every element of the iterator matches a predicate.
1129    ///
1130    /// `all` takes a `predicate` that returns true or false.
1131    /// It applies this closure to each element of the iterator,
1132    /// and if they all return true, then so does `all`.
1133    /// If any of them returns false, it returns false.
1134    ///
1135    /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1136    /// given that no matter what else happens, the result will also be false.
1137    ///
1138    /// An empty iterator returns true.
1139    ///
1140    /// # Examples
1141    ///
1142    /// ```
1143    /// use orx_parallel::*;
1144    ///
1145    /// let mut a = vec![1, 2, 3];
1146    /// assert!(a.par().all(|x| **x > 0));
1147    /// assert!(!a.par().all(|x| **x > 2));
1148    ///
1149    /// a.clear();
1150    /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1151    /// ```
1152    fn all<Predicate>(self, predicate: Predicate) -> bool
1153    where
1154        Self::Item: Send,
1155        Predicate: Fn(&Self::Item) -> bool + Sync,
1156    {
1157        let violates = |x: &Self::Item| !predicate(x);
1158        self.find(violates).is_none()
1159    }
1160
1161    /// Tests if any element of the iterator matches a predicate.
1162    ///
1163    /// `any` takes a `predicate` that returns true or false.
1164    /// It applies this closure to each element of the iterator,
1165    /// and if any of the elements returns true, then so does `any`.
1166    /// If all of them return false, it returns false.
1167    ///
1168    /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1169    /// given that no matter what else happens, the result will also be true.
1170    ///
1171    /// An empty iterator returns false.
1172    ///
1173    /// # Examples
1174    ///
1175    /// ```
1176    /// use orx_parallel::*;
1177    ///
1178    /// let mut a = vec![1, 2, 3];
1179    /// assert!(a.par().any(|x| **x > 0));
1180    /// assert!(!a.par().any(|x| **x > 5));
1181    ///
1182    /// a.clear();
1183    /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1184    /// ```
1185    fn any<Predicate>(self, predicate: Predicate) -> bool
1186    where
1187        Self::Item: Send,
1188        Predicate: Fn(&Self::Item) -> bool + Sync,
1189    {
1190        self.find(predicate).is_some()
1191    }
1192
1193    /// Consumes the iterator, counting the number of iterations and returning it.
1194    ///
1195    /// # Examples
1196    ///
1197    /// ```
1198    /// use orx_parallel::*;
1199    ///
1200    /// let a = vec![1, 2, 3];
1201    /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1202    /// ```
1203    fn count(self) -> usize {
1204        self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1205    }
1206
1207    /// Calls a closure on each element of an iterator.
1208    ///
1209    /// # Examples
1210    ///
1211    /// Basic usage:
1212    ///
1213    /// ```
1214    /// use orx_parallel::*;
1215    /// use std::sync::mpsc::channel;
1216    ///
1217    /// let (tx, rx) = channel();
1218    /// (0..5)
1219    ///     .par()
1220    ///     .map(|x| x * 2 + 1)
1221    ///     .for_each(move |x| tx.send(x).unwrap());
1222    ///
1223    /// let mut v: Vec<_> = rx.iter().collect();
1224    /// v.sort(); // order can be mixed, since messages will be sent in parallel
1225    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1226    /// ```
1227    ///
1228    /// Note that since parallel iterators cannot be used within a `for` loop like regular iterators,
1229    /// `for_each` provides the equivalent of an arbitrary for loop over a parallel iterator.
1230    /// In the following example, we log every element that satisfies a predicate in parallel.
1231    ///
1232    /// ```
1233    /// use orx_parallel::*;
1234    ///
1235    /// (0..5)
1236    ///     .par()
1237    ///     .flat_map(|x| x * 100..x * 110)
1238    ///     .filter(|&x| x % 3 == 0)
1239    ///     .for_each(|x| println!("{x}"));
1240    /// ```
1241    fn for_each<Operation>(self, operation: Operation)
1242    where
1243        Operation: Fn(Self::Item) + Sync,
1244    {
1245        let map = |x| operation(x);
1246        let _ = self.map(map).reduce(reduce_unit);
1247    }
1248
1249    /// Returns the maximum element of an iterator.
1250    ///
1251    /// If the iterator is empty, None is returned.
1252    ///
1253    /// # Examples
1254    ///
1255    /// ```
1256    /// use orx_parallel::*;
1257    ///
1258    /// let a = vec![1, 2, 3];
1259    /// let b: Vec<u32> = Vec::new();
1260    ///
1261    /// assert_eq!(a.par().max(), Some(&3));
1262    /// assert_eq!(b.par().max(), None);
1263    /// ```
1264    fn max(self) -> Option<Self::Item>
1265    where
1266        Self::Item: Ord + Send,
1267    {
1268        self.reduce(Ord::max)
1269    }
1270
1271    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1272    ///
1273    /// If the iterator is empty, None is returned.
1274    ///
1275    /// # Examples
1276    ///
1277    /// ```
1278    /// use orx_parallel::*;
1279    ///
1280    /// let a = vec![-3_i32, 0, 1, 5, -10];
1281    /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1282    /// ```
1283    fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1284    where
1285        Self::Item: Send,
1286        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1287    {
1288        let reduce = |x, y| match compare(&x, &y) {
1289            Ordering::Greater | Ordering::Equal => x,
1290            Ordering::Less => y,
1291        };
1292        self.reduce(reduce)
1293    }
1294
1295    /// Returns the element that gives the maximum value from the specified function.
1296    ///
1297    /// If the iterator is empty, None is returned.
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```
1302    /// use orx_parallel::*;
1303    ///
1304    /// let a = vec![-3_i32, 0, 1, 5, -10];
1305    /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1306    /// ```
1307    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1308    where
1309        Self::Item: Send,
1310        Key: Ord,
1311        GetKey: Fn(&Self::Item) -> Key + Sync,
1312    {
1313        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1314            Ordering::Greater | Ordering::Equal => x,
1315            Ordering::Less => y,
1316        };
1317        self.reduce(reduce)
1318    }
1319
1320    /// Returns the minimum element of an iterator.
1321    ///
1322    /// If the iterator is empty, None is returned.
1323    ///
1324    /// # Examples
1325    ///
1326    /// ```
1327    /// use orx_parallel::*;
1328    ///
1329    /// let a = vec![1, 2, 3];
1330    /// let b: Vec<u32> = Vec::new();
1331    ///
1332    /// assert_eq!(a.par().min(), Some(&1));
1333    /// assert_eq!(b.par().min(), None);
1334    /// ```
1335    fn min(self) -> Option<Self::Item>
1336    where
1337        Self::Item: Ord + Send,
1338    {
1339        self.reduce(Ord::min)
1340    }
1341
1342    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1343    ///
1344    /// If the iterator is empty, None is returned.
1345    ///
1346    /// # Examples
1347    ///
1348    /// ```
1349    /// use orx_parallel::*;
1350    ///
1351    /// let a = vec![-3_i32, 0, 1, 5, -10];
1352    /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1353    /// ```
1354    fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1355    where
1356        Self::Item: Send,
1357        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1358    {
1359        let reduce = |x, y| match compare(&x, &y) {
1360            Ordering::Less | Ordering::Equal => x,
1361            Ordering::Greater => y,
1362        };
1363        self.reduce(reduce)
1364    }
1365
1366    /// Returns the element that gives the minimum value from the specified function.
1367    ///
1368    /// If the iterator is empty, None is returned.
1369    ///
1370    /// # Examples
1371    ///
1372    /// ```
1373    /// use orx_parallel::*;
1374    ///
1375    /// let a = vec![-3_i32, 0, 1, 5, -10];
1376    /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1377    /// ```
1378    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1379    where
1380        Self::Item: Send,
1381        Key: Ord,
1382        GetKey: Fn(&Self::Item) -> Key + Sync,
1383    {
1384        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1385            Ordering::Less | Ordering::Equal => x,
1386            Ordering::Greater => y,
1387        };
1388        self.reduce(reduce)
1389    }
1390
1391    /// Sums the elements of an iterator.
1392    ///
1393    /// Takes each element, adds them together, and returns the result.
1394    ///
1395    /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1396    ///
1397    /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1398    ///
1399    /// [`Sum<Out>`]: crate::Sum
1400    ///
1401    /// # Examples
1402    ///
1403    /// ```
1404    /// use orx_parallel::*;
1405    ///
1406    /// let a = vec![1, 2, 3];
1407    /// let sum: i32 = a.par().sum();
1408    ///
1409    /// assert_eq!(sum, 6);
1410    /// ```
1411    fn sum<Out>(self) -> Out
1412    where
1413        Self::Item: Sum<Out>,
1414        Out: Send,
1415    {
1416        self.map(Self::Item::map)
1417            .reduce(Self::Item::reduce)
1418            .unwrap_or(Self::Item::zero())
1419    }
1420
1421    // early exit
1422
1423    /// Returns the first (or any) element of the iterator; returns None if it is empty.
1424    ///
1425    /// * the first element is returned if the default iteration order `IterationOrder::Ordered` is used,
1426    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1427    ///
1428    /// # Examples
1429    ///
1430    /// The following example demonstrates the usage of `first` with the default `Ordered` iteration.
1431    /// This guarantees that the first element with respect to position in the input sequence
1432    /// is returned.
1433    ///
1434    /// ```
1435    /// use orx_parallel::*;
1436    ///
1437    /// let a: Vec<usize> = vec![];
1438    /// assert_eq!(a.par().copied().first(), None);
1439    ///
1440    /// let a = vec![1, 2, 3];
1441    /// assert_eq!(a.par().copied().first(), Some(1));
1442    ///
1443    /// let a = 1..10_000;
1444    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1445    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1446    ///
1447    /// // or equivalently,
1448    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1449    /// ```
1450    ///
1451    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1452    /// whichever is visited first depending on the parallel execution.
1453    ///
1454    /// ```
1455    /// use orx_parallel::*;
1456    ///
1457    /// let a = 1..10_000;
1458    ///
1459    /// // might return either of 3421 or 2*3421
1460    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1461    /// assert!([3421, 2 * 3421].contains(&any));
1462    ///
1463    /// // or equivalently,
1464    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1465    /// assert!([3421, 2 * 3421].contains(&any));
1466    /// ```
1467    fn first(self) -> Option<Self::Item>
1468    where
1469        Self::Item: Send;
1470
1471    /// Searches for an element of an iterator that satisfies a `predicate`.
1472    ///
1473    /// Depending on the set iteration order of the parallel iterator, returns
1474    ///
1475    /// * the first element satisfying the `predicate` if the default iteration order `IterationOrder::Ordered` is used,
1476    /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1477    ///
1478    /// `find` takes a closure that returns true or false.
1479    /// It applies this closure to each element of the iterator,
1480    /// and returns `Some(x)` where `x` is the first element that returns true.
1481    /// If they all return false, it returns None.
1482    ///
1483    /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1484    ///
1485    /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1486    ///
1487    /// # Examples
1488    ///
1489    /// The following example demonstrates the usage of `find` with the default `Ordered` iteration.
1490    /// This guarantees that the first element satisfying the predicate, with respect to position in the input sequence,
1491    /// is returned.
1492    ///
1493    /// ```
1494    /// use orx_parallel::*;
1495    ///
1496    /// let a = 1..10_000;
1497    /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1498    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1499    /// ```
1500    ///
1501    /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1502    /// whichever is found first depending on the parallel execution.
1503    ///
1504    /// ```
1505    /// use orx_parallel::*;
1506    ///
1507    /// let a = 1..10_000;
1508    ///
1509    /// // might return either of 3421 or 2*3421
1510    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1511    /// assert!([3421, 2 * 3421].contains(&any));
1512    /// ```
1513    fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1514    where
1515        Self::Item: Send,
1516        Predicate: Fn(&Self::Item) -> bool + Sync,
1517    {
1518        self.filter(&predicate).first()
1519    }
1520}