orx_parallel/
par_iter.rs

1use crate::using::{UsingClone, UsingFun};
2use crate::{
3    ParIterUsing, Params,
4    collect_into::ParCollectInto,
5    computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
6    parameters::{ChunkSize, IterationOrder, NumThreads},
7    runner::{DefaultRunner, ParallelRunner},
8    special_type_sets::Sum,
9};
10use orx_concurrent_iter::ConcurrentIter;
11use std::cmp::Ordering;
12
13/// Parallel iterator.
14pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
15where
16    R: ParallelRunner,
17{
18    /// Element type of the parallel iterator.
    // The `Send + Sync` bounds are declared on the associated type itself, so every
    // implementor's item type must satisfy both.
19    type Item: Send + Sync;
20
21    /// Returns a reference to the input concurrent iterator.
    // Required method (declared without a default body). It only borrows `self`, and the
    // concrete iterator type stays opaque behind `impl ConcurrentIter`. The default
    // `collect` of this trait calls it to query `try_get_len()`.
22    fn con_iter(&self) -> &impl ConcurrentIter;
23
24    /// Parameters of the parallel iterator.
25    ///
26    /// # Examples
27    ///
28    /// ```
29    /// use orx_parallel::*;
30    /// use std::num::NonZero;
31    ///
32    /// let vec = vec![1, 2, 3, 4];
33    ///
34    /// assert_eq!(
35    ///     vec.par().params(),
36    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
37    /// );
38    ///
39    /// assert_eq!(
40    ///     vec.par().num_threads(0).chunk_size(0).params(),
41    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
42    /// );
43    ///
44    /// assert_eq!(
45    ///     vec.par().num_threads(1).params(),
46    ///     Params::new(
47    ///         NumThreads::Max(NonZero::new(1).unwrap()),
48    ///         ChunkSize::Auto,
49    ///         IterationOrder::Ordered
50    ///     )
51    /// );
52    ///
53    /// assert_eq!(
54    ///     vec.par().num_threads(4).chunk_size(64).params(),
55    ///     Params::new(
56    ///         NumThreads::Max(NonZero::new(4).unwrap()),
57    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
58    ///         IterationOrder::Ordered
59    ///     )
60    /// );
61    ///
62    /// assert_eq!(
63    ///     vec.par()
64    ///         .num_threads(8)
65    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
66    ///         .iteration_order(IterationOrder::Arbitrary)
67    ///         .params(),
68    ///     Params::new(
69    ///         NumThreads::Max(NonZero::new(8).unwrap()),
70    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
71    ///         IterationOrder::Arbitrary
72    ///     )
73    /// );
74    /// ```
    // Required method (declared without a default body). Takes `&self` and returns
    // `Params` by value, so querying the parameters does not consume the iterator.
75    fn params(&self) -> Params;
76
77    // params transformations
78
79    /// Sets the number of threads to be used in the parallel execution.
80    /// Integers can be used as the argument with the following mapping:
81    ///
82    /// * `0` -> `NumThreads::Auto`
83    /// * `1` -> `NumThreads::sequential()`
84    /// * `n > 0` -> `NumThreads::Max(n)`
85    ///
86    /// See [`NumThreads`] for details.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use orx_parallel::*;
92    /// use std::num::NonZero;
93    ///
94    /// let vec = vec![1, 2, 3, 4];
95    ///
96    /// // all available threads can be used
97    ///
98    /// assert_eq!(
99    ///     vec.par().params(),
100    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
101    /// );
102    ///
103    /// assert_eq!(
104    ///     vec.par().num_threads(0).chunk_size(0).params(),
105    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
106    /// );
107    ///
108    /// // computation will be executed sequentially on the main thread, no parallelization
109    ///
110    /// assert_eq!(
111    ///     vec.par().num_threads(1).params(),
112    ///     Params::new(
113    ///         NumThreads::Max(NonZero::new(1).unwrap()),
114    ///         ChunkSize::Auto,
115    ///         IterationOrder::Ordered
116    ///     )
117    /// );
118    ///
119    /// // maximum 4 threads can be used
120    /// assert_eq!(
121    ///     vec.par().num_threads(4).chunk_size(64).params(),
122    ///     Params::new(
123    ///         NumThreads::Max(NonZero::new(4).unwrap()),
124    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
125    ///         IterationOrder::Ordered
126    ///     )
127    /// );
128    ///
129    /// // maximum 8 threads can be used
130    /// assert_eq!(
131    ///     vec.par()
132    ///         .num_threads(8)
133    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
134    ///         .iteration_order(IterationOrder::Arbitrary)
135    ///         .params(),
136    ///     Params::new(
137    ///         NumThreads::Max(NonZero::new(8).unwrap()),
138    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
139    ///         IterationOrder::Arbitrary
140    ///     )
141    /// );
142    /// ```
    // Required method (declared without a default body). Consumes `self` and returns
    // `Self`, so it chains with the other parameter setters; the argument may be any
    // type implementing `Into<NumThreads>`.
143    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
144
145    /// Sets the number of elements to be pulled from the concurrent iterator during the
146    /// parallel execution. When integers are used as argument, the following mapping applies:
147    ///
148    /// * `0` -> `ChunkSize::Auto`
149    /// * `n > 0` -> `ChunkSize::Exact(n)`
150    ///
151    /// Please use the default enum constructor for creating the `ChunkSize::Min` variant.
152    ///
153    /// See [`ChunkSize`] for details.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use orx_parallel::*;
159    /// use std::num::NonZero;
160    ///
161    /// let vec = vec![1, 2, 3, 4];
162    ///
163    /// // chunk sizes will be dynamically decided by the parallel runner
164    ///
165    /// assert_eq!(
166    ///     vec.par().params(),
167    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
168    /// );
169    ///
170    /// assert_eq!(
171    ///     vec.par().num_threads(0).chunk_size(0).params(),
172    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
173    /// );
174    ///
175    /// assert_eq!(
176    ///     vec.par().num_threads(1).params(),
177    ///     Params::new(
178    ///         NumThreads::Max(NonZero::new(1).unwrap()),
179    ///         ChunkSize::Auto,
180    ///         IterationOrder::Ordered
181    ///     )
182    /// );
183    ///
184    /// // chunk size will always be 64, parallel runner cannot change
185    ///
186    /// assert_eq!(
187    ///     vec.par().num_threads(4).chunk_size(64).params(),
188    ///     Params::new(
189    ///         NumThreads::Max(NonZero::new(4).unwrap()),
190    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
191    ///         IterationOrder::Ordered
192    ///     )
193    /// );
194    ///
195    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
196    ///
197    /// assert_eq!(
198    ///     vec.par()
199    ///         .num_threads(8)
200    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
201    ///         .iteration_order(IterationOrder::Arbitrary)
202    ///         .params(),
203    ///     Params::new(
204    ///         NumThreads::Max(NonZero::new(8).unwrap()),
205    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
206    ///         IterationOrder::Arbitrary
207    ///     )
208    /// );
209    /// ```
    // Required method (declared without a default body). Consumes `self` and returns
    // `Self`, so it chains with the other parameter setters; the argument may be any
    // type implementing `Into<ChunkSize>`.
210    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
211
212    /// Sets the iteration order of the parallel computation.
213    ///
214    /// # Examples
215    ///
216    /// ```
217    /// use orx_parallel::*;
218    ///
219    /// let vec = vec![1, 2, 3, 4];
220    ///
221    /// // results are collected in order consistent with the input order,
222    /// // or find returns the first element satisfying the predicate
223    ///
224    /// assert_eq!(
225    ///     vec.par().params(),
226    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
227    /// );
228    ///
229    /// assert_eq!(
230    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
231    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
232    /// );
233    ///
234    /// // results might be collected in arbitrary order
235    /// // or find returns any of the elements satisfying the predicate
236    ///
237    /// assert_eq!(
238    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
239    ///     Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
240    /// );
241    /// ```
    // Required method (declared without a default body). Consumes `self` and returns `Self`.
    // NOTE(review): the parameter is named `collect` although its type is `IterationOrder`;
    // presumably a leftover name — confirm before renaming.
242    fn iteration_order(self, collect: IterationOrder) -> Self;
243
244    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
245    ///
246    /// # Examples
247    ///
248    /// ```ignore
249    /// use orx_parallel::*;
250    ///
251    /// let inputs = vec![1, 2, 3, 4];
252    ///
253    /// // uses the default runner
254    /// let sum = inputs.par().sum();
255    ///
256    /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
257    /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
258    /// ```
    // Required method (declared without a default body). The runner is chosen solely
    // through the type parameter `Q`; no runner value is passed. The returned iterator
    // keeps `Self::Item` but is a `ParIter<Q>` instead of a `ParIter<R>`.
259    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
260
261    // using transformations
262
263    /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
264    /// used variable throughout the computation.
265    ///
266    /// Note that each used thread will obtain exactly one instance of the variable.
267    ///
268    /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
269    /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
270    /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
271    ///
272    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
273    ///
274    /// # Examples
275    ///
276    /// ## Example 1: Channels
277    ///
278    /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
279    ///
280    /// ```ignore
281    /// use orx_parallel::*;
282    /// use std::sync::mpsc::channel;
283    ///
284    /// let (sender, receiver) = channel();
285    ///
286    /// (0..5)
287    ///     .into_par()
288    ///     .using(|_thread_idx| sender.clone())
289    ///     .for_each(|s, x| s.send(x).unwrap());
290    ///
291    /// let mut res: Vec<_> = receiver.iter().collect();
292    ///
293    /// res.sort();
294    ///
295    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
296    /// ```
297    ///
298    /// ## Example 2: Random Number Generator
299    ///
300    /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
301    ///
302    /// The following example demonstrates how to safely generate random numbers through mutable references within
303    /// a parallel computation.
304    ///
305    /// Notice the differences between sequential and parallel computation.
306    /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
307    ///   explicitly define that we will be `using` a random number generator.
308    /// * Parallel iterator does not mutably capture any variable from the scope; however, using transformation
309    ///   converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
310    ///
311    /// ```
312    /// use orx_parallel::*;
313    /// use rand::{Rng, SeedableRng};
314    /// use rand_chacha::ChaCha20Rng;
315    ///
316    /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
317    ///     (0..num_steps).fold(position, |p, _| random_step(rng, p))
318    /// }
319    ///
320    /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
321    ///     match rng.random_bool(0.5) {
322    ///         true => position + 1,  // to right
323    ///         false => position - 1, // to left
324    ///     }
325    /// }
326    ///
327    /// fn input_positions() -> Vec<i64> {
328    ///     (-100..=100).collect()
329    /// }
330    ///
331    /// fn sequential() {
332    ///     let positions = input_positions();
333    ///
334    ///     let mut rng = ChaCha20Rng::seed_from_u64(42);
335    ///     let final_positions: Vec<_> = positions
336    ///         .iter()
337    ///         .copied()
338    ///         .map(|position| random_walk(&mut rng, position, 10))
339    ///         .collect();
340    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
341    /// }
342    ///
343    /// fn parallel() {
344    ///     let positions = input_positions();
345    ///
346    ///     let final_positions: Vec<_> = positions
347    ///         .par()
348    ///         .copied()
349    ///         .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
350    ///         .map(|rng, position| random_walk(rng, position, 10))
351    ///         .collect();
352    ///     let sum_final_positions = final_positions.iter().sum::<i64>();
353    /// }
354    ///
355    /// sequential();
356    /// parallel();
357    /// ```
358    ///
359    /// ## Example 3: Metrics Collection
360    ///
361    /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
362    /// some `unsafe` help with interior mutability.
363    ///
364    /// ```
365    /// use orx_parallel::*;
366    /// use std::cell::UnsafeCell;
367    ///
368    /// const N: u64 = 1_000;
369    /// const MAX_NUM_THREADS: usize = 4;
370    ///
371    /// // just some work
372    /// fn fibonacci(n: u64) -> u64 {
373    ///     let mut a = 0;
374    ///     let mut b = 1;
375    ///     for _ in 0..n {
376    ///         let c = a + b;
377    ///         a = b;
378    ///         b = c;
379    ///     }
380    ///     a
381    /// }
382    ///
383    /// #[derive(Default, Debug)]
384    /// struct ThreadMetrics {
385    ///     thread_idx: usize,
386    ///     num_items_handled: usize,
387    ///     handled_42: bool,
388    ///     num_filtered_out: usize,
389    /// }
390    ///
391    /// struct ThreadMetricsWriter<'a> {
392    ///     metrics_ref: &'a mut ThreadMetrics,
393    /// }
394    ///
395    /// struct ComputationMetrics {
396    ///     thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
397    /// }
398    /// impl ComputationMetrics {
399    ///     fn new() -> Self {
400    ///         let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
401    ///         for i in 0..MAX_NUM_THREADS {
402    ///             thread_metrics[i].thread_idx = i;
403    ///         }
404    ///         Self {
405    ///             thread_metrics: UnsafeCell::new(thread_metrics),
406    ///         }
407    ///     }
408    /// }
409    ///
410    /// impl ComputationMetrics {
411    ///     unsafe fn create_for_thread<'a>(&mut self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
412    ///         // SAFETY: here we create a mutable reference to the thread_idx-th metrics
413    ///         // * If we call this method multiple times with the same index,
414    ///         //   we create multiple mutable references to the same ThreadMetrics,
415    ///         //   which would lead to a race condition.
416    ///         // * We must make sure that `create_for_thread` is called only once per thread.
417    ///         // * If we use `create_for_thread` within the `using` call to create mutable values
418    ///         //   used by the threads, we are certain that the parallel computation
419    ///         //   will only call this method once per thread; hence, it will not
420    ///         //   cause the race condition.
421    ///         // * On the other hand, we must ensure that we do not call this method
422    ///         //   externally.
423    ///         let array = unsafe { &mut *self.thread_metrics.get() };
424    ///         ThreadMetricsWriter {
425    ///             metrics_ref: &mut array[thread_idx],
426    ///         }
427    ///     }
428    /// }
429    ///
430    /// let mut metrics = ComputationMetrics::new();
431    ///
432    /// let input: Vec<u64> = (0..N).collect();
433    ///
434    /// let sum = input
435    ///     .par()
436    ///     // SAFETY: we do not call `create_for_thread` externally;
437    ///     // it is safe if it is called only by the parallel computation.
438    ///     .using(|t| unsafe { metrics.create_for_thread(t) })
439    ///     .map(|m: &mut ThreadMetricsWriter<'_>, i| {
440    ///         // collect some useful metrics
441    ///         m.metrics_ref.num_items_handled += 1;
442    ///         m.metrics_ref.handled_42 |= *i == 42;
443    ///
444    ///         // actual work
445    ///         fibonacci((*i % 20) + 1) % 100
446    ///     })
447    ///     .filter(|m, i| {
448    ///         let is_even = i % 2 == 0;
449    ///
450    ///         if !is_even {
451    ///             m.metrics_ref.num_filtered_out += 1;
452    ///         }
453    ///
454    ///         is_even
455    ///     })
456    ///     .num_threads(MAX_NUM_THREADS)
457    ///     .sum();
458    ///
459    /// let total_by_metrics: usize = metrics
460    ///     .thread_metrics
461    ///     .get_mut()
462    ///     .iter()
463    ///     .map(|x| x.num_items_handled)
464    ///     .sum();
465    ///
466    /// assert_eq!(N as usize, total_by_metrics);
467    /// ```
468    ///
    // Required method (declared without a default body). Consumes `self`; the result keeps
    // the runner `R` and the item type, and carries the closure as `UsingFun<F, U>`.
469    fn using<U, F>(
470        self,
471        using: F,
472    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
473    where
        // The created value only has to be `Send`; neither `Sync` nor `Clone` is required.
474        U: Send,
        // `FnMut`, so the factory closure is allowed to mutate its captured state.
475        F: FnMut(usize) -> U;
476
477    /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
478    /// used variable throughout the computation.
479    ///
480    /// Note that each used thread will obtain exactly one instance of the variable.
481    ///
482    /// Each used thread receives a clone of the provided `value`.
483    /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
484    ///
485    /// Please see [`using`] for examples.
486    ///
487    /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
488    ///
489    /// [`using`]: crate::ParIter::using
    // Required method (declared without a default body). Consumes `self` and takes `value`
    // by value; the result keeps the runner `R` and the item type, and carries the value
    // as `UsingClone<U>`.
490    fn using_clone<U>(
491        self,
492        value: U,
493    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
494    where
        // `Clone` is what permits handing out copies of `value`; `Sync` is not required.
495        U: Clone + Send;
496
497    // computation transformations
498
499    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
500    ///
501    /// # Examples
502    ///
503    /// ```
504    /// use orx_parallel::*;
505    ///
506    /// let a = [1, 2, 3];
507    ///
508    /// let iter = a.into_par().map(|x| 2 * x);
509    ///
510    /// let b: Vec<_> = iter.collect();
511    /// assert_eq!(b, &[2, 4, 6]);
512    /// ```
    // Required method (declared without a default body). The default methods `inspect`,
    // `copied`, `cloned` and `count` of this trait are all written in terms of it.
513    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
514    where
        // Same bounds as the trait places on `Self::Item`, since `Out` becomes the new item type.
515        Out: Send + Sync,
        // The closure takes each item by value.
516        Map: Fn(Self::Item) -> Out + Send + Sync + Clone;
517
518    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
519    ///
520    /// # Examples
521    ///
522    /// ```
523    /// use orx_parallel::*;
524    ///
525    /// let a = [1, 2, 3];
526    ///
527    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
528    ///
529    /// let b: Vec<_> = iter.collect();
530    /// assert_eq!(b, &[1, 3]);
531    /// ```
    // Required method (declared without a default body). The item type is unchanged;
    // the predicate only receives a shared reference to each item.
532    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
533    where
534        Filter: Fn(&Self::Item) -> bool + Send + Sync + Clone;
535
536    /// Creates an iterator that works like map, but flattens nested structure.
537    ///
538    /// # Examples
539    ///
540    /// ```
541    /// use orx_parallel::*;
542    ///
543    /// let words = ["alpha", "beta", "gamma"];
544    ///
545    /// // chars() returns an iterator
546    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
547    ///
548    /// let merged: String = all_chars.iter().collect();
549    /// assert_eq!(merged, "alphabetagamma");
550    /// ```
    // Required method (declared without a default body). The default `flatten` of this
    // trait is written in terms of it. The resulting item type is `IOut::Item`.
551    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
552    where
        // Both the returned collection and the iterator it produces must be `Send + Sync`.
553        IOut: IntoIterator + Send + Sync,
554        IOut::IntoIter: Send + Sync,
555        IOut::Item: Send + Sync,
        // The closure takes each item by value.
556        FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone;
557
558    /// Creates an iterator that both filters and maps.
559    ///
560    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
561    ///
562    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
563    /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
564    ///
565    /// # Examples
566    ///
567    /// ```
568    /// use orx_parallel::*;
569    ///
570    /// let a = ["1", "two", "NaN", "four", "5"];
571    ///
572    /// let numbers: Vec<_> = a
573    ///     .into_par()
574    ///     .filter_map(|s| s.parse::<usize>().ok())
575    ///     .collect();
576    ///
577    /// assert_eq!(numbers, [1, 5]);
578    /// ```
    // Required method (declared without a default body). The closure takes each item by
    // value and returns `Option<Out>`; `Out` becomes the item type of the result.
579    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
580    where
581        Out: Send + Sync,
582        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone;
583
584    /// Does something with each element of an iterator, passing the value on.
585    ///
586    /// When using iterators, you’ll often chain several of them together.
587    /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
588    /// To do that, insert a call to `inspect()`.
589    ///
590    /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
591    /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
592    ///
593    /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
594    /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
595    /// collect some intermediate values during parallel execution for further inspection.
596    /// The following example demonstrates such a use case.
597    ///
598    /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
599    ///
600    /// # Examples
601    ///
602    /// ```
603    /// use orx_parallel::*;
604    /// use orx_concurrent_bag::*;
605    ///
606    /// let a = vec![1, 4, 2, 3];
607    ///
608    /// // let's add some inspect() calls to investigate what's happening
609    /// // - log some events
610    /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
611    /// let bag = ConcurrentBag::new();
612    ///
613    /// let sum = a
614    ///     .par()
615    ///     .copied()
616    ///     .inspect(|x| println!("about to filter: {x}"))
617    ///     .filter(|x| x % 2 == 0)
618    ///     .inspect(|x| {
619    ///         bag.push(*x);
620    ///         println!("made it through filter: {x}");
621    ///     })
622    ///     .sum();
623    /// println!("{sum}");
624    ///
625    /// let mut values_made_through = bag.into_inner();
626    /// values_made_through.sort();
627    /// assert_eq!(values_made_through, [2, 4]);
628    /// ```
629    ///
630    /// This will print:
631    ///
632    /// ```console
633    /// about to filter: 1
634    /// about to filter: 4
635    /// made it through filter: 4
636    /// about to filter: 2
637    /// made it through filter: 2
638    /// about to filter: 3
639    /// 6
640    /// ```
641    fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
642    where
643        Operation: Fn(&Self::Item) + Sync + Send + Clone,
644    {
645        let map = move |x| {
646            operation(&x);
647            x
648        };
649        self.map(map)
650    }
651
652    // special item transformations
653
654    /// Creates an iterator which copies all of its elements.
655    ///
656    /// # Examples
657    ///
658    /// ```
659    /// use orx_parallel::*;
660    ///
661    /// let a = vec![1, 2, 3];
662    ///
663    /// let v_copied: Vec<_> = a.par().copied().collect();
664    ///
665    /// // copied is the same as .map(|&x| x)
666    /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
667    ///
668    /// assert_eq!(v_copied, vec![1, 2, 3]);
669    /// assert_eq!(v_map, vec![1, 2, 3]);
670    /// ```
671    fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
672    where
673        T: 'a + Copy + Send + Sync,
674        Self: ParIter<R, Item = &'a T>,
675    {
676        self.map(map_copy)
677    }
678
679    /// Creates an iterator which clones all of its elements.
680    ///
681    /// # Examples
682    ///
683    /// ```
684    /// use orx_parallel::*;
685    ///
686    /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
687    ///
688    /// let v_cloned: Vec<_> = a.par().cloned().collect();
689    ///
690    /// // cloned is the same as .map(|x| x.clone())
691    /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
692    ///
693    /// assert_eq!(
694    ///     v_cloned,
695    ///     vec![String::from("1"), String::from("2"), String::from("3")]
696    /// );
697    /// assert_eq!(
698    ///     v_map,
699    ///     vec![String::from("1"), String::from("2"), String::from("3")]
700    /// );
701    /// ```
702    fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
703    where
704        T: 'a + Clone + Send + Sync,
705        Self: ParIter<R, Item = &'a T>,
706    {
707        self.map(map_clone)
708    }
709
710    /// Creates an iterator that flattens nested structure.
711    ///
712    /// This is useful when you have an iterator of iterators or an iterator of things that can be
713    /// turned into iterators and you want to remove one level of indirection.
714    ///
715    /// # Examples
716    ///
717    /// Basic usage.
718    ///
719    /// ```
720    /// use orx_parallel::*;
721    ///
722    /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
723    /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
724    /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
725    /// ```
726    ///
727    /// Mapping and then flattening:
728    ///
729    /// ```
730    /// use orx_parallel::*;
731    ///
732    /// let words = vec!["alpha", "beta", "gamma"];
733    ///
734    /// // chars() returns an iterator
735    /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
736    /// let merged: String = all_characters.into_iter().collect();
737    /// assert_eq!(merged, "alphabetagamma");
738    /// ```
739    ///
740    /// But actually, you can write this in terms of `flat_map`,
741    /// which is preferable in this case since it conveys intent more clearly:
742    ///
743    /// ```
744    /// use orx_parallel::*;
745    ///
746    /// let words = vec!["alpha", "beta", "gamma"];
747    ///
748    /// // chars() returns an iterator
749    /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
750    /// let merged: String = all_characters.into_iter().collect();
751    /// assert_eq!(merged, "alphabetagamma");
752    /// ```
753    fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
754    where
755        Self::Item: IntoIterator,
756        <Self::Item as IntoIterator>::IntoIter: Send + Sync,
757        <Self::Item as IntoIterator>::Item: Send + Sync,
758        R: Send + Sync,
759        Self: Send + Sync,
760    {
761        let map = |e: Self::Item| e.into_iter();
762        self.flat_map(map)
763    }
764
765    // collect
766
767    /// Collects all the items from an iterator into a collection.
768    ///
769    /// This is useful when you already have a collection and want to add the iterator items to it.
770    ///
771    /// The collection is passed in as owned value, and returned back with the additional elements.
772    ///
773    /// All collections implementing [`ParCollectInto`] can be used to collect into.
774    ///
775    /// [`ParCollectInto`]: crate::ParCollectInto
776    ///
777    /// # Examples
778    ///
779    /// ```
780    /// use orx_parallel::*;
781    ///
782    /// let a = vec![1, 2, 3];
783    ///
784    /// let vec: Vec<i32> = vec![0, 1];
785    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
786    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
787    ///
788    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
789    /// ```
    // Required method (declared without a default body). `output` is taken by value and a
    // value of the same type `C` is returned. The default `collect` of this trait
    // delegates here after creating its target with `C::empty(..)`.
790    fn collect_into<C>(self, output: C) -> C
791    where
792        C: ParCollectInto<Self::Item>;
793
794    /// Transforms an iterator into a collection.
795    ///
796    /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
797    /// the type of the result collection; or turbofish annotation can be used.
798    ///
799    /// All collections implementing [`ParCollectInto`] can be used to collect into.
800    ///
801    /// [`ParCollectInto`]: crate::ParCollectInto
802    ///
803    /// # Examples
804    ///
805    /// ```
806    /// use orx_parallel::*;
807    ///
808    /// let a = vec![1, 2, 3];
809    ///
810    /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
811    ///
812    /// assert_eq!(vec![2, 4, 6], doubled);
813    /// ```
814    fn collect<C>(self) -> C
815    where
816        C: ParCollectInto<Self::Item>,
817    {
818        let output = C::empty(self.con_iter().try_get_len());
819        self.collect_into(output)
820    }
821
822    // reduce
823
824    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
825    ///
826    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
827    ///
828    /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
829    ///
830    /// # Example
831    ///
832    /// ```
833    /// use orx_parallel::*;
834    ///
835    /// let inputs = 1..10;
836    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
837    /// assert_eq!(reduced, 45);
838    /// ```
    // Required method (declared without a default body). The default `count` of this trait
    // is built from `map` followed by this method. Unlike the closures accepted by `map`,
    // `filter`, `flat_map` and `filter_map`, `Reduce` is not required to be `Clone`.
839    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
840    where
841        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync;
842
843    /// Tests if every element of the iterator matches a predicate.
844    ///
845    /// `all` takes a `predicate` that returns true or false.
846    /// It applies this closure to each element of the iterator,
847    /// and if they all return true, then so does `all`.
848    /// If any of them returns false, it returns false.
849    ///
850    /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
851    /// given that no matter what else happens, the result will also be false.
852    ///
853    /// An empty iterator returns true.
854    ///
855    /// # Examples
856    ///
857    /// ```
858    /// use orx_parallel::*;
859    ///
860    /// let mut a = vec![1, 2, 3];
861    /// assert!(a.par().all(|x| **x > 0));
862    /// assert!(!a.par().all(|x| **x > 2));
863    ///
864    /// a.clear();
865    /// assert!(a.par().all(|x| **x > 2)); // empty iterator
866    /// ```
867    fn all<Predicate>(self, predicate: Predicate) -> bool
868    where
869        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
870    {
871        let violates = |x: &Self::Item| !predicate(x);
872        self.find(violates).is_none()
873    }
874
875    /// Tests if any element of the iterator matches a predicate.
876    ///
877    /// `any` takes a `predicate` that returns true or false.
878    /// It applies this closure to each element of the iterator,
879    /// and if any of the elements returns true, then so does `any`.
880    /// If all of them return false, it returns false.
881    ///
882    /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
883    /// given that no matter what else happens, the result will also be true.
884    ///
885    /// An empty iterator returns false.
886    ///
887    /// # Examples
888    ///
889    /// ```
890    /// use orx_parallel::*;
891    ///
892    /// let mut a = vec![1, 2, 3];
893    /// assert!(a.par().any(|x| **x > 0));
894    /// assert!(!a.par().any(|x| **x > 5));
895    ///
896    /// a.clear();
897    /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
898    /// ```
899    fn any<Predicate>(self, predicate: Predicate) -> bool
900    where
901        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
902    {
903        self.find(predicate).is_some()
904    }
905
906    /// Consumes the iterator, counting the number of iterations and returning it.
907    ///
908    /// # Examples
909    ///
910    /// ```
911    /// use orx_parallel::*;
912    ///
913    /// let a = vec![1, 2, 3];
914    /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
915    /// ```
916    fn count(self) -> usize {
917        self.map(map_count).reduce(reduce_sum).unwrap_or(0)
918    }
919
920    /// Calls a closure on each element of an iterator.
921    ///
922    /// # Examples
923    ///
924    /// Basic usage:
925    ///
926    /// ```
927    /// use orx_parallel::*;
928    /// use std::sync::mpsc::channel;
929    ///
930    /// let (tx, rx) = channel();
931    /// (0..5)
932    ///     .par()
933    ///     .map(|x| x * 2 + 1)
934    ///     .for_each(move |x| tx.send(x).unwrap());
935    ///
936    /// let mut v: Vec<_> = rx.iter().collect();
937    /// v.sort(); // order can be mixed, since messages will be sent in parallel
938    /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
939    /// ```
940    ///
941    /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
942    /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
943    /// In the following example, we log every element that satisfies a predicate in parallel.
944    ///
945    /// ```
946    /// use orx_parallel::*;
947    ///
948    /// (0..5)
949    ///     .par()
950    ///     .flat_map(|x| x * 100..x * 110)
951    ///     .filter(|&x| x % 3 == 0)
952    ///     .for_each(|x| println!("{x}"));
953    /// ```
954    fn for_each<Operation>(self, operation: Operation)
955    where
956        Operation: Fn(Self::Item) + Sync + Send,
957    {
958        let map = |x| operation(x);
959        let _ = self.map(map).reduce(reduce_unit);
960    }
961
962    /// Returns the maximum element of an iterator.
963    ///
964    /// If the iterator is empty, None is returned.
965    ///
966    /// # Examples
967    ///
968    /// ```
969    /// use orx_parallel::*;
970    ///
971    /// let a = vec![1, 2, 3];
972    /// let b: Vec<u32> = Vec::new();
973    ///
974    /// assert_eq!(a.par().max(), Some(&3));
975    /// assert_eq!(b.par().max(), None);
976    /// ```
977    fn max(self) -> Option<Self::Item>
978    where
979        Self::Item: Ord,
980    {
981        self.reduce(Ord::max)
982    }
983
984    /// Returns the element that gives the maximum value with respect to the specified `compare` function.
985    ///
986    /// If the iterator is empty, None is returned.
987    ///
988    /// # Examples
989    ///
990    /// ```
991    /// use orx_parallel::*;
992    ///
993    /// let a = vec![-3_i32, 0, 1, 5, -10];
994    /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
995    /// ```
996    fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
997    where
998        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
999    {
1000        let reduce = |x, y| match compare(&x, &y) {
1001            Ordering::Greater | Ordering::Equal => x,
1002            Ordering::Less => y,
1003        };
1004        self.reduce(reduce)
1005    }
1006
1007    /// Returns the element that gives the maximum value from the specified function.
1008    ///
1009    /// If the iterator is empty, None is returned.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```
1014    /// use orx_parallel::*;
1015    ///
1016    /// let a = vec![-3_i32, 0, 1, 5, -10];
1017    /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1018    /// ```
1019    fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1020    where
1021        Key: Ord,
1022        GetKey: Fn(&Self::Item) -> Key + Sync,
1023    {
1024        let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1025            Ordering::Greater | Ordering::Equal => x,
1026            Ordering::Less => y,
1027        };
1028        self.reduce(reduce)
1029    }
1030
1031    /// Returns the minimum element of an iterator.
1032    ///
1033    /// If the iterator is empty, None is returned.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```
1038    /// use orx_parallel::*;
1039    ///
1040    /// let a = vec![1, 2, 3];
1041    /// let b: Vec<u32> = Vec::new();
1042    ///
1043    /// assert_eq!(a.par().min(), Some(&1));
1044    /// assert_eq!(b.par().min(), None);
1045    /// ```
1046    fn min(self) -> Option<Self::Item>
1047    where
1048        Self::Item: Ord,
1049    {
1050        self.reduce(Ord::min)
1051    }
1052
1053    /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1054    ///
1055    /// If the iterator is empty, None is returned.
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```
1060    /// use orx_parallel::*;
1061    ///
1062    /// let a = vec![-3_i32, 0, 1, 5, -10];
1063    /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1064    /// ```
1065    fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1066    where
1067        Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1068    {
1069        let reduce = |x, y| match compare(&x, &y) {
1070            Ordering::Less | Ordering::Equal => x,
1071            Ordering::Greater => y,
1072        };
1073        self.reduce(reduce)
1074    }
1075
1076    /// Returns the element that gives the minimum value from the specified function.
1077    ///
1078    /// If the iterator is empty, None is returned.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```
1083    /// use orx_parallel::*;
1084    ///
1085    /// let a = vec![-3_i32, 0, 1, 5, -10];
1086    /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1087    /// ```
1088    fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1089    where
1090        Key: Ord,
1091        GetKey: Fn(&Self::Item) -> Key + Sync,
1092    {
1093        let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1094            Ordering::Less | Ordering::Equal => x,
1095            Ordering::Greater => y,
1096        };
1097        self.reduce(reduce)
1098    }
1099
1100    /// Sums the elements of an iterator.
1101    ///
1102    /// Takes each element, adds them together, and returns the result.
1103    ///
1104    /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1105    ///
1106    /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1107    ///
1108    /// [`Sum<Out>`]: crate::Sum
1109    ///
1110    /// # Examples
1111    ///
1112    /// ```
1113    /// use orx_parallel::*;
1114    ///
1115    /// let a = vec![1, 2, 3];
1116    /// let sum: i32 = a.par().sum();
1117    ///
1118    /// assert_eq!(sum, 6);
1119    /// ```
1120    fn sum<Out>(self) -> Out
1121    where
1122        Self::Item: Sum<Out>,
1123        Out: Send + Sync,
1124    {
1125        self.map(Self::Item::map)
1126            .reduce(Self::Item::reduce)
1127            .unwrap_or(Self::Item::zero())
1128    }
1129
1130    // early exit
1131
    /// Returns the first (or any) element of the iterator; returns None if it is empty.
    ///
    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
    /// * any element is returned if `IterationOrder::Arbitrary` is set.
    ///
    /// # Examples
    ///
    /// The following example demonstrates the usage of `first` with default `Ordered` iteration.
    /// This guarantees that the first element with respect to position in the input sequence
    /// is returned.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a: Vec<usize> = vec![];
    /// assert_eq!(a.par().copied().first(), None);
    ///
    /// let a = vec![1, 2, 3];
    /// assert_eq!(a.par().copied().first(), Some(1));
    ///
    /// let a = 1..10_000;
    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
    ///
    /// // or equivalently,
    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
    /// ```
    ///
    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
    /// whichever is visited first depending on the parallel execution.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = 1..10_000;
    ///
    /// // might return either of 3421 or 2*3421
    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
    /// assert!([3421, 2 * 3421].contains(&any));
    ///
    /// // or equivalently,
    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
    /// assert!([3421, 2 * 3421].contains(&any));
    /// ```
    fn first(self) -> Option<Self::Item>;
1176
1177    /// Searches for an element of an iterator that satisfies a `predicate`.
1178    ///
1179    /// Depending on the set iteration order of the parallel iterator, returns
1180    ///
1181    /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1182    /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1183    ///
1184    /// `find` takes a closure that returns true or false.
1185    /// It applies this closure to each element of the iterator,
1186    /// and returns `Some(x)` where `x` is the first element that returns true.
1187    /// If they all return false, it returns None.
1188    ///
1189    /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1190    ///
1191    /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1192    ///
1193    /// # Examples
1194    ///
1195    /// The following example demonstrates the usage of first with default `Ordered` iteration.
1196    /// This guarantees that the first element with respect to position in the input sequence
1197    /// is returned.
1198    ///
1199    /// ```
1200    /// use orx_parallel::*;
1201    ///
1202    /// let a = 1..10_000;
1203    /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1204    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1205    /// ```
1206    ///
1207    /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1208    /// whichever is found first depending on the parallel execution.
1209    ///
1210    /// ```
1211    /// use orx_parallel::*;
1212    ///
1213    /// let a = 1..10_000;
1214    ///
1215    /// // might return either of 3421 or 2*3421
1216    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1217    /// assert!([3421, 2 * 3421].contains(&any));
1218    /// ```
1219    fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1220    where
1221        Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
1222    {
1223        self.filter(predicate).first()
1224    }
1225}