orx_parallel/
par_iter.rs

1use crate::ParIterResult;
2use crate::computational_variants::fallible_option::ParOption;
3use crate::par_iter_option::{IntoOption, ParIterOption};
4use crate::par_iter_result::IntoResult;
5use crate::using::{UsingClone, UsingFun};
6use crate::{
7    ParIterUsing, Params,
8    collect_into::ParCollectInto,
9    computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
10    parameters::{ChunkSize, IterationOrder, NumThreads},
11    runner::{DefaultRunner, ParallelRunner},
12    special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20    R: ParallelRunner,
21{
22    /// Element type of the parallel iterator.
23    type Item;
24
    /// Returns a reference to the input concurrent iterator.
    ///
    /// The concrete iterator type is hidden behind `impl ConcurrentIter`; callers can rely
    /// only on the [`ConcurrentIter`] interface of the returned reference.
    fn con_iter(&self) -> &impl ConcurrentIter;
27
28    /// Parameters of the parallel iterator.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use orx_parallel::*;
34    /// use std::num::NonZero;
35    ///
36    /// let vec = vec![1, 2, 3, 4];
37    ///
38    /// assert_eq!(
39    ///     vec.par().params(),
40    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41    /// );
42    ///
43    /// assert_eq!(
44    ///     vec.par().num_threads(0).chunk_size(0).params(),
45    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46    /// );
47    ///
48    /// assert_eq!(
49    ///     vec.par().num_threads(1).params(),
50    ///     Params::new(
51    ///         NumThreads::Max(NonZero::new(1).unwrap()),
52    ///         ChunkSize::Auto,
53    ///         IterationOrder::Ordered
54    ///     )
55    /// );
56    ///
57    /// assert_eq!(
58    ///     vec.par().num_threads(4).chunk_size(64).params(),
59    ///     Params::new(
60    ///         NumThreads::Max(NonZero::new(4).unwrap()),
61    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
62    ///         IterationOrder::Ordered
63    ///     )
64    /// );
65    ///
66    /// assert_eq!(
67    ///     vec.par()
68    ///         .num_threads(8)
69    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70    ///         .iteration_order(IterationOrder::Arbitrary)
71    ///         .params(),
72    ///     Params::new(
73    ///         NumThreads::Max(NonZero::new(8).unwrap()),
74    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
75    ///         IterationOrder::Arbitrary
76    ///     )
77    /// );
78    /// ```
    // Read-only accessor: borrows `self` and hands back the `Params` by value.
    fn params(&self) -> Params;
80
81    // params transformations
82
83    /// Sets the number of threads to be used in the parallel execution.
84    /// Integers can be used as the argument with the following mapping:
85    ///
86    /// * `0` -> `NumThreads::Auto`
87    /// * `1` -> `NumThreads::sequential()`
88    /// * `n > 0` -> `NumThreads::Max(n)`
89    ///
90    /// See [`NumThreads`] for details.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use orx_parallel::*;
96    /// use std::num::NonZero;
97    ///
98    /// let vec = vec![1, 2, 3, 4];
99    ///
100    /// // all available threads can be used
101    ///
102    /// assert_eq!(
103    ///     vec.par().params(),
104    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105    /// );
106    ///
107    /// assert_eq!(
108    ///     vec.par().num_threads(0).chunk_size(0).params(),
109    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110    /// );
111    ///
112    /// // computation will be executed sequentially on the main thread, no parallelization
113    ///
114    /// assert_eq!(
115    ///     vec.par().num_threads(1).params(),
116    ///     Params::new(
117    ///         NumThreads::Max(NonZero::new(1).unwrap()),
118    ///         ChunkSize::Auto,
119    ///         IterationOrder::Ordered
120    ///     )
121    /// );
122    ///
123    /// // maximum 4 threads can be used
124    /// assert_eq!(
125    ///     vec.par().num_threads(4).chunk_size(64).params(),
126    ///     Params::new(
127    ///         NumThreads::Max(NonZero::new(4).unwrap()),
128    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
129    ///         IterationOrder::Ordered
130    ///     )
131    /// );
132    ///
133    /// // maximum 8 threads can be used
134    /// assert_eq!(
135    ///     vec.par()
136    ///         .num_threads(8)
137    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138    ///         .iteration_order(IterationOrder::Arbitrary)
139    ///         .params(),
140    ///     Params::new(
141    ///         NumThreads::Max(NonZero::new(8).unwrap()),
142    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
143    ///         IterationOrder::Arbitrary
144    ///     )
145    /// );
146    /// ```
    // Consumes `self` and returns `Self`, so it chains with the other parameter setters
    // (as the doc examples do); any `impl Into<NumThreads>` argument is accepted.
    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149    /// Sets the number of elements to be pulled from the concurrent iterator during the
150    /// parallel execution. When integers are used as argument, the following mapping applies:
151    ///
152    /// * `0` -> `ChunkSize::Auto`
153    /// * `n > 0` -> `ChunkSize::Exact(n)`
154    ///
155    /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
156    ///
157    /// See [`ChunkSize`] for details.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use orx_parallel::*;
163    /// use std::num::NonZero;
164    ///
165    /// let vec = vec![1, 2, 3, 4];
166    ///
167    /// // chunk sizes will be dynamically decided by the parallel runner
168    ///
169    /// assert_eq!(
170    ///     vec.par().params(),
171    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172    /// );
173    ///
174    /// assert_eq!(
175    ///     vec.par().num_threads(0).chunk_size(0).params(),
176    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177    /// );
178    ///
179    /// assert_eq!(
180    ///     vec.par().num_threads(1).params(),
181    ///     Params::new(
182    ///         NumThreads::Max(NonZero::new(1).unwrap()),
183    ///         ChunkSize::Auto,
184    ///         IterationOrder::Ordered
185    ///     )
186    /// );
187    ///
188    /// // chunk size will always be 64, parallel runner cannot change
189    ///
190    /// assert_eq!(
191    ///     vec.par().num_threads(4).chunk_size(64).params(),
192    ///     Params::new(
193    ///         NumThreads::Max(NonZero::new(4).unwrap()),
194    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
195    ///         IterationOrder::Ordered
196    ///     )
197    /// );
198    ///
199    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200    ///
201    /// assert_eq!(
202    ///     vec.par()
203    ///         .num_threads(8)
204    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205    ///         .iteration_order(IterationOrder::Arbitrary)
206    ///         .params(),
207    ///     Params::new(
208    ///         NumThreads::Max(NonZero::new(8).unwrap()),
209    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
210    ///         IterationOrder::Arbitrary
211    ///     )
212    /// );
213    /// ```
    // Consumes `self` and returns `Self`, so it chains with the other parameter setters;
    // the argument is anything convertible via `Into<ChunkSize>` (the doc examples pass
    // both plain integers and a `ChunkSize::Min(..)` value).
    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216    /// Sets the iteration order of the parallel computation.
217    ///
218    /// See [`IterationOrder`] for details.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use orx_parallel::*;
224    ///
225    /// let vec = vec![1, 2, 3, 4];
226    ///
    /// // results are collected in an order consistent with the input order,
228    /// // or find returns the first element satisfying the predicate
229    ///
230    /// assert_eq!(
231    ///     vec.par().params(),
232    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233    /// );
234    ///
235    /// assert_eq!(
236    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
237    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238    /// );
239    ///
240    /// // results might be collected in arbitrary order
    /// // or find returns any of the elements satisfying the predicate
242    ///
243    /// assert_eq!(
244    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246    /// );
247    /// ```
    // Unlike `num_threads` / `chunk_size`, this setter takes the `IterationOrder` enum
    // directly rather than an `impl Into<_>` argument.
    fn iteration_order(self, order: IterationOrder) -> Self;
249
250    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251    ///
252    /// # Examples
253    ///
254    /// ```ignore
255    /// use orx_parallel::*;
256    ///
257    /// let inputs = vec![1, 2, 3, 4];
258    ///
259    /// // uses the default runner
260    /// let sum = inputs.par().sum();
261    ///
262    /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
263    /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
264    /// ```
    // `Q` is chosen purely at the type level (e.g. `with_runner::<MyParallelRunner>()`):
    // no runner value is passed in. The item type is carried over unchanged.
    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
266
267    // using transformations
268
269    /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
270    /// used variable throughout the computation.
271    ///
272    /// Note that each used thread will obtain exactly one instance of the variable.
273    ///
274    /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
275    /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
276    /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
277    ///
278    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
279    ///
280    /// # Examples
281    ///
282    /// ## Example 1: Channels
283    ///
284    /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
285    ///
286    /// ```ignore
287    /// use orx_parallel::*;
288    /// use std::sync::mpsc::channel;
289    ///
290    /// let (sender, receiver) = channel();
291    ///
292    /// (0..5)
293    ///     .into_par()
294    ///     .using(|_thread_idx| sender.clone())
295    ///     .for_each(|s, x| s.send(x).unwrap());
296    ///
297    /// let mut res: Vec<_> = receiver.iter().collect();
298    ///
299    /// res.sort();
300    ///
301    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
302    /// ```
303    ///
304    /// ## Example 2: Random Number Generator
305    ///
306    /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
307    ///
308    /// The following example demonstrates how to safely generate random numbers through mutable references within
309    /// a parallel computation.
310    ///
311    /// Notice the differences between sequential and parallel computation.
312    /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
313    ///   explicitly define that we will be `using` a random number generator.
    /// * Parallel iterator does not mutably capture any variable from the scope; however, using transformation
315    ///   converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
316    ///
317    /// ```
318    /// use orx_parallel::*;
319    /// use rand::{Rng, SeedableRng};
320    /// use rand_chacha::ChaCha20Rng;
321    ///
322    /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
323    ///     (0..num_steps).fold(position, |p, _| random_step(rng, p))
324    /// }
325    ///
326    /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
327    ///     match rng.random_bool(0.5) {
328    ///         true => position + 1,  // to right
329    ///         false => position - 1, // to left
330    ///     }
331    /// }
332    ///
333    /// fn input_positions() -> Vec<i64> {
334    ///     (-100..=100).collect()
335    /// }
336    ///
337    /// fn sequential() {
338    ///     let positions = input_positions();
339    ///
340    ///     let mut rng = ChaCha20Rng::seed_from_u64(42);
341    ///     let final_positions: Vec<_> = positions
342    ///         .iter()
343    ///         .copied()
344    ///         .map(|position| random_walk(&mut rng, position, 10))
345    ///         .collect();
346    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
347    /// }
348    ///
349    /// fn parallel() {
350    ///     let positions = input_positions();
351    ///
352    ///     let final_positions: Vec<_> = positions
353    ///         .par()
354    ///         .copied()
355    ///         .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
356    ///         .map(|rng, position| random_walk(rng, position, 10))
357    ///         .collect();
358    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
359    /// }
360    ///
361    /// sequential();
362    /// parallel();
363    /// ```
364    ///
365    /// ## Example 3: Metrics Collection
366    ///
367    /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
368    /// some `unsafe` help with interior mutability.
369    ///
370    /// ```
371    /// use orx_parallel::*;
372    /// use std::cell::UnsafeCell;
373    ///
374    /// const N: u64 = 1_000;
375    /// const MAX_NUM_THREADS: usize = 4;
376    ///
377    /// // just some work
378    /// fn fibonacci(n: u64) -> u64 {
379    ///     let mut a = 0;
380    ///     let mut b = 1;
381    ///     for _ in 0..n {
382    ///         let c = a + b;
383    ///         a = b;
384    ///         b = c;
385    ///     }
386    ///     a
387    /// }
388    ///
389    /// #[derive(Default, Debug)]
390    /// struct ThreadMetrics {
391    ///     thread_idx: usize,
392    ///     num_items_handled: usize,
393    ///     handled_42: bool,
394    ///     num_filtered_out: usize,
395    /// }
396    ///
397    /// struct ThreadMetricsWriter<'a> {
398    ///     metrics_ref: &'a mut ThreadMetrics,
399    /// }
400    ///
401    /// struct ComputationMetrics {
402    ///     thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
403    /// }
404    /// impl ComputationMetrics {
405    ///     fn new() -> Self {
406    ///         let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
407    ///         for i in 0..MAX_NUM_THREADS {
408    ///             thread_metrics[i].thread_idx = i;
409    ///         }
410    ///         Self {
411    ///             thread_metrics: UnsafeCell::new(thread_metrics),
412    ///         }
413    ///     }
414    /// }
415    ///
416    /// impl ComputationMetrics {
417    ///     unsafe fn create_for_thread<'a>(&mut self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
    ///         // SAFETY: here we create a mutable reference to the thread_idx-th metrics
419    ///         // * If we call this method multiple times with the same index,
420    ///         //   we create multiple mutable references to the same ThreadMetrics,
421    ///         //   which would lead to a race condition.
422    ///         // * We must make sure that `create_for_thread` is called only once per thread.
423    ///         // * If we use `create_for_thread` within the `using` call to create mutable values
424    ///         //   used by the threads, we are certain that the parallel computation
425    ///         //   will only call this method once per thread; hence, it will not
426    ///         //   cause the race condition.
427    ///         // * On the other hand, we must ensure that we do not call this method
428    ///         //   externally.
429    ///         let array = unsafe { &mut *self.thread_metrics.get() };
430    ///         ThreadMetricsWriter {
431    ///             metrics_ref: &mut array[thread_idx],
432    ///         }
433    ///     }
434    /// }
435    ///
436    /// let mut metrics = ComputationMetrics::new();
437    ///
438    /// let input: Vec<u64> = (0..N).collect();
439    ///
440    /// let sum = input
441    ///     .par()
442    ///     // SAFETY: we do not call `create_for_thread` externally;
443    ///     // it is safe if it is called only by the parallel computation.
444    ///     .using(|t| unsafe { metrics.create_for_thread(t) })
445    ///     .map(|m: &mut ThreadMetricsWriter<'_>, i| {
446    ///         // collect some useful metrics
447    ///         m.metrics_ref.num_items_handled += 1;
448    ///         m.metrics_ref.handled_42 |= *i == 42;
449    ///
450    ///         // actual work
451    ///         fibonacci((*i % 20) + 1) % 100
452    ///     })
453    ///     .filter(|m, i| {
454    ///         let is_even = i % 2 == 0;
455    ///
456    ///         if !is_even {
457    ///             m.metrics_ref.num_filtered_out += 1;
458    ///         }
459    ///
460    ///         is_even
461    ///     })
462    ///     .num_threads(MAX_NUM_THREADS)
463    ///     .sum();
464    ///
465    /// let total_by_metrics: usize = metrics
466    ///     .thread_metrics
467    ///     .get_mut()
468    ///     .iter()
469    ///     .map(|x| x.num_items_handled)
470    ///     .sum();
471    ///
472    /// assert_eq!(N as usize, total_by_metrics);
473    /// ```
474    ///
    // `using` is an `FnMut(usize) -> U` whose argument is the thread index described in the
    // docs above; the produced `U` must be `Send + 'static`. The returned `ParIterUsing`
    // keeps the same runner `R` and the same item type as this iterator.
    fn using<U, F>(
        self,
        using: F,
    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
    where
        U: Send + 'static,
        F: FnMut(usize) -> U;
482
483    /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
484    /// used variable throughout the computation.
485    ///
486    /// Note that each used thread will obtain exactly one instance of the variable.
487    ///
488    /// Each used thread receives a clone of the provided `value`.
489    /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
490    ///
491    /// Please see [`using`] for examples.
492    ///
493    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
494    ///
495    /// [`using`]: crate::ParIter::using
    // `value` is moved into the iterator; `U` must be `Clone + Send + 'static`.
    // The returned `ParIterUsing` keeps the same runner `R` and the same item type.
    fn using_clone<U>(
        self,
        value: U,
    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
    where
        U: Clone + Send + 'static;
502
503    // transformations into fallible computations
504
505    /// Transforms a parallel iterator where elements are of the result type; i.e., `ParIter<R, Item = Result<T, E>>`,
506    ///  into fallible parallel iterator with item type `T` and error type `E`; i.e., into `ParIterResult<R, Item = T, Err = E>`.
507    ///
508    /// `ParIterResult` is also a parallel iterator; however, with methods specialized for handling fallible computations
509    /// as follows:
510    ///
511    /// * All of its methods are based on the success path with item type of `T`.
512    /// * However, computations short-circuit and immediately return the observed error if any of the items
513    ///   is of the `Err` variant of the result enum.
514    ///
515    /// See [`ParIterResult`] for details.
516    ///
517    /// # Examples
518    ///
519    /// ```
520    /// use orx_parallel::*;
521    ///
522    /// // all succeeds
523    ///
524    /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
525    ///     .into_par()
526    ///     .map(|x| x.parse::<i32>())  // ParIter with Item=Result<i32, ParseIntError>
527    ///     .into_fallible_result()     // ParIterResult with Item=i32 and Err=ParseIntError
528    ///     .map(|x| x * 2)             // methods focus on the success path with Item=i32
529    ///     .collect();                 // methods return Result<_, Err>
530    ///                                 // where the Ok variant depends on the computation
531    ///
532    /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
533    ///
534    /// // at least one fails
535    ///
536    /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
537    ///     .into_par()
538    ///     .map(|x| x.parse::<i32>())
539    ///     .into_fallible_result()
540    ///     .map(|x| x * 2)
541    ///     .collect();
542    ///
543    /// assert!(result_doubled.is_err());
544    /// ```
    // `T` and `E` are tied to the item type through the `IntoResult<T, E>` bound on
    // `Self::Item`; the runner `R` carries over to the returned `ParIterResult`.
    fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
    where
        Self::Item: IntoResult<T, E>;
548
549    /// Transforms a parallel iterator where elements are of the option type; i.e., `ParIter<R, Item = Option<T>>`,
550    ///  into fallible parallel iterator with item type `T`; i.e., into `ParIterOption<R, Item = T>`.
551    ///
552    /// `ParIterOption` is also a parallel iterator; however, with methods specialized for handling fallible computations
553    /// as follows:
554    ///
555    /// * All of its methods are based on the success path with item type of `T`.
556    /// * However, computations short-circuit and immediately return None if any of the items
557    ///   is of the `None` variant of the option enum.
558    ///
    /// See [`ParIterOption`] for details.
560    ///
561    /// # Examples
562    ///
563    /// ```
564    /// use orx_parallel::*;
565    ///
566    /// // all succeeds
567    ///
568    /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
569    ///     .into_par()
570    ///     .map(|x| x.parse::<i32>().ok())     // ParIter with Item=Option<i32>
571    ///     .into_fallible_option()             // ParIterOption with Item=i32
572    ///     .map(|x| x * 2)                     // methods focus on the success path with Item=i32
573    ///     .collect();                         // methods return Option<T>
574    ///                                         // where T depends on the computation
575    ///
576    /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
577    ///
578    /// // at least one fails
579    ///
580    /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
581    ///     .into_par()
582    ///     .map(|x| x.parse::<i32>().ok())
583    ///     .into_fallible_option()
584    ///     .map(|x| x * 2)
585    ///     .collect();
586    ///
587    /// assert_eq!(result_doubled, None);
588    /// ```
589    fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
590    where
591        Self::Item: IntoOption<T>,
592    {
593        ParOption::new(
594            self.map(|x| x.into_result_with_unit_err())
595                .into_fallible_result(),
596        )
597    }
598
599    // computation transformations
600
601    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// use orx_parallel::*;
607    ///
608    /// let a = [1, 2, 3];
609    ///
610    /// let iter = a.into_par().map(|x| 2 * x);
611    ///
612    /// let b: Vec<_> = iter.collect();
613    /// assert_eq!(b, &[2, 4, 6]);
614    /// ```
    // The closure receives each item by value and must be `Sync + Clone` in addition to `Fn`;
    // the resulting iterator keeps the runner `R` and yields `Out`.
    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
    where
        Map: Fn(Self::Item) -> Out + Sync + Clone;
618
619    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// use orx_parallel::*;
625    ///
626    /// let a = [1, 2, 3];
627    ///
628    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
629    ///
630    /// let b: Vec<_> = iter.collect();
631    /// assert_eq!(b, &[1, 3]);
632    /// ```
    // The predicate only borrows each item (`&Self::Item`) and must be `Sync + Clone`;
    // the item type of the resulting iterator is unchanged.
    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
    where
        Filter: Fn(&Self::Item) -> bool + Sync + Clone;
636
637    /// Creates an iterator that works like map, but flattens nested structure.
638    ///
639    /// # Examples
640    ///
641    /// ```
642    /// use orx_parallel::*;
643    ///
644    /// let words = ["alpha", "beta", "gamma"];
645    ///
646    /// // chars() returns an iterator
647    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
648    ///
649    /// let merged: String = all_chars.iter().collect();
650    /// assert_eq!(merged, "alphabetagamma");
651    /// ```
    // The closure may return any `IntoIterator`; the resulting iterator yields that
    // iterator's items (`IOut::Item`). The closure must be `Sync + Clone`.
    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
    where
        IOut: IntoIterator,
        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
656
657    /// Creates an iterator that both filters and maps.
658    ///
659    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
660    ///
661    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
662    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
663    ///
664    /// # Examples
665    ///
666    /// ```
667    /// use orx_parallel::*;
668    ///
669    /// let a = ["1", "two", "NaN", "four", "5"];
670    ///
671    /// let numbers: Vec<_> = a
672    ///     .into_par()
673    ///     .filter_map(|s| s.parse::<usize>().ok())
674    ///     .collect();
675    ///
676    /// assert_eq!(numbers, [1, 5]);
677    /// ```
    // The closure receives each item by value and returns `Option<Out>`; the resulting
    // iterator yields `Out`. The closure must be `Sync + Clone`.
    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
    where
        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
681
682    /// Does something with each element of an iterator, passing the value on.
683    ///
684    /// When using iterators, you’ll often chain several of them together.
685    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
686    /// To do that, insert a call to `inspect()`.
687    ///
688    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
689    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
690    ///
691    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
692    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
693    /// collect some intermediate values during parallel execution for further inspection.
694    /// The following example demonstrates such a use case.
695    ///
696    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
697    ///
698    /// # Examples
699    ///
700    /// ```
701    /// use orx_parallel::*;
702    /// use orx_concurrent_bag::*;
703    ///
704    /// let a = vec![1, 4, 2, 3];
705    ///
706    /// // let's add some inspect() calls to investigate what's happening
707    /// // - log some events
708    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
709    /// let bag = ConcurrentBag::new();
710    ///
711    /// let sum = a
712    ///     .par()
713    ///     .copied()
714    ///     .inspect(|x| println!("about to filter: {x}"))
715    ///     .filter(|x| x % 2 == 0)
716    ///     .inspect(|x| {
717    ///         bag.push(*x);
718    ///         println!("made it through filter: {x}");
719    ///     })
720    ///     .sum();
721    /// println!("{sum}");
722    ///
723    /// let mut values_made_through = bag.into_inner();
724    /// values_made_through.sort();
725    /// assert_eq!(values_made_through, [2, 4]);
726    /// ```
727    ///
    /// This will print the following lines (their relative order may differ between runs due to parallel execution):
729    ///
730    /// ```console
731    /// about to filter: 1
732    /// about to filter: 4
733    /// made it through filter: 4
734    /// about to filter: 2
735    /// made it through filter: 2
736    /// about to filter: 3
737    /// 6
738    /// ```
739    fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
740    where
741        Operation: Fn(&Self::Item) + Sync + Clone,
742    {
743        let map = move |x| {
744            operation(&x);
745            x
746        };
747        self.map(map)
748    }
749
750    // while transformations
751
752    /// Creates an iterator that yields elements based on the predicate `take_while`.
753    ///
754    /// It will call this closure on each element of the iterator, and yield elements while it returns true.
755    ///
756    /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
757    ///
758    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
759    ///
760    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
761    /// elements before the predicate returns false are collected in the correct order; and hence, the result
762    /// is deterministic.
763    ///
764    /// # Examples
765    ///
766    /// ```
767    /// use orx_parallel::*;
768    ///
769    /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
770    /// let b: Vec<_> = iter.collect();
771    ///
772    /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
773    /// ```
    // The predicate only borrows each item (`&Self::Item`) and must be `Sync + Clone`;
    // the item type of the resulting iterator is unchanged.
    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
    where
        While: Fn(&Self::Item) -> bool + Sync + Clone;
777
778    /// Creates an iterator that both yields elements based on the predicate `map_while` and maps.
779    ///
780    /// `map_while` takes a closure as an argument.
781    /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
782    ///
    /// After None is returned, `map_while`'s job is over, and the rest of the elements are ignored.
784    ///
785    /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
786    ///
787    /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
788    /// elements before the predicate returns None are collected in the correct order; and hence, the result
789    /// is deterministic.
790    ///
791    /// ```
792    /// use orx_parallel::*;
793    ///
794    /// let iter = (0..10_000)
795    ///     .par()
    ///     .map(|x| x as i32 - 5_000) // -5_000..5_000
797    ///     .map_while(|x| 16i32.checked_div(x));
798    /// let b: Vec<_> = iter.collect();
799    ///
800    /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
801    /// ```
802    fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
803    where
804        MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
805    {
806        self.map(map_while).take_while(|x| x.is_some()).map(|x| {
807            // SAFETY: since x passed the whilst(is-some) check, unwrap_unchecked
808            unsafe { x.unwrap_unchecked() }
809        })
810    }
811
812    // special item transformations
813
814    /// Creates an iterator which copies all of its elements.
815    ///
816    /// # Examples
817    ///
818    /// ```
819    /// use orx_parallel::*;
820    ///
821    /// let a = vec![1, 2, 3];
822    ///
823    /// let v_copied: Vec<_> = a.par().copied().collect();
824    ///
825    /// // copied is the same as .map(|&x| x)
826    /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
827    ///
828    /// assert_eq!(v_copied, vec![1, 2, 3]);
829    /// assert_eq!(v_map, vec![1, 2, 3]);
830    /// ```
831    fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
832    where
833        T: 'a + Copy,
834        Self: ParIter<R, Item = &'a T>,
835    {
836        self.map(map_copy)
837    }
838
839    /// Creates an iterator which clones all of its elements.
840    ///
841    /// # Examples
842    ///
843    /// ```
844    /// use orx_parallel::*;
845    ///
846    /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
847    ///
848    /// let v_cloned: Vec<_> = a.par().cloned().collect();
849    ///
850    /// // cloned is the same as .map(|x| x.clone())
851    /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
852    ///
853    /// assert_eq!(
854    ///     v_cloned,
855    ///     vec![String::from("1"), String::from("2"), String::from("3")]
856    /// );
857    /// assert_eq!(
858    ///     v_map,
859    ///     vec![String::from("1"), String::from("2"), String::from("3")]
860    /// );
861    /// ```
862    fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
863    where
864        T: 'a + Clone,
865        Self: ParIter<R, Item = &'a T>,
866    {
867        self.map(map_clone)
868    }
869
870    /// Creates an iterator that flattens nested structure.
871    ///
872    /// This is useful when you have an iterator of iterators or an iterator of things that can be
873    /// turned into iterators and you want to remove one level of indirection.
874    ///
875    /// # Examples
876    ///
877    /// Basic usage.
878    ///
879    /// ```
880    /// use orx_parallel::*;
881    ///
882    /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
883    /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
884    /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
885    /// ```
886    ///
887    /// Mapping and then flattening:
888    ///
889    /// ```
890    /// use orx_parallel::*;
891    ///
892    /// let words = vec!["alpha", "beta", "gamma"];
893    ///
894    /// // chars() returns an iterator
895    /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
896    /// let merged: String = all_characters.into_iter().collect();
897    /// assert_eq!(merged, "alphabetagamma");
898    /// ```
899    ///
900    /// But actually, you can write this in terms of `flat_map`,
901    /// which is preferable in this case since it conveys intent more clearly:
902    ///
903    /// ```
904    /// use orx_parallel::*;
905    ///
906    /// let words = vec!["alpha", "beta", "gamma"];
907    ///
908    /// // chars() returns an iterator
909    /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
910    /// let merged: String = all_characters.into_iter().collect();
911    /// assert_eq!(merged, "alphabetagamma");
912    /// ```
913    fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
914    where
915        Self::Item: IntoIterator,
916    {
917        let map = |e: Self::Item| e.into_iter();
918        self.flat_map(map)
919    }
920
921    // collect
922
923    /// Collects all the items from an iterator into a collection.
924    ///
925    /// This is useful when you already have a collection and want to add the iterator items to it.
926    ///
927    /// The collection is passed in as owned value, and returned back with the additional elements.
928    ///
929    /// All collections implementing [`ParCollectInto`] can be used to collect into.
930    ///
931    /// [`ParCollectInto`]: crate::ParCollectInto
932    ///
933    /// # Examples
934    ///
935    /// ```
936    /// use orx_parallel::*;
937    ///
938    /// let a = vec![1, 2, 3];
939    ///
940    /// let vec: Vec<i32> = vec![0, 1];
941    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
942    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
943    ///
944    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
945    /// ```
    // Required method: the declaration ends without a body, so every implementor of the trait
    // supplies its own definition. The default `collect` forwards to this method.
    fn collect_into<C>(self, output: C) -> C
    where
        // `C` is both the type of the collection passed in and the type handed back.
        C: ParCollectInto<Self::Item>;
949
950    /// Transforms an iterator into a collection.
951    ///
952    /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
953    /// the type of the result collection; or turbofish annotation can be used.
954    ///
955    /// All collections implementing [`ParCollectInto`] can be used to collect into.
956    ///
957    /// [`ParCollectInto`]: crate::ParCollectInto
958    ///
959    /// # Examples
960    ///
961    /// ```
962    /// use orx_parallel::*;
963    ///
964    /// let a = vec![1, 2, 3];
965    ///
966    /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
967    ///
968    /// assert_eq!(vec![2, 4, 6], doubled);
969    /// ```
970    fn collect<C>(self) -> C
971    where
972        C: ParCollectInto<Self::Item>,
973    {
974        let output = C::empty(self.con_iter().try_get_len());
975        self.collect_into(output)
976    }
977
978    // reduce
979
980    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
981    ///
982    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
983    ///
984    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
985    ///
986    /// # Example
987    ///
988    /// ```
989    /// use orx_parallel::*;
990    ///
991    /// let inputs = 1..10;
992    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
993    /// assert_eq!(reduced, 45);
994    /// ```
    // Required method (no default body). Several default methods of this trait are expressed
    // through it, e.g. `count`, `for_each`, `max`, `min` and `sum`.
    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
    where
        Self::Item: Send,
        // Merges two items into one.
        // NOTE(review): with parallel execution the grouping of calls is presumably not fixed,
        // so `reduce` likely needs to be associative — confirm against the implementations.
        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
999
1000    /// Tests if every element of the iterator matches a predicate.
1001    ///
1002    /// `all` takes a `predicate` that returns true or false.
1003    /// It applies this closure to each element of the iterator,
1004    /// and if they all return true, then so does `all`.
1005    /// If any of them returns false, it returns false.
1006    ///
1007    /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1008    /// given that no matter what else happens, the result will also be false.
1009    ///
1010    /// An empty iterator returns true.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```
1015    /// use orx_parallel::*;
1016    ///
1017    /// let mut a = vec![1, 2, 3];
1018    /// assert!(a.par().all(|x| **x > 0));
1019    /// assert!(!a.par().all(|x| **x > 2));
1020    ///
1021    /// a.clear();
1022    /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1023    /// ```
1024    fn all<Predicate>(self, predicate: Predicate) -> bool
1025    where
1026        Self::Item: Send,
1027        Predicate: Fn(&Self::Item) -> bool + Sync,
1028    {
1029        let violates = |x: &Self::Item| !predicate(x);
1030        self.find(violates).is_none()
1031    }
1032
1033    /// Tests if any element of the iterator matches a predicate.
1034    ///
1035    /// `any` takes a `predicate` that returns true or false.
1036    /// It applies this closure to each element of the iterator,
1037    /// and if any of the elements returns true, then so does `any`.
1038    /// If all of them return false, it returns false.
1039    ///
1040    /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1041    /// given that no matter what else happens, the result will also be true.
1042    ///
1043    /// An empty iterator returns false.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```
1048    /// use orx_parallel::*;
1049    ///
1050    /// let mut a = vec![1, 2, 3];
1051    /// assert!(a.par().any(|x| **x > 0));
1052    /// assert!(!a.par().any(|x| **x > 5));
1053    ///
1054    /// a.clear();
1055    /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1056    /// ```
1057    fn any<Predicate>(self, predicate: Predicate) -> bool
1058    where
1059        Self::Item: Send,
1060        Predicate: Fn(&Self::Item) -> bool + Sync,
1061    {
1062        self.find(predicate).is_some()
1063    }
1064
1065    /// Consumes the iterator, counting the number of iterations and returning it.
1066    ///
1067    /// # Examples
1068    ///
1069    /// ```
1070    /// use orx_parallel::*;
1071    ///
1072    /// let a = vec![1, 2, 3];
1073    /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1074    /// ```
1075    fn count(self) -> usize {
1076        self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1077    }
1078
1079    /// Calls a closure on each element of an iterator.
1080    ///
1081    /// # Examples
1082    ///
1083    /// Basic usage:
1084    ///
1085    /// ```
1086    /// use orx_parallel::*;
1087    /// use std::sync::mpsc::channel;
1088    ///
1089    /// let (tx, rx) = channel();
1090    /// (0..5)
1091    ///     .par()
1092    ///     .map(|x| x * 2 + 1)
1093    ///     .for_each(move |x| tx.send(x).unwrap());
1094    ///
1095    /// let mut v: Vec<_> = rx.iter().collect();
1096    /// v.sort(); // order can be mixed, since messages will be sent in parallel
1097    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1098    /// ```
1099    ///
1100    /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
1101    /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
1102    /// In the following example, we log every element that satisfies a predicate in parallel.
1103    ///
1104    /// ```
1105    /// use orx_parallel::*;
1106    ///
1107    /// (0..5)
1108    ///     .par()
1109    ///     .flat_map(|x| x * 100..x * 110)
1110    ///     .filter(|&x| x % 3 == 0)
1111    ///     .for_each(|x| println!("{x}"));
1112    /// ```
1113    fn for_each<Operation>(self, operation: Operation)
1114    where
1115        Operation: Fn(Self::Item) + Sync,
1116    {
1117        let map = |x| operation(x);
1118        let _ = self.map(map).reduce(reduce_unit);
1119    }
1120
1121    /// Returns the maximum element of an iterator.
1122    ///
1123    /// If the iterator is empty, None is returned.
1124    ///
1125    /// # Examples
1126    ///
1127    /// ```
1128    /// use orx_parallel::*;
1129    ///
1130    /// let a = vec![1, 2, 3];
1131    /// let b: Vec<u32> = Vec::new();
1132    ///
1133    /// assert_eq!(a.par().max(), Some(&3));
1134    /// assert_eq!(b.par().max(), None);
1135    /// ```
1136    fn max(self) -> Option<Self::Item>
1137    where
1138        Self::Item: Ord + Send,
1139    {
1140        self.reduce(Ord::max)
1141    }
1142
1143    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1144    ///
1145    /// If the iterator is empty, None is returned.
1146    ///
1147    /// # Examples
1148    ///
1149    /// ```
1150    /// use orx_parallel::*;
1151    ///
1152    /// let a = vec![-3_i32, 0, 1, 5, -10];
1153    /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1154    /// ```
1155    fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1156    where
1157        Self::Item: Send,
1158        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1159    {
1160        let reduce = |x, y| match compare(&x, &y) {
1161            Ordering::Greater | Ordering::Equal => x,
1162            Ordering::Less => y,
1163        };
1164        self.reduce(reduce)
1165    }
1166
1167    /// Returns the element that gives the maximum value from the specified function.
1168    ///
1169    /// If the iterator is empty, None is returned.
1170    ///
1171    /// # Examples
1172    ///
1173    /// ```
1174    /// use orx_parallel::*;
1175    ///
1176    /// let a = vec![-3_i32, 0, 1, 5, -10];
1177    /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1178    /// ```
1179    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1180    where
1181        Self::Item: Send,
1182        Key: Ord,
1183        GetKey: Fn(&Self::Item) -> Key + Sync,
1184    {
1185        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1186            Ordering::Greater | Ordering::Equal => x,
1187            Ordering::Less => y,
1188        };
1189        self.reduce(reduce)
1190    }
1191
1192    /// Returns the minimum element of an iterator.
1193    ///
1194    /// If the iterator is empty, None is returned.
1195    ///
1196    /// # Examples
1197    ///
1198    /// ```
1199    /// use orx_parallel::*;
1200    ///
1201    /// let a = vec![1, 2, 3];
1202    /// let b: Vec<u32> = Vec::new();
1203    ///
1204    /// assert_eq!(a.par().min(), Some(&1));
1205    /// assert_eq!(b.par().min(), None);
1206    /// ```
1207    fn min(self) -> Option<Self::Item>
1208    where
1209        Self::Item: Ord + Send,
1210    {
1211        self.reduce(Ord::min)
1212    }
1213
1214    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1215    ///
1216    /// If the iterator is empty, None is returned.
1217    ///
1218    /// # Examples
1219    ///
1220    /// ```
1221    /// use orx_parallel::*;
1222    ///
1223    /// let a = vec![-3_i32, 0, 1, 5, -10];
1224    /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1225    /// ```
1226    fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1227    where
1228        Self::Item: Send,
1229        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1230    {
1231        let reduce = |x, y| match compare(&x, &y) {
1232            Ordering::Less | Ordering::Equal => x,
1233            Ordering::Greater => y,
1234        };
1235        self.reduce(reduce)
1236    }
1237
1238    /// Returns the element that gives the minimum value from the specified function.
1239    ///
1240    /// If the iterator is empty, None is returned.
1241    ///
1242    /// # Examples
1243    ///
1244    /// ```
1245    /// use orx_parallel::*;
1246    ///
1247    /// let a = vec![-3_i32, 0, 1, 5, -10];
1248    /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1249    /// ```
1250    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1251    where
1252        Self::Item: Send,
1253        Key: Ord,
1254        GetKey: Fn(&Self::Item) -> Key + Sync,
1255    {
1256        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1257            Ordering::Less | Ordering::Equal => x,
1258            Ordering::Greater => y,
1259        };
1260        self.reduce(reduce)
1261    }
1262
1263    /// Sums the elements of an iterator.
1264    ///
1265    /// Takes each element, adds them together, and returns the result.
1266    ///
1267    /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1268    ///
1269    /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1270    ///
1271    /// [`Sum<Out>`]: crate::Sum
1272    ///
1273    /// # Examples
1274    ///
1275    /// ```
1276    /// use orx_parallel::*;
1277    ///
1278    /// let a = vec![1, 2, 3];
1279    /// let sum: i32 = a.par().sum();
1280    ///
1281    /// assert_eq!(sum, 6);
1282    /// ```
1283    fn sum<Out>(self) -> Out
1284    where
1285        Self::Item: Sum<Out>,
1286        Out: Send,
1287    {
1288        self.map(Self::Item::map)
1289            .reduce(Self::Item::reduce)
1290            .unwrap_or(Self::Item::zero())
1291    }
1292
1293    // early exit
1294
1295    /// Returns the first (or any) element of the iterator; returns None if it is empty.
1296    ///
1297    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1298    /// * any element is returned if `IterationOrder::Arbitrary` is set.
1299    ///
1300    /// # Examples
1301    ///
1302    /// The following example demonstrates the usage of first with default `Ordered` iteration.
1303    /// This guarantees that the first element with respect to position in the input sequence
1304    /// is returned.
1305    ///
1306    /// ```
1307    /// use orx_parallel::*;
1308    ///
1309    /// let a: Vec<usize> = vec![];
1310    /// assert_eq!(a.par().copied().first(), None);
1311    ///
1312    /// let a = vec![1, 2, 3];
1313    /// assert_eq!(a.par().copied().first(), Some(1));
1314    ///
1315    /// let a = 1..10_000;
1316    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1317    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1318    ///
1319    /// // or equivalently,
1320    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1321    /// ```
1322    ///
1323    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1324    /// whichever is visited first depending on the parallel execution.
1325    ///
1326    /// ```
1327    /// use orx_parallel::*;
1328    ///
1329    /// let a = 1..10_000;
1330    ///
1331    /// // might return either of 3421 or 2*3421
1332    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1333    /// assert!([3421, 2 * 3421].contains(&any));
1334    ///
1335    /// // or equivalently,
1336    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1337    /// assert!([3421, 2 * 3421].contains(&any));
1338    /// ```
    // Required method (no default body): each implementor of the trait provides it.
    // The default `find` is expressed as `filter(..)` followed by this method.
    fn first(self) -> Option<Self::Item>
    where
        Self::Item: Send;
1342
1343    /// Searches for an element of an iterator that satisfies a `predicate`.
1344    ///
1345    /// Depending on the set iteration order of the parallel iterator, returns
1346    ///
1347    /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1348    /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1349    ///
1350    /// `find` takes a closure that returns true or false.
1351    /// It applies this closure to each element of the iterator,
1352    /// and returns `Some(x)` where `x` is the first element that returns true.
1353    /// If they all return false, it returns None.
1354    ///
1355    /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1356    ///
1357    /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1358    ///
1359    /// # Examples
1360    ///
    /// The following example demonstrates the usage of `find` with default `Ordered` iteration.
1362    /// This guarantees that the first element with respect to position in the input sequence
1363    /// is returned.
1364    ///
1365    /// ```
1366    /// use orx_parallel::*;
1367    ///
1368    /// let a = 1..10_000;
1369    /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1370    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1371    /// ```
1372    ///
1373    /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1374    /// whichever is found first depending on the parallel execution.
1375    ///
1376    /// ```
1377    /// use orx_parallel::*;
1378    ///
1379    /// let a = 1..10_000;
1380    ///
1381    /// // might return either of 3421 or 2*3421
1382    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1383    /// assert!([3421, 2 * 3421].contains(&any));
1384    /// ```
1385    fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1386    where
1387        Self::Item: Send,
1388        Predicate: Fn(&Self::Item) -> bool + Sync,
1389    {
1390        self.filter(&predicate).first()
1391    }
1392}