orx_parallel/par_iter.rs

1use crate::computational_variants::fallible_option::ParOption;
2use crate::par_iter_option::{IntoOption, ParIterOption};
3use crate::par_iter_result::IntoResult;
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::using::{UsingClone, UsingFun};
6use crate::{ParIterResult, ParThreadPool, RunnerWithPool};
7use crate::{
8    ParIterUsing, Params,
9    collect_into::ParCollectInto,
10    default_fns::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
11    parameters::{ChunkSize, IterationOrder, NumThreads},
12    special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
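///
/// # Examples
///
/// A minimal sketch of a typical pipeline using the methods documented below:
///
/// ```
/// use orx_parallel::*;
///
/// let inputs = vec![1, 2, 3, 4, 5];
///
/// let doubled_evens: Vec<_> = inputs
///     .par()                  // create a parallel iterator over &i32
///     .copied()               // &i32 -> i32
///     .filter(|x| x % 2 == 0) // keep even numbers
///     .map(|x| x * 2)         // double them
///     .collect();             // collected in the input order by default
///
/// assert_eq!(doubled_evens, vec![4, 8]);
/// ```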
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20    R: ParallelRunner,
21{
22    /// Element type of the parallel iterator.
23    type Item;
24
25    /// Returns a reference to the input concurrent iterator.
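    ///
    /// # Examples
    ///
    /// A minimal sketch; it assumes that the underlying concurrent iterator reports its initial
    /// length via `try_get_len` (as used by the default `collect` implementation below):
    ///
    /// ```ignore
    /// use orx_parallel::*;
    ///
    /// let vec = vec![1, 2, 3, 4];
    /// let par = vec.par();
    ///
    /// // the exact length is known for a vector input
    /// assert_eq!(par.con_iter().try_get_len(), Some(4));
    /// ```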
26    fn con_iter(&self) -> &impl ConcurrentIter;
27
28    /// Parameters of the parallel iterator.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use orx_parallel::*;
34    /// use std::num::NonZero;
35    ///
36    /// let vec = vec![1, 2, 3, 4];
37    ///
38    /// assert_eq!(
39    ///     vec.par().params(),
40    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41    /// );
42    ///
43    /// assert_eq!(
44    ///     vec.par().num_threads(0).chunk_size(0).params(),
45    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46    /// );
47    ///
48    /// assert_eq!(
49    ///     vec.par().num_threads(1).params(),
50    ///     Params::new(
51    ///         NumThreads::Max(NonZero::new(1).unwrap()),
52    ///         ChunkSize::Auto,
53    ///         IterationOrder::Ordered
54    ///     )
55    /// );
56    ///
57    /// assert_eq!(
58    ///     vec.par().num_threads(4).chunk_size(64).params(),
59    ///     Params::new(
60    ///         NumThreads::Max(NonZero::new(4).unwrap()),
61    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
62    ///         IterationOrder::Ordered
63    ///     )
64    /// );
65    ///
66    /// assert_eq!(
67    ///     vec.par()
68    ///         .num_threads(8)
69    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70    ///         .iteration_order(IterationOrder::Arbitrary)
71    ///         .params(),
72    ///     Params::new(
73    ///         NumThreads::Max(NonZero::new(8).unwrap()),
74    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
75    ///         IterationOrder::Arbitrary
76    ///     )
77    /// );
78    /// ```
79    fn params(&self) -> Params;
80
81    // params transformations
82
83    /// Sets the number of threads to be used in the parallel execution.
84    /// Integers can be used as the argument with the following mapping:
85    ///
86    /// * `0` -> `NumThreads::Auto`
87    /// * `1` -> `NumThreads::sequential()`, which is equivalent to `NumThreads::Max(1)`
88    /// * `n > 0` -> `NumThreads::Max(n)`
89    ///
90    /// See [`NumThreads`] for details.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use orx_parallel::*;
96    /// use std::num::NonZero;
97    ///
98    /// let vec = vec![1, 2, 3, 4];
99    ///
100    /// // all available threads can be used
101    ///
102    /// assert_eq!(
103    ///     vec.par().params(),
104    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105    /// );
106    ///
107    /// assert_eq!(
108    ///     vec.par().num_threads(0).chunk_size(0).params(),
109    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110    /// );
111    ///
112    /// // computation will be executed sequentially on the main thread, no parallelization
113    ///
114    /// assert_eq!(
115    ///     vec.par().num_threads(1).params(),
116    ///     Params::new(
117    ///         NumThreads::Max(NonZero::new(1).unwrap()),
118    ///         ChunkSize::Auto,
119    ///         IterationOrder::Ordered
120    ///     )
121    /// );
122    ///
123    /// // maximum 4 threads can be used
124    /// assert_eq!(
125    ///     vec.par().num_threads(4).chunk_size(64).params(),
126    ///     Params::new(
127    ///         NumThreads::Max(NonZero::new(4).unwrap()),
128    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
129    ///         IterationOrder::Ordered
130    ///     )
131    /// );
132    ///
133    /// // maximum 8 threads can be used
134    /// assert_eq!(
135    ///     vec.par()
136    ///         .num_threads(8)
137    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138    ///         .iteration_order(IterationOrder::Arbitrary)
139    ///         .params(),
140    ///     Params::new(
141    ///         NumThreads::Max(NonZero::new(8).unwrap()),
142    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
143    ///         IterationOrder::Arbitrary
144    ///     )
145    /// );
146    /// ```
147    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149    /// Sets the number of elements to be pulled from the concurrent iterator at once by each thread
150    /// during the parallel execution. When integers are used as the argument, the following mapping applies:
151    ///
152    /// * `0` -> `ChunkSize::Auto`
153    /// * `n > 0` -> `ChunkSize::Exact(n)`
154    ///
155    /// Please use the enum constructor directly to create the `ChunkSize::Min` variant.
156    ///
157    /// See [`ChunkSize`] for details.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use orx_parallel::*;
163    /// use std::num::NonZero;
164    ///
165    /// let vec = vec![1, 2, 3, 4];
166    ///
167    /// // chunk sizes will be dynamically decided by the parallel runner
168    ///
169    /// assert_eq!(
170    ///     vec.par().params(),
171    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172    /// );
173    ///
174    /// assert_eq!(
175    ///     vec.par().num_threads(0).chunk_size(0).params(),
176    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177    /// );
178    ///
179    /// assert_eq!(
180    ///     vec.par().num_threads(1).params(),
181    ///     Params::new(
182    ///         NumThreads::Max(NonZero::new(1).unwrap()),
183    ///         ChunkSize::Auto,
184    ///         IterationOrder::Ordered
185    ///     )
186    /// );
187    ///
188    /// // chunk size will always be 64, parallel runner cannot change
189    ///
190    /// assert_eq!(
191    ///     vec.par().num_threads(4).chunk_size(64).params(),
192    ///     Params::new(
193    ///         NumThreads::Max(NonZero::new(4).unwrap()),
194    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
195    ///         IterationOrder::Ordered
196    ///     )
197    /// );
198    ///
199    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200    ///
201    /// assert_eq!(
202    ///     vec.par()
203    ///         .num_threads(8)
204    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205    ///         .iteration_order(IterationOrder::Arbitrary)
206    ///         .params(),
207    ///     Params::new(
208    ///         NumThreads::Max(NonZero::new(8).unwrap()),
209    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
210    ///         IterationOrder::Arbitrary
211    ///     )
212    /// );
213    /// ```
214    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216    /// Sets the iteration order of the parallel computation.
217    ///
218    /// See [`IterationOrder`] for details.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use orx_parallel::*;
224    ///
225    /// let vec = vec![1, 2, 3, 4];
226    ///
227    /// // results are collected in an order consistent with the input order,
228    /// // or find returns the first element satisfying the predicate
229    ///
230    /// assert_eq!(
231    ///     vec.par().params(),
232    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233    /// );
234    ///
235    /// assert_eq!(
236    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
237    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238    /// );
239    ///
240    /// // results might be collected in arbitrary order
241    /// // or find returns any of the elements satisfying the predicate
242    ///
243    /// assert_eq!(
244    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246    /// );
247    /// ```
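    ///
    /// The difference is observable when collecting; a minimal sketch:
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = vec![1, 2, 3, 4];
    ///
    /// // ordered (default): results follow the input order
    /// let doubled: Vec<_> = a.par().map(|x| x * 2).collect();
    /// assert_eq!(doubled, vec![2, 4, 6, 8]);
    ///
    /// // arbitrary: results may be collected in any order
    /// let mut doubled: Vec<_> = a
    ///     .par()
    ///     .iteration_order(IterationOrder::Arbitrary)
    ///     .map(|x| x * 2)
    ///     .collect();
    /// doubled.sort();
    /// assert_eq!(doubled, vec![2, 4, 6, 8]);
    /// ```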
248    fn iteration_order(self, order: IterationOrder) -> Self;
249
250    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251    ///
252    /// See also [`with_pool`].
253    ///
254    /// The parallel runner of each computation can be specified independently using `with_runner`.
255    ///
256    /// When not specified, the default runner is used, which is:
257    /// * [`RunnerWithPool`] with [`StdDefaultPool`] when "std" feature is enabled,
258    /// * [`RunnerWithPool`] with [`SequentialPool`] when "std" feature is disabled.
259    ///
260    /// Note that [`StdDefaultPool`] uses standard native threads.
261    ///
262    /// When working in a no-std environment, the default runner falls back to sequential.
263    /// In this case, a `RunnerWithPool` wrapping a particular thread pool must be provided via the
264    /// `with_runner` transformation to achieve parallel computation.
265    ///
266    /// [`RunnerWithPool`]: crate::RunnerWithPool
267    /// [`StdDefaultPool`]: crate::StdDefaultPool
268    /// [`SequentialPool`]: crate::SequentialPool
269    /// [`with_pool`]: crate::ParIter::with_pool
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use orx_parallel::*;
275    ///
276    /// let inputs: Vec<_> = (0..42).collect();
277    ///
278    /// // uses the DefaultRunner
279    /// // assuming "std" enabled, RunnerWithPool<StdDefaultPool> will be used; i.e., native threads
280    /// let sum = inputs.par().sum();
281    ///
282    /// // equivalent to:
283    /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(StdDefaultPool::default())).sum();
284    /// assert_eq!(sum, sum2);
285    ///
286    /// #[cfg(feature = "scoped_threadpool")]
287    /// {
288    ///     let mut pool = scoped_threadpool::Pool::new(8);
289    ///
290    ///     // uses the scoped_threadpool::Pool created with 8 threads
291    ///     let sum2 = inputs.par().with_runner(RunnerWithPool::from(&mut pool)).sum();
292    ///     assert_eq!(sum, sum2);
293    /// }
294    ///
295    /// #[cfg(feature = "rayon-core")]
296    /// {
297    ///     let pool = rayon_core::ThreadPoolBuilder::new()
298    ///         .num_threads(8)
299    ///         .build()
300    ///         .unwrap();
301    ///
302    ///     // uses the rayon-core::ThreadPool created with 8 threads
303    ///     let sum2 = inputs.par().with_runner(RunnerWithPool::from(&pool)).sum();
304    ///     assert_eq!(sum, sum2);
305    /// }
306    /// ```
307    fn with_runner<Q: ParallelRunner>(self, runner: Q) -> impl ParIter<Q, Item = Self::Item>;
308
309    /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
310    /// [`ParThreadPool`].
311    ///
312    /// See also [`with_runner`].
313    ///
314    /// The thread pool of each computation can be specified independently using `with_pool`.
315    ///
316    /// When not specified, the default pool is used, which is:
317    /// * [`StdDefaultPool`] when "std" feature is enabled,
318    /// * [`SequentialPool`] when "std" feature is disabled.
319    ///
320    /// Note that [`StdDefaultPool`] uses standard native threads.
321    ///
322    /// When working in a no-std environment, the default pool falls back to sequential.
323    /// In this case, a thread pool must be provided via the `with_pool` transformation to achieve parallel computation.
324    ///
325    /// Note that if a thread pool, say `pool`, is of a type that implements [`ParThreadPool`], then:
326    /// * `with_pool` can be called with an owned value, `with_pool(pool)`, for all implementors; but also,
327    /// * with a shared reference, `with_pool(&pool)`, for most implementations (e.g., rayon-core, yastl), and
328    /// * with a mutable reference, `with_pool(&mut pool)`, for others (e.g., scoped_threadpool).
329    ///
330    /// [`DefaultPool`]: crate::DefaultPool
331    /// [`RunnerWithPool`]: crate::RunnerWithPool
332    /// [`StdDefaultPool`]: crate::StdDefaultPool
333    /// [`SequentialPool`]: crate::SequentialPool
334    /// [`with_runner`]: crate::ParIter::with_runner
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// use orx_parallel::*;
340    ///
341    /// let inputs: Vec<_> = (0..42).collect();
342    ///
343    /// // uses the DefaultPool
344    /// // assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
345    /// let sum = inputs.par().sum();
346    ///
347    /// // equivalent to:
348    /// let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
349    /// assert_eq!(sum, sum2);
350    ///
351    /// #[cfg(feature = "scoped_threadpool")]
352    /// {
353    ///     let mut pool = scoped_threadpool::Pool::new(8);
354    ///
355    ///     // uses the scoped_threadpool::Pool created with 8 threads
356    ///     let sum2 = inputs.par().with_pool(&mut pool).sum();
357    ///     assert_eq!(sum, sum2);
358    /// }
359    ///
360    /// #[cfg(feature = "rayon-core")]
361    /// {
362    ///     let pool = rayon_core::ThreadPoolBuilder::new()
363    ///         .num_threads(8)
364    ///         .build()
365    ///         .unwrap();
366    ///
367    ///     // uses the rayon-core::ThreadPool created with 8 threads
368    ///     let sum2 = inputs.par().with_pool(&pool).sum();
369    ///     assert_eq!(sum, sum2);
370    /// }
371    /// ```
372    fn with_pool<P: ParThreadPool>(
373        self,
374        pool: P,
375    ) -> impl ParIter<RunnerWithPool<P, R::Executor>, Item = Self::Item> {
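        // swap only the thread pool; the executor type of the current runner `R` is kept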
376        let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
377        self.with_runner(runner)
378    }
379
380    // using transformations
381
382    /// Converts the [`ParIter`] into a [`ParIterUsing`] which has access to a mutable reference to the
383    /// used variable throughout the computation.
384    ///
385    /// Note that each thread used in the computation obtains exactly one instance of the variable.
386    ///
387    /// The signature of the `using` closure is `(thread_idx: usize) -> U`; it creates an instance of
388    /// `U` for the given `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
389    /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
390    ///
391    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
392    ///
393    /// # Examples
394    ///
395    /// ## Example 1: Channels
396    ///
397    /// The following example is taken from rayon's `for_each_with` documentation and converted to the using transformation:
398    ///
399    /// ```ignore
400    /// use orx_parallel::*;
401    /// use std::sync::mpsc::channel;
402    ///
403    /// let (sender, receiver) = channel();
404    ///
405    /// (0..5)
406    ///     .into_par()
407    ///     .using(|_thread_idx| sender.clone())
408    ///     .for_each(|s, x| s.send(x).unwrap());
409    ///
410    /// let mut res: Vec<_> = receiver.iter().collect();
411    ///
412    /// res.sort();
413    ///
414    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
415    /// ```
416    ///
417    /// ## Example 2: Random Number Generator
418    ///
419    /// Random number generation is a common use case that is important for a certain class of algorithms.
420    ///
421    /// The following example demonstrates how to safely generate random numbers through mutable references within
422    /// a parallel computation.
423    ///
424    /// Notice the differences between the sequential and parallel computations.
425    /// * In the sequential computation, a mutable reference to `rng` is captured, while in the parallel computation,
426    ///   we explicitly define that we will be `using` a random number generator.
427    /// * The parallel iterator does not mutably capture any variable from the scope; however, the using transformation
428    ///   converts the `ParIter` into a `ParIterUsing` which allows mutable access within all iterator methods.
429    ///
430    /// ```
431    /// use orx_parallel::*;
432    /// use rand::{Rng, SeedableRng};
433    /// use rand_chacha::ChaCha20Rng;
434    ///
435    /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
436    ///     (0..num_steps).fold(position, |p, _| random_step(rng, p))
437    /// }
438    ///
439    /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
440    ///     match rng.random_bool(0.5) {
441    ///         true => position + 1,  // to right
442    ///         false => position - 1, // to left
443    ///     }
444    /// }
445    ///
446    /// fn input_positions() -> Vec<i64> {
447    ///     (-100..=100).collect()
448    /// }
449    ///
450    /// fn sequential() {
451    ///     let positions = input_positions();
452    ///
453    ///     let mut rng = ChaCha20Rng::seed_from_u64(42);
454    ///     let final_positions: Vec<_> = positions
455    ///         .iter()
456    ///         .copied()
457    ///         .map(|position| random_walk(&mut rng, position, 10))
458    ///         .collect();
459    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
460    /// }
461    ///
462    /// fn parallel() {
463    ///     let positions = input_positions();
464    ///
465    ///     let final_positions: Vec<_> = positions
466    ///         .par()
467    ///         .copied()
468    ///         .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
469    ///         .map(|rng, position| random_walk(rng, position, 10))
470    ///         .collect();
471    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
472    /// }
473    ///
474    /// sequential();
475    /// parallel();
476    /// ```
477    ///
478    /// ## Example 3: Metrics Collection
479    ///
480    /// The following example demonstrates how to collect metrics about a parallel computation with the `using`
481    /// transformation and some `unsafe` help from interior mutability.
482    ///
483    /// ```
484    /// use orx_parallel::*;
485    /// use std::cell::UnsafeCell;
486    ///
487    /// const N: u64 = 1_000;
488    /// const MAX_NUM_THREADS: usize = 4;
489    ///
490    /// // just some work
491    /// fn fibonacci(n: u64) -> u64 {
492    ///     let mut a = 0;
493    ///     let mut b = 1;
494    ///     for _ in 0..n {
495    ///         let c = a + b;
496    ///         a = b;
497    ///         b = c;
498    ///     }
499    ///     a
500    /// }
501    ///
502    /// #[derive(Default, Debug)]
503    /// struct ThreadMetrics {
504    ///     thread_idx: usize,
505    ///     num_items_handled: usize,
506    ///     handled_42: bool,
507    ///     num_filtered_out: usize,
508    /// }
509    ///
510    /// struct ThreadMetricsWriter<'a> {
511    ///     metrics_ref: &'a mut ThreadMetrics,
512    /// }
513    ///
514    /// struct ComputationMetrics {
515    ///     thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
516    /// }
517    /// unsafe impl Sync for ComputationMetrics {}
518    /// impl ComputationMetrics {
519    ///     fn new() -> Self {
520    ///         let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
521    ///         for i in 0..MAX_NUM_THREADS {
522    ///             thread_metrics[i].thread_idx = i;
523    ///         }
524    ///         Self {
525    ///             thread_metrics: UnsafeCell::new(thread_metrics),
526    ///         }
527    ///     }
528    /// }
529    ///
530    /// impl ComputationMetrics {
531    ///     unsafe fn create_for_thread<'a>(&self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
532    ///         // SAFETY: here we create a mutable reference to the thread_idx-th metrics
533    ///         // * If we call this method multiple times with the same index,
534    ///         //   we create multiple mutable references to the same ThreadMetrics,
535    ///         //   which would be undefined behavior (a data race).
536    ///         // * We must make sure that `create_for_thread` is called only once per thread.
537    ///         // * If we use `create_for_thread` within the `using` call to create mutable values
538    ///         //   used by the threads, we are certain that the parallel computation
539    ///         //   will only call this method once per thread; hence, it will not
540    ///         //   cause the race condition.
541    ///         // * On the other hand, we must ensure that we do not call this method
542    ///         //   externally.
543    ///         let array = unsafe { &mut *self.thread_metrics.get() };
544    ///         ThreadMetricsWriter {
545    ///             metrics_ref: &mut array[thread_idx],
546    ///         }
547    ///     }
548    /// }
549    ///
550    /// let mut metrics = ComputationMetrics::new();
551    ///
552    /// let input: Vec<u64> = (0..N).collect();
553    ///
554    /// let sum = input
555    ///     .par()
556    ///     // SAFETY: we do not call `create_for_thread` externally;
557    ///     // it is safe if it is called only by the parallel computation.
558    ///     // Since we unsafely implement Sync for ComputationMetrics,
559    ///     // we must ensure that ComputationMetrics is not used elsewhere.
560    ///     .using(|t| unsafe { metrics.create_for_thread(t) })
561    ///     .map(|m: &mut ThreadMetricsWriter<'_>, i| {
562    ///         // collect some useful metrics
563    ///         m.metrics_ref.num_items_handled += 1;
564    ///         m.metrics_ref.handled_42 |= *i == 42;
565    ///
566    ///         // actual work
567    ///         fibonacci((*i % 20) + 1) % 100
568    ///     })
569    ///     .filter(|m, i| {
570    ///         let is_even = i % 2 == 0;
571    ///
572    ///         if !is_even {
573    ///             m.metrics_ref.num_filtered_out += 1;
574    ///         }
575    ///
576    ///         is_even
577    ///     })
578    ///     .num_threads(MAX_NUM_THREADS)
579    ///     .sum();
580    ///
581    /// assert_eq!(sum, 9100);
582    ///
583    /// let total_by_metrics: usize = metrics
584    ///     .thread_metrics
585    ///     .get_mut()
586    ///     .iter()
587    ///     .map(|x| x.num_items_handled)
588    ///     .sum();
589    ///
590    /// assert_eq!(N as usize, total_by_metrics);
591    /// ```
592    ///
593    fn using<U, F>(
594        self,
595        using: F,
596    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
597    where
598        U: 'static,
599        F: Fn(usize) -> U + Sync;
600
601    /// Converts the [`ParIter`] into a [`ParIterUsing`] which has access to a mutable reference to the
602    /// used variable throughout the computation.
603    ///
604    /// Note that each thread used in the computation obtains exactly one instance of the variable.
605    ///
606    /// Each thread receives a clone of the provided `value`;
607    /// that is, `using_clone(value)` can be considered a shorthand for `using(|_thread_idx| value.clone())`.
608    ///
609    /// Please see [`using`] for examples.
610    ///
611    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
612    ///
613    /// [`using`]: crate::ParIter::using
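    ///
    /// # Examples
    ///
    /// A minimal sketch mirroring the channel example of [`using`]; each thread obtains its own
    /// clone of the sender:
    ///
    /// ```ignore
    /// use orx_parallel::*;
    /// use std::sync::mpsc::channel;
    ///
    /// let (sender, receiver) = channel();
    ///
    /// (0..5)
    ///     .into_par()
    ///     .using_clone(sender)
    ///     .for_each(|s, x| s.send(x).unwrap());
    ///
    /// let mut res: Vec<_> = receiver.iter().collect();
    /// res.sort();
    ///
    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
    /// ```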
614    fn using_clone<U>(
615        self,
616        value: U,
617    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
618    where
619        U: Clone + 'static;
620
621    // transformations into fallible computations
622
623    /// Transforms a parallel iterator whose elements are of the result type, i.e., `ParIter<R, Item = Result<T, E>>`,
624    /// into a fallible parallel iterator with item type `T` and error type `E`, i.e., into `ParIterResult<R, Item = T, Err = E>`.
625    ///
626    /// `ParIterResult` is also a parallel iterator; however, its methods are specialized for handling fallible computations
627    /// as follows:
628    ///
629    /// * All of its methods are based on the success path with item type `T`.
630    /// * However, computations short-circuit and immediately return the observed error if any of the items
631    ///   is of the `Err` variant of the result enum.
632    ///
633    /// See [`ParIterResult`] for details.
634    ///
635    /// # Examples
636    ///
637    /// ```
638    /// use orx_parallel::*;
639    ///
640    /// // all succeed
641    ///
642    /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
643    ///     .into_par()
644    ///     .map(|x| x.parse::<i32>())  // ParIter with Item=Result<i32, ParseIntError>
645    ///     .into_fallible_result()     // ParIterResult with Item=i32 and Err=ParseIntError
646    ///     .map(|x| x * 2)             // methods focus on the success path with Item=i32
647    ///     .collect();                 // methods return Result<_, Err>
648    ///                                 // where the Ok variant depends on the computation
649    ///
650    /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
651    ///
652    /// // at least one fails
653    ///
654    /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
655    ///     .into_par()
656    ///     .map(|x| x.parse::<i32>())
657    ///     .into_fallible_result()
658    ///     .map(|x| x * 2)
659    ///     .collect();
660    ///
661    /// assert!(result_doubled.is_err());
662    /// ```
663    fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
664    where
665        Self::Item: IntoResult<T, E>;
666
667    /// Transforms a parallel iterator whose elements are of the option type, i.e., `ParIter<R, Item = Option<T>>`,
668    /// into a fallible parallel iterator with item type `T`, i.e., into `ParIterOption<R, Item = T>`.
669    ///
670    /// `ParIterOption` is also a parallel iterator; however, its methods are specialized for handling fallible computations
671    /// as follows:
672    ///
673    /// * All of its methods are based on the success path with item type `T`.
674    /// * However, computations short-circuit and immediately return `None` if any of the items
675    ///   is of the `None` variant of the option enum.
676    ///
677    /// See [`ParIterOption`] for details.
678    ///
679    /// # Examples
680    ///
681    /// ```
682    /// use orx_parallel::*;
683    ///
684    /// // all succeed
685    ///
686    /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
687    ///     .into_par()
688    ///     .map(|x| x.parse::<i32>().ok())     // ParIter with Item=Option<i32>
689    ///     .into_fallible_option()             // ParIterOption with Item=i32
690    ///     .map(|x| x * 2)                     // methods focus on the success path with Item=i32
691    ///     .collect();                         // methods return Option<T>
692    ///                                         // where T depends on the computation
693    ///
694    /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
695    ///
696    /// // at least one fails
697    ///
698    /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
699    ///     .into_par()
700    ///     .map(|x| x.parse::<i32>().ok())
701    ///     .into_fallible_option()
702    ///     .map(|x| x * 2)
703    ///     .collect();
704    ///
705    /// assert_eq!(result_doubled, None);
706    /// ```
707    fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
708    where
709        Self::Item: IntoOption<T>,
710    {
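        // convert each None item into a unit error and delegate to the fallible-result machinery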
711        ParOption::new(
712            self.map(|x| x.into_result_with_unit_err())
713                .into_fallible_result(),
714        )
715    }
716
717    // computation transformations
718
719    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
720    ///
721    /// # Examples
722    ///
723    /// ```
724    /// use orx_parallel::*;
725    ///
726    /// let a = [1, 2, 3];
727    ///
728    /// let iter = a.into_par().map(|x| 2 * x);
729    ///
730    /// let b: Vec<_> = iter.collect();
731    /// assert_eq!(b, &[2, 4, 6]);
732    /// ```
733    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
734    where
735        Map: Fn(Self::Item) -> Out + Sync + Clone;
736
737    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
738    ///
739    /// # Examples
740    ///
741    /// ```
742    /// use orx_parallel::*;
743    ///
744    /// let a = [1, 2, 3];
745    ///
746    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
747    ///
748    /// let b: Vec<_> = iter.collect();
749    /// assert_eq!(b, &[1, 3]);
750    /// ```
751    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
752    where
753        Filter: Fn(&Self::Item) -> bool + Sync + Clone;
754
755    /// Creates an iterator that works like map, but flattens nested structure.
756    ///
757    /// # Examples
758    ///
759    /// ```
760    /// use orx_parallel::*;
761    ///
762    /// let words = ["alpha", "beta", "gamma"];
763    ///
764    /// // chars() returns an iterator
765    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
766    ///
767    /// let merged: String = all_chars.iter().collect();
768    /// assert_eq!(merged, "alphabetagamma");
769    /// ```
770    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
771    where
772        IOut: IntoIterator,
773        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
774
775    /// Creates an iterator that both filters and maps.
776    ///
777    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
778    ///
779    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
780    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
781    ///
782    /// # Examples
783    ///
784    /// ```
785    /// use orx_parallel::*;
786    ///
787    /// let a = ["1", "two", "NaN", "four", "5"];
788    ///
789    /// let numbers: Vec<_> = a
790    ///     .into_par()
791    ///     .filter_map(|s| s.parse::<usize>().ok())
792    ///     .collect();
793    ///
794    /// assert_eq!(numbers, [1, 5]);
795    /// ```
796    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
797    where
798        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
799
800    /// Does something with each element of an iterator, passing the value on.
801    ///
802    /// When using iterators, you’ll often chain several of them together.
803    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
804    /// To do that, insert a call to `inspect()`.
805    ///
806    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
807    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
808    ///
809    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
810    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
811    /// collect some intermediate values during parallel execution for further inspection.
812    /// The following example demonstrates such a use case.
813    ///
814    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
815    ///
816    /// # Examples
817    ///
818    /// ```
819    /// use orx_parallel::*;
820    /// use orx_concurrent_bag::*;
821    ///
822    /// let a = vec![1, 4, 2, 3];
823    ///
824    /// // let's add some inspect() calls to investigate what's happening
825    /// // - log some events
826    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
827    /// let bag = ConcurrentBag::new();
828    ///
829    /// let sum = a
830    ///     .par()
831    ///     .copied()
832    ///     .inspect(|x| println!("about to filter: {x}"))
833    ///     .filter(|x| x % 2 == 0)
834    ///     .inspect(|x| {
835    ///         bag.push(*x);
836    ///         println!("made it through filter: {x}");
837    ///     })
838    ///     .sum();
839    /// println!("{sum}");
840    ///
841    /// let mut values_made_through = bag.into_inner();
842    /// values_made_through.sort();
843    /// assert_eq!(values_made_through, [2, 4]);
844    /// ```
845    ///
846    /// This will print:
847    ///
848    /// ```console
849    /// about to filter: 1
850    /// about to filter: 4
851    /// made it through filter: 4
852    /// about to filter: 2
853    /// made it through filter: 2
854    /// about to filter: 3
855    /// 6
856    /// ```
857    fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
858    where
859        Operation: Fn(&Self::Item) + Sync + Clone,
860    {
861        let map = move |x| {
862            operation(&x);
863            x
864        };
865        self.map(map)
866    }
867
868    // while transformations
869
870    /// Creates an iterator that yields elements based on the predicate `take_while`.
871    ///
872    /// It will call this closure on each element of the iterator, and yield elements while it returns true.
873    ///
874    /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
875    ///
876    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
877    ///
878    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
879    /// elements before the predicate returns false are collected in the correct order; and hence, the result
880    /// is deterministic.
881    ///
882    /// # Examples
883    ///
884    /// ```
885    /// use orx_parallel::*;
886    ///
887    /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
888    /// let b: Vec<_> = iter.collect();
889    ///
890    /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
891    /// ```
892    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
893    where
894        While: Fn(&Self::Item) -> bool + Sync + Clone;
895
896    /// Creates an iterator that both yields elements based on the predicate `map_while` and maps.
897    ///
898    /// `map_while` takes a closure as an argument.
899    /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
900    ///
901    /// After `None` is returned, `map_while`'s job is over, and the rest of the elements are ignored.
902    ///
903    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
904    ///
905    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
906    /// elements before the predicate returns None are collected in the correct order; and hence, the result
907    /// is deterministic.
908    ///
909    /// ```
910    /// use orx_parallel::*;
911    ///
912    /// let iter = (0..10_000)
913    ///     .par()
914    ///     .map(|x| x as i32 - 5_000) // -5_000..5_000
915    ///     .map_while(|x| 16i32.checked_div(x));
916    /// let b: Vec<_> = iter.collect();
917    ///
918    /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
919    /// ```
920    fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
921    where
922        MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
923    {
924        self.map(map_while).take_while(|x| x.is_some()).map(|x| {
925            // SAFETY: x passed the take_while(is_some) check above, so it is a Some variant; unwrap_unchecked is safe
926            unsafe { x.unwrap_unchecked() }
927        })
928    }
929
930    // special item transformations
931
932    /// Creates an iterator which copies all of its elements.
933    ///
934    /// # Examples
935    ///
936    /// ```
937    /// use orx_parallel::*;
938    ///
939    /// let a = vec![1, 2, 3];
940    ///
941    /// let v_copied: Vec<_> = a.par().copied().collect();
942    ///
943    /// // copied is the same as .map(|&x| x)
944    /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
945    ///
946    /// assert_eq!(v_copied, vec![1, 2, 3]);
947    /// assert_eq!(v_map, vec![1, 2, 3]);
948    /// ```
949    fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
950    where
951        T: 'a + Copy,
952        Self: ParIter<R, Item = &'a T>,
953    {
954        self.map(map_copy)
955    }
956
957    /// Creates an iterator which clones all of its elements.
958    ///
959    /// # Examples
960    ///
961    /// ```
962    /// use orx_parallel::*;
963    ///
964    /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
965    ///
966    /// let v_cloned: Vec<_> = a.par().cloned().collect();
967    ///
968    /// // cloned is the same as .map(|x| x.clone())
969    /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
970    ///
971    /// assert_eq!(
972    ///     v_cloned,
973    ///     vec![String::from("1"), String::from("2"), String::from("3")]
974    /// );
975    /// assert_eq!(
976    ///     v_map,
977    ///     vec![String::from("1"), String::from("2"), String::from("3")]
978    /// );
979    /// ```
980    fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
981    where
982        T: 'a + Clone,
983        Self: ParIter<R, Item = &'a T>,
984    {
985        self.map(map_clone)
986    }
987
988    /// Creates an iterator that flattens nested structure.
989    ///
990    /// This is useful when you have an iterator of iterators or an iterator of things that can be
991    /// turned into iterators and you want to remove one level of indirection.
992    ///
993    /// # Examples
994    ///
995    /// Basic usage.
996    ///
997    /// ```
998    /// use orx_parallel::*;
999    ///
1000    /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
1001    /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
1002    /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
1003    /// ```
1004    ///
1005    /// Mapping and then flattening:
1006    ///
1007    /// ```
1008    /// use orx_parallel::*;
1009    ///
1010    /// let words = vec!["alpha", "beta", "gamma"];
1011    ///
1012    /// // chars() returns an iterator
1013    /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
1014    /// let merged: String = all_characters.into_iter().collect();
1015    /// assert_eq!(merged, "alphabetagamma");
1016    /// ```
1017    ///
1018    /// But actually, you can write this in terms of `flat_map`,
1019    /// which is preferable in this case since it conveys intent more clearly:
1020    ///
1021    /// ```
1022    /// use orx_parallel::*;
1023    ///
1024    /// let words = vec!["alpha", "beta", "gamma"];
1025    ///
1026    /// // chars() returns an iterator
1027    /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
1028    /// let merged: String = all_characters.into_iter().collect();
1029    /// assert_eq!(merged, "alphabetagamma");
1030    /// ```
1031    fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
1032    where
1033        Self::Item: IntoIterator,
1034    {
1035        let map = |e: Self::Item| e.into_iter();
1036        self.flat_map(map)
1037    }
1038
1039    // collect
1040
1041    /// Collects all the items from an iterator into a collection.
1042    ///
1043    /// This is useful when you already have a collection and want to add the iterator items to it.
1044    ///
1045    /// The collection is passed in as an owned value and returned with the additional elements.
1046    ///
1047    /// All collections implementing [`ParCollectInto`] can be used to collect into.
1048    ///
1049    /// [`ParCollectInto`]: crate::ParCollectInto
1050    ///
1051    /// # Examples
1052    ///
1053    /// ```
1054    /// use orx_parallel::*;
1055    ///
1056    /// let a = vec![1, 2, 3];
1057    ///
1058    /// let vec: Vec<i32> = vec![0, 1];
1059    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
1060    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
1061    ///
1062    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
1063    /// ```
1064    fn collect_into<C>(self, output: C) -> C
1065    where
1066        C: ParCollectInto<Self::Item>;
1067
1068    /// Transforms an iterator into a collection.
1069    ///
1070    /// Similar to [`Iterator::collect`], the type annotation on the left-hand side determines
1071    /// the type of the result collection; alternatively, the turbofish syntax can be used.
1072    ///
1073    /// All collections implementing [`ParCollectInto`] can be used to collect into.
1074    ///
1075    /// [`ParCollectInto`]: crate::ParCollectInto
1076    ///
1077    /// # Examples
1078    ///
1079    /// ```
1080    /// use orx_parallel::*;
1081    ///
1082    /// let a = vec![1, 2, 3];
1083    ///
1084    /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
1085    ///
1086    /// assert_eq!(vec![2, 4, 6], doubled);
1087    /// ```
1088    fn collect<C>(self) -> C
1089    where
1090        C: ParCollectInto<Self::Item>,
1091    {
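        // create an empty output collection, hinted with the input length when it is known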
1092        let output = C::empty(self.con_iter().try_get_len());
1093        self.collect_into(output)
1094    }
1095
1096    // reduce
1097
1098    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
1099    ///
1100    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
1101    ///
1102    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
1103    ///
1104    /// # Example
1105    ///
1106    /// ```
1107    /// use orx_parallel::*;
1108    ///
1109    /// let inputs = 1..10;
1110    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
1111    /// assert_eq!(reduced, 45);
1112    /// ```
1113    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
1114    where
1115        Self::Item: Send,
1116        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
1117
1118    /// Tests if every element of the iterator matches a predicate.
1119    ///
1120    /// `all` takes a `predicate` that returns true or false.
1121    /// It applies this closure to each element of the iterator,
1122    /// and if they all return true, then so does `all`.
1123    /// If any of them returns false, it returns false.
1124    ///
1125    /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1126    /// given that no matter what else happens, the result will also be false.
1127    ///
1128    /// An empty iterator returns true.
1129    ///
1130    /// # Examples
1131    ///
1132    /// ```
1133    /// use orx_parallel::*;
1134    ///
1135    /// let mut a = vec![1, 2, 3];
1136    /// assert!(a.par().all(|x| **x > 0));
1137    /// assert!(!a.par().all(|x| **x > 2));
1138    ///
1139    /// a.clear();
1140    /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1141    /// ```
1142    fn all<Predicate>(self, predicate: Predicate) -> bool
1143    where
1144        Self::Item: Send,
1145        Predicate: Fn(&Self::Item) -> bool + Sync,
1146    {
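        // a violating element is a counter-example: `all` holds iff no such element is found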
1147        let violates = |x: &Self::Item| !predicate(x);
1148        self.find(violates).is_none()
1149    }
1150
1151    /// Tests if any element of the iterator matches a predicate.
1152    ///
1153    /// `any` takes a `predicate` that returns true or false.
1154    /// It applies this closure to each element of the iterator,
1155    /// and if any of the elements returns true, then so does `any`.
1156    /// If all of them return false, it returns false.
1157    ///
1158    /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1159    /// given that no matter what else happens, the result will also be true.
1160    ///
1161    /// An empty iterator returns false.
1162    ///
1163    /// # Examples
1164    ///
1165    /// ```
1166    /// use orx_parallel::*;
1167    ///
1168    /// let mut a = vec![1, 2, 3];
1169    /// assert!(a.par().any(|x| **x > 0));
1170    /// assert!(!a.par().any(|x| **x > 5));
1171    ///
1172    /// a.clear();
1173    /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1174    /// ```
1175    fn any<Predicate>(self, predicate: Predicate) -> bool
1176    where
1177        Self::Item: Send,
1178        Predicate: Fn(&Self::Item) -> bool + Sync,
1179    {
1180        self.find(predicate).is_some()
1181    }
1182
1183    /// Consumes the iterator, counting the number of iterations and returning it.
1184    ///
1185    /// # Examples
1186    ///
1187    /// ```
1188    /// use orx_parallel::*;
1189    ///
1190    /// let a = vec![1, 2, 3];
1191    /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1192    /// ```
1193    fn count(self) -> usize {
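        // count by mapping each element to a unit count and summing; an empty iterator yields 0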
1194        self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1195    }
1196
1197    /// Calls a closure on each element of an iterator.
1198    ///
1199    /// # Examples
1200    ///
1201    /// Basic usage:
1202    ///
1203    /// ```
1204    /// use orx_parallel::*;
1205    /// use std::sync::mpsc::channel;
1206    ///
1207    /// let (tx, rx) = channel();
1208    /// (0..5)
1209    ///     .par()
1210    ///     .map(|x| x * 2 + 1)
1211    ///     .for_each(move |x| tx.send(x).unwrap());
1212    ///
1213    /// let mut v: Vec<_> = rx.iter().collect();
1214    /// v.sort(); // order can be mixed, since messages will be sent in parallel
1215    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1216    /// ```
1217    ///
1218    /// Note that since parallel iterators cannot be used within a `for` loop like regular iterators,
1219    /// `for_each` provides a way to perform arbitrary for-loop-like iterations over parallel iterators.
1220    /// In the following example, we log every element that satisfies a predicate in parallel.
1221    ///
1222    /// ```
1223    /// use orx_parallel::*;
1224    ///
1225    /// (0..5)
1226    ///     .par()
1227    ///     .flat_map(|x| x * 100..x * 110)
1228    ///     .filter(|&x| x % 3 == 0)
1229    ///     .for_each(|x| println!("{x}"));
1230    /// ```
1231    fn for_each<Operation>(self, operation: Operation)
1232    where
1233        Operation: Fn(Self::Item) + Sync,
1234    {
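        // the unit reduction merely drives the computation; `operation` is called for its side effects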
1235        let map = |x| operation(x);
1236        let _ = self.map(map).reduce(reduce_unit);
1237    }
1238
1239    /// Returns the maximum element of an iterator.
1240    ///
1241    /// If the iterator is empty, None is returned.
1242    ///
1243    /// # Examples
1244    ///
1245    /// ```
1246    /// use orx_parallel::*;
1247    ///
1248    /// let a = vec![1, 2, 3];
1249    /// let b: Vec<u32> = Vec::new();
1250    ///
1251    /// assert_eq!(a.par().max(), Some(&3));
1252    /// assert_eq!(b.par().max(), None);
1253    /// ```
1254    fn max(self) -> Option<Self::Item>
1255    where
1256        Self::Item: Ord + Send,
1257    {
1258        self.reduce(Ord::max)
1259    }
1260
1261    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1262    ///
1263    /// If the iterator is empty, None is returned.
1264    ///
1265    /// # Examples
1266    ///
1267    /// ```
1268    /// use orx_parallel::*;
1269    ///
1270    /// let a = vec![-3_i32, 0, 1, 5, -10];
1271    /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1272    /// ```
1273    fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1274    where
1275        Self::Item: Send,
1276        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1277    {
1278        let reduce = |x, y| match compare(&x, &y) {
1279            Ordering::Greater | Ordering::Equal => x,
1280            Ordering::Less => y,
1281        };
1282        self.reduce(reduce)
1283    }
1284
1285    /// Returns the element that gives the maximum value from the specified function.
1286    ///
1287    /// If the iterator is empty, None is returned.
1288    ///
1289    /// # Examples
1290    ///
1291    /// ```
1292    /// use orx_parallel::*;
1293    ///
1294    /// let a = vec![-3_i32, 0, 1, 5, -10];
1295    /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1296    /// ```
1297    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1298    where
1299        Self::Item: Send,
1300        Key: Ord,
1301        GetKey: Fn(&Self::Item) -> Key + Sync,
1302    {
1303        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1304            Ordering::Greater | Ordering::Equal => x,
1305            Ordering::Less => y,
1306        };
1307        self.reduce(reduce)
1308    }
1309
1310    /// Returns the minimum element of an iterator.
1311    ///
1312    /// If the iterator is empty, None is returned.
1313    ///
1314    /// # Examples
1315    ///
1316    /// ```
1317    /// use orx_parallel::*;
1318    ///
1319    /// let a = vec![1, 2, 3];
1320    /// let b: Vec<u32> = Vec::new();
1321    ///
1322    /// assert_eq!(a.par().min(), Some(&1));
1323    /// assert_eq!(b.par().min(), None);
1324    /// ```
1325    fn min(self) -> Option<Self::Item>
1326    where
1327        Self::Item: Ord + Send,
1328    {
1329        self.reduce(Ord::min)
1330    }
1331
1332    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1333    ///
1334    /// If the iterator is empty, None is returned.
1335    ///
1336    /// # Examples
1337    ///
1338    /// ```
1339    /// use orx_parallel::*;
1340    ///
1341    /// let a = vec![-3_i32, 0, 1, 5, -10];
1342    /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1343    /// ```
1344    fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1345    where
1346        Self::Item: Send,
1347        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1348    {
1349        let reduce = |x, y| match compare(&x, &y) {
1350            Ordering::Less | Ordering::Equal => x,
1351            Ordering::Greater => y,
1352        };
1353        self.reduce(reduce)
1354    }
1355
1356    /// Returns the element that gives the minimum value from the specified function.
1357    ///
1358    /// If the iterator is empty, None is returned.
1359    ///
1360    /// # Examples
1361    ///
1362    /// ```
1363    /// use orx_parallel::*;
1364    ///
1365    /// let a = vec![-3_i32, 0, 1, 5, -10];
1366    /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1367    /// ```
1368    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1369    where
1370        Self::Item: Send,
1371        Key: Ord,
1372        GetKey: Fn(&Self::Item) -> Key + Sync,
1373    {
1374        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1375            Ordering::Less | Ordering::Equal => x,
1376            Ordering::Greater => y,
1377        };
1378        self.reduce(reduce)
1379    }
1380
1381    /// Sums the elements of an iterator.
1382    ///
1383    /// Takes each element, adds them together, and returns the result.
1384    ///
1385    /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1386    ///
1387    /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1388    ///
1389    /// [`Sum<Out>`]: crate::Sum
1390    ///
1391    /// # Examples
1392    ///
1393    /// ```
1394    /// use orx_parallel::*;
1395    ///
1396    /// let a = vec![1, 2, 3];
1397    /// let sum: i32 = a.par().sum();
1398    ///
1399    /// assert_eq!(sum, 6);
1400    /// ```
1401    fn sum<Out>(self) -> Out
1402    where
1403        Self::Item: Sum<Out>,
1404        Out: Send,
1405    {
1406        self.map(Self::Item::map)
1407            .reduce(Self::Item::reduce)
1408            .unwrap_or(Self::Item::zero())
1409    }
1410
1411    // early exit
1412
1413    /// Returns the first (or any) element of the iterator; returns None if it is empty.
1414    ///
1415    /// * the first element is returned if the default iteration order `IterationOrder::Ordered` is used,
1416    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1417    ///
1418    /// # Examples
1419    ///
1420    /// The following example demonstrates the usage of first with default `Ordered` iteration.
1421    /// This guarantees that the first element with respect to position in the input sequence
1422    /// is returned.
1423    ///
1424    /// ```
1425    /// use orx_parallel::*;
1426    ///
1427    /// let a: Vec<usize> = vec![];
1428    /// assert_eq!(a.par().copied().first(), None);
1429    ///
1430    /// let a = vec![1, 2, 3];
1431    /// assert_eq!(a.par().copied().first(), Some(1));
1432    ///
1433    /// let a = 1..10_000;
1434    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1435    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1436    ///
1437    /// // or equivalently,
1438    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1439    /// ```
1440    ///
1441    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1442    /// whichever is visited first depending on the parallel execution.
1443    ///
1444    /// ```
1445    /// use orx_parallel::*;
1446    ///
1447    /// let a = 1..10_000;
1448    ///
1449    /// // might return either of 3421 or 2*3421
1450    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1451    /// assert!([3421, 2 * 3421].contains(&any));
1452    ///
1453    /// // or equivalently,
1454    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1455    /// assert!([3421, 2 * 3421].contains(&any));
1456    /// ```
1457    fn first(self) -> Option<Self::Item>
1458    where
1459        Self::Item: Send;
1460
1461    /// Searches for an element of an iterator that satisfies a `predicate`.
1462    ///
1463    /// Depending on the set iteration order of the parallel iterator, returns
1464    ///
1465    /// * the first element satisfying the `predicate` if the default iteration order `IterationOrder::Ordered` is used,
1466    /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1467    ///
1468    /// `find` takes a closure that returns true or false.
1469    /// It applies this closure to each element of the iterator,
1470    /// and returns `Some(x)` where `x` is the first element for which the closure returns true.
1471    /// If they all return false, it returns None.
1472    ///
1473    /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1474    ///
1475    /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1476    ///
1477    /// # Examples
1478    ///
1479    /// The following example demonstrates the usage of first with default `Ordered` iteration.
1480    /// This guarantees that the first element with respect to position in the input sequence
1481    /// is returned.
1482    ///
1483    /// ```
1484    /// use orx_parallel::*;
1485    ///
1486    /// let a = 1..10_000;
1487    /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1488    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1489    /// ```
1490    ///
1491    /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1492    /// whichever is found first depending on the parallel execution.
1493    ///
1494    /// ```
1495    /// use orx_parallel::*;
1496    ///
1497    /// let a = 1..10_000;
1498    ///
1499    /// // might return either of 3421 or 2*3421
1500    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1501    /// assert!([3421, 2 * 3421].contains(&any));
1502    /// ```
1503    fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1504    where
1505        Self::Item: Send,
1506        Predicate: Fn(&Self::Item) -> bool + Sync,
1507    {
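        // equivalent to filter(predicate).first(); stops processing as soon as a match is found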
1508        self.filter(&predicate).first()
1509    }
1510}