orx_parallel/par_iter.rs
1use crate::computational_variants::fallible_option::ParOption;
2use crate::par_iter_option::{IntoOption, ParIterOption};
3use crate::par_iter_result::IntoResult;
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::using::{UsingClone, UsingFun};
6use crate::{ParIterResult, ParThreadPool, RunnerWithPool};
7use crate::{
8 ParIterUsing, Params,
9 collect_into::ParCollectInto,
10 default_fns::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
11 parameters::{ChunkSize, IterationOrder, NumThreads},
12 special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20 R: ParallelRunner,
21{
22 /// Element type of the parallel iterator.
23 type Item;
24
25 /// Returns a reference to the input concurrent iterator.
26 fn con_iter(&self) -> &impl ConcurrentIter;
27
28 /// Parameters of the parallel iterator.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use orx_parallel::*;
34 /// use std::num::NonZero;
35 ///
36 /// let vec = vec![1, 2, 3, 4];
37 ///
38 /// assert_eq!(
39 /// vec.par().params(),
40 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41 /// );
42 ///
43 /// assert_eq!(
44 /// vec.par().num_threads(0).chunk_size(0).params(),
45 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46 /// );
47 ///
48 /// assert_eq!(
49 /// vec.par().num_threads(1).params(),
50 /// Params::new(
51 /// NumThreads::Max(NonZero::new(1).unwrap()),
52 /// ChunkSize::Auto,
53 /// IterationOrder::Ordered
54 /// )
55 /// );
56 ///
57 /// assert_eq!(
58 /// vec.par().num_threads(4).chunk_size(64).params(),
59 /// Params::new(
60 /// NumThreads::Max(NonZero::new(4).unwrap()),
61 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
62 /// IterationOrder::Ordered
63 /// )
64 /// );
65 ///
66 /// assert_eq!(
67 /// vec.par()
68 /// .num_threads(8)
69 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70 /// .iteration_order(IterationOrder::Arbitrary)
71 /// .params(),
72 /// Params::new(
73 /// NumThreads::Max(NonZero::new(8).unwrap()),
74 /// ChunkSize::Min(NonZero::new(16).unwrap()),
75 /// IterationOrder::Arbitrary
76 /// )
77 /// );
78 /// ```
79 fn params(&self) -> Params;
80
81 // params transformations
82
83 /// Sets the number of threads to be used in the parallel execution.
84 /// Integers can be used as the argument with the following mapping:
85 ///
86 /// * `0` -> `NumThreads::Auto`
87 /// * `1` -> `NumThreads::sequential()`
88 /// * `n > 0` -> `NumThreads::Max(n)`
89 ///
90 /// See [`NumThreads`] for details.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use orx_parallel::*;
96 /// use std::num::NonZero;
97 ///
98 /// let vec = vec![1, 2, 3, 4];
99 ///
100 /// // all available threads can be used
101 ///
102 /// assert_eq!(
103 /// vec.par().params(),
104 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105 /// );
106 ///
107 /// assert_eq!(
108 /// vec.par().num_threads(0).chunk_size(0).params(),
109 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110 /// );
111 ///
112 /// // computation will be executed sequentially on the main thread, no parallelization
113 ///
114 /// assert_eq!(
115 /// vec.par().num_threads(1).params(),
116 /// Params::new(
117 /// NumThreads::Max(NonZero::new(1).unwrap()),
118 /// ChunkSize::Auto,
119 /// IterationOrder::Ordered
120 /// )
121 /// );
122 ///
123 /// // maximum 4 threads can be used
124 /// assert_eq!(
125 /// vec.par().num_threads(4).chunk_size(64).params(),
126 /// Params::new(
127 /// NumThreads::Max(NonZero::new(4).unwrap()),
128 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
129 /// IterationOrder::Ordered
130 /// )
131 /// );
132 ///
133 /// // maximum 8 threads can be used
134 /// assert_eq!(
135 /// vec.par()
136 /// .num_threads(8)
137 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138 /// .iteration_order(IterationOrder::Arbitrary)
139 /// .params(),
140 /// Params::new(
141 /// NumThreads::Max(NonZero::new(8).unwrap()),
142 /// ChunkSize::Min(NonZero::new(16).unwrap()),
143 /// IterationOrder::Arbitrary
144 /// )
145 /// );
146 /// ```
147 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149 /// Sets the number of elements to be pulled from the concurrent iterator during the
150 /// parallel execution. When integers are used as argument, the following mapping applies:
151 ///
152 /// * `0` -> `ChunkSize::Auto`
153 /// * `n > 0` -> `ChunkSize::Exact(n)`
154 ///
155 /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
156 ///
157 /// See [`ChunkSize`] for details.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// use orx_parallel::*;
163 /// use std::num::NonZero;
164 ///
165 /// let vec = vec![1, 2, 3, 4];
166 ///
167 /// // chunk sizes will be dynamically decided by the parallel runner
168 ///
169 /// assert_eq!(
170 /// vec.par().params(),
171 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172 /// );
173 ///
174 /// assert_eq!(
175 /// vec.par().num_threads(0).chunk_size(0).params(),
176 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177 /// );
178 ///
179 /// assert_eq!(
180 /// vec.par().num_threads(1).params(),
181 /// Params::new(
182 /// NumThreads::Max(NonZero::new(1).unwrap()),
183 /// ChunkSize::Auto,
184 /// IterationOrder::Ordered
185 /// )
186 /// );
187 ///
188 /// // chunk size will always be 64, parallel runner cannot change
189 ///
190 /// assert_eq!(
191 /// vec.par().num_threads(4).chunk_size(64).params(),
192 /// Params::new(
193 /// NumThreads::Max(NonZero::new(4).unwrap()),
194 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
195 /// IterationOrder::Ordered
196 /// )
197 /// );
198 ///
199 /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200 ///
201 /// assert_eq!(
202 /// vec.par()
203 /// .num_threads(8)
204 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205 /// .iteration_order(IterationOrder::Arbitrary)
206 /// .params(),
207 /// Params::new(
208 /// NumThreads::Max(NonZero::new(8).unwrap()),
209 /// ChunkSize::Min(NonZero::new(16).unwrap()),
210 /// IterationOrder::Arbitrary
211 /// )
212 /// );
213 /// ```
214 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216 /// Sets the iteration order of the parallel computation.
217 ///
218 /// See [`IterationOrder`] for details.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use orx_parallel::*;
224 ///
225 /// let vec = vec![1, 2, 3, 4];
226 ///
227 /// // results are collected in order consistent to the input order,
228 /// // or find returns the first element satisfying the predicate
229 ///
230 /// assert_eq!(
231 /// vec.par().params(),
232 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233 /// );
234 ///
235 /// assert_eq!(
236 /// vec.par().iteration_order(IterationOrder::Ordered).params(),
237 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238 /// );
239 ///
240 /// // results might be collected in arbitrary order
241 /// // or find returns the any of the elements satisfying the predicate
242 ///
243 /// assert_eq!(
244 /// vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246 /// );
247 /// ```
248 fn iteration_order(self, order: IterationOrder) -> Self;
249
250 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251 ///
252 /// See also [`with_pool`].
253 ///
254 /// Parallel runner of each computation can be independently specified using `with_runner`.
255 ///
256 /// When not specified the default runner is used, which is:
257 /// * [`RunnerWithPool`] with [`StdDefaultPool`] when "std" feature is enabled,
258 /// * [`RunnerWithPool`] with [`SequentialPool`] when "std" feature is disabled.
259 ///
260 /// Note that [`StdDefaultPool`] uses standard native threads.
261 ///
262 /// When working in a no-std environment, the default runner falls back to sequential.
263 /// In this case, `RunnerWithPool` using a particular thread pool must be passed in using `with_runner`
264 /// transformation to achieve parallel computation.
265 ///
266 /// [`RunnerWithPool`]: crate::RunnerWithPool
267 /// [`StdDefaultPool`]: crate::StdDefaultPool
268 /// [`SequentialPool`]: crate::SequentialPool
269 /// [`with_pool`]: crate::ParIter::with_pool
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use orx_parallel::*;
275 ///
276 /// let inputs: Vec<_> = (0..42).collect();
277 ///
278 /// // uses the DefaultRunner
279 /// // assuming "std" enabled, RunnerWithPool<StdDefaultPool> will be used; i.e., native threads
280 /// let sum = inputs.par().sum();
281 ///
282 /// // equivalent to:
283 /// #[cfg(feature = "std")]
284 /// {
285 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(StdDefaultPool::default())).sum();
286 /// assert_eq!(sum, sum2);
287 /// }
288 ///
289 /// #[cfg(not(miri))]
290 /// #[cfg(feature = "scoped_threadpool")]
291 /// {
292 /// let mut pool = scoped_threadpool::Pool::new(8);
293 ///
294 /// // uses the scoped_threadpool::Pool created with 8 threads
295 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(&mut pool)).sum();
296 /// assert_eq!(sum, sum2);
297 /// }
298 ///
299 /// #[cfg(not(miri))]
300 /// #[cfg(feature = "rayon-core")]
301 /// {
302 /// let pool = rayon_core::ThreadPoolBuilder::new()
303 /// .num_threads(8)
304 /// .build()
305 /// .unwrap();
306 ///
307 /// // uses the rayon-core::ThreadPool created with 8 threads
308 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(&pool)).sum();
309 /// assert_eq!(sum, sum2);
310 /// }
311 /// ```
312 fn with_runner<Q: ParallelRunner>(self, runner: Q) -> impl ParIter<Q, Item = Self::Item>;
313
314 /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
315 /// [`ParThreadPool`].
316 ///
317 /// See also [`with_runner`].
318 ///
319 /// Thread pool of each computation can be independently specified using `with_pool`.
320 ///
321 /// When not specified the default pool is used, which is:
322 /// * [`StdDefaultPool`] when "std" feature is enabled,
323 /// * [`SequentialPool`] when "std" feature is disabled.
324 ///
325 /// Note that [`StdDefaultPool`] uses standard native threads.
326 ///
327 /// When working in a no-std environment, the default pool falls back to sequential.
328 /// In this case, a thread pool must be passed in using `with_pool` transformation to achieve parallel computation.
329 ///
330 /// Note that if a thread pool, say `pool`, is of a type that implements [`ParThreadPool`]; then:
331 /// * `with_pool` can be called with owned value `with_pool(pool)` for all implementors; but also,
332 /// * with a shared reference `with_pool(&pool)` for most of the implementations (eg: rayon-core, yastl), and
333 /// * with a mutable reference `with_pool(&mut pool)` for others (eg: scoped_threadpool).
334 ///
335 /// [`DefaultPool`]: crate::DefaultPool
336 /// [`RunnerWithPool`]: crate::RunnerWithPool
337 /// [`StdDefaultPool`]: crate::StdDefaultPool
338 /// [`SequentialPool`]: crate::SequentialPool
339 /// [`with_runner`]: crate::ParIter::with_runner
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use orx_parallel::*;
345 ///
346 /// let inputs: Vec<_> = (0..42).collect();
347 ///
348 /// // uses the DefaultPool
349 /// // assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
350 /// let sum = inputs.par().sum();
351 ///
352 /// // equivalent to:
353 /// #[cfg(feature = "std")]
354 /// {
355 /// let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
356 /// assert_eq!(sum, sum2);
357 /// }
358 ///
359 /// #[cfg(not(miri))]
360 /// #[cfg(feature = "scoped_threadpool")]
361 /// {
362 /// let mut pool = scoped_threadpool::Pool::new(8);
363 ///
364 /// // uses the scoped_threadpool::Pool created with 8 threads
365 /// let sum2 = inputs.par().with_pool(&mut pool).sum();
366 /// assert_eq!(sum, sum2);
367 /// }
368 ///
369 /// #[cfg(not(miri))]
370 /// #[cfg(feature = "rayon-core")]
371 /// {
372 /// let pool = rayon_core::ThreadPoolBuilder::new()
373 /// .num_threads(8)
374 /// .build()
375 /// .unwrap();
376 ///
377 /// // uses the rayon-core::ThreadPool created with 8 threads
378 /// let sum2 = inputs.par().with_pool(&pool).sum();
379 /// assert_eq!(sum, sum2);
380 /// }
381 /// ```
382 fn with_pool<P: ParThreadPool>(
383 self,
384 pool: P,
385 ) -> impl ParIter<RunnerWithPool<P, R::Executor>, Item = Self::Item> {
386 let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
387 self.with_runner(runner)
388 }
389
390 // using transformations
391
392 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
393 /// used variable throughout the computation.
394 ///
395 /// Note that each used thread will obtain exactly one instance of the variable.
396 ///
397 /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
398 /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
399 /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
400 ///
401 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
402 ///
403 /// # Examples
404 ///
405 /// ## Example 1: Channels
406 ///
407 /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
408 ///
409 /// ```ignore
410 /// use orx_parallel::*;
411 /// use std::sync::mpsc::channel;
412 ///
413 /// let (sender, receiver) = channel();
414 ///
415 /// (0..5)
416 /// .into_par()
417 /// .using(|_thread_idx| sender.clone())
418 /// .for_each(|s, x| s.send(x).unwrap());
419 ///
420 /// let mut res: Vec<_> = receiver.iter().collect();
421 ///
422 /// res.sort();
423 ///
424 /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
425 /// ```
426 ///
427 /// ## Example 2: Random Number Generator
428 ///
429 /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
430 ///
431 /// The following example demonstrates how to safely generate random numbers through mutable references within
432 /// a parallel computation.
433 ///
434 /// Notice the differences between sequential and parallel computation.
435 /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
436 /// explicitly define that we will be `using` a random number generator.
437 /// * Parallel iterator does not mutable capture any variable from the scope; however, using transformation
438 /// converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
439 ///
440 /// ```
441 /// use orx_parallel::*;
442 /// use rand::{Rng, SeedableRng};
443 /// use rand_chacha::ChaCha20Rng;
444 ///
445 /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
446 /// (0..num_steps).fold(position, |p, _| random_step(rng, p))
447 /// }
448 ///
449 /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
450 /// match rng.random_bool(0.5) {
451 /// true => position + 1, // to right
452 /// false => position - 1, // to left
453 /// }
454 /// }
455 ///
456 /// fn input_positions() -> Vec<i64> {
457 /// (-100..=100).collect()
458 /// }
459 ///
460 /// fn sequential() {
461 /// let positions = input_positions();
462 ///
463 /// let mut rng = ChaCha20Rng::seed_from_u64(42);
464 /// let final_positions: Vec<_> = positions
465 /// .iter()
466 /// .copied()
467 /// .map(|position| random_walk(&mut rng, position, 10))
468 /// .collect();
469 /// let sum_final_positions = final_positions.iter().sum::<i64>();
470 /// }
471 ///
472 /// fn parallel() {
473 /// let positions = input_positions();
474 ///
475 /// let final_positions: Vec<_> = positions
476 /// .par()
477 /// .copied()
478 /// .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
479 /// .map(|rng, position| random_walk(rng, position, 10))
480 /// .collect();
481 /// let sum_final_positions = final_positions.iter().sum::<i64>();
482 /// }
483 ///
484 /// sequential();
485 /// parallel();
486 /// ```
487 ///
488 /// ## Example 3: Metrics Collection
489 ///
490 /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
491 /// some `unsafe` help with interior mutability.
492 ///
493 /// ```
494 /// use orx_parallel::*;
495 /// use std::cell::UnsafeCell;
496 ///
497 /// const N: u64 = 1_000;
498 /// const MAX_NUM_THREADS: usize = 4;
499 ///
500 /// // just some work
501 /// fn fibonacci(n: u64) -> u64 {
502 /// let mut a = 0;
503 /// let mut b = 1;
504 /// for _ in 0..n {
505 /// let c = a + b;
506 /// a = b;
507 /// b = c;
508 /// }
509 /// a
510 /// }
511 ///
512 /// #[derive(Default, Debug)]
513 /// struct ThreadMetrics {
514 /// thread_idx: usize,
515 /// num_items_handled: usize,
516 /// handled_42: bool,
517 /// num_filtered_out: usize,
518 /// }
519 ///
520 /// struct ThreadMetricsWriter<'a> {
521 /// metrics_ref: &'a mut ThreadMetrics,
522 /// }
523 ///
524 /// struct ComputationMetrics {
525 /// thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
526 /// }
527 /// unsafe impl Sync for ComputationMetrics {}
528 /// impl ComputationMetrics {
529 /// fn new() -> Self {
530 /// let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
531 /// for i in 0..MAX_NUM_THREADS {
532 /// thread_metrics[i].thread_idx = i;
533 /// }
534 /// Self {
535 /// thread_metrics: UnsafeCell::new(thread_metrics),
536 /// }
537 /// }
538 /// }
539 ///
540 /// impl ComputationMetrics {
541 /// unsafe fn create_for_thread<'a>(&self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
542 /// // SAFETY: here we create a mutable variable to the thread_idx-th metrics
543 /// // * If we call this method multiple times with the same index,
544 /// // we create multiple mutable references to the same ThreadMetrics,
545 /// // which would lead to a race condition.
546 /// // * We must make sure that `create_for_thread` is called only once per thread.
547 /// // * If we use `create_for_thread` within the `using` call to create mutable values
548 /// // used by the threads, we are certain that the parallel computation
549 /// // will only call this method once per thread; hence, it will not
550 /// // cause the race condition.
551 /// // * On the other hand, we must ensure that we do not call this method
552 /// // externally.
553 /// let array = unsafe { &mut *self.thread_metrics.get() };
554 /// ThreadMetricsWriter {
555 /// metrics_ref: &mut array[thread_idx],
556 /// }
557 /// }
558 /// }
559 ///
560 /// let mut metrics = ComputationMetrics::new();
561 ///
562 /// let input: Vec<u64> = (0..N).collect();
563 ///
564 /// let sum = input
565 /// .par()
566 /// // SAFETY: we do not call `create_for_thread` externally;
567 /// // it is safe if it is called only by the parallel computation.
568 /// // Since we unsafely implement Sync for ComputationMetrics,
569 /// // we must ensure that ComputationMetrics is not used elsewhere.
570 /// .using(|t| unsafe { metrics.create_for_thread(t) })
571 /// .map(|m: &mut ThreadMetricsWriter<'_>, i| {
572 /// // collect some useful metrics
573 /// m.metrics_ref.num_items_handled += 1;
574 /// m.metrics_ref.handled_42 |= *i == 42;
575 ///
576 /// // actual work
577 /// fibonacci((*i % 20) + 1) % 100
578 /// })
579 /// .filter(|m, i| {
580 /// let is_even = i % 2 == 0;
581 ///
582 /// if !is_even {
583 /// m.metrics_ref.num_filtered_out += 1;
584 /// }
585 ///
586 /// is_even
587 /// })
588 /// .num_threads(MAX_NUM_THREADS)
589 /// .sum();
590 ///
591 /// assert_eq!(sum, 9100);
592 ///
593 /// let total_by_metrics: usize = metrics
594 /// .thread_metrics
595 /// .get_mut()
596 /// .iter()
597 /// .map(|x| x.num_items_handled)
598 /// .sum();
599 ///
600 /// assert_eq!(N as usize, total_by_metrics);
601 /// ```
602 ///
603 fn using<U, F>(
604 self,
605 using: F,
606 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
607 where
608 U: 'static,
609 F: Fn(usize) -> U + Sync;
610
611 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
612 /// used variable throughout the computation.
613 ///
614 /// Note that each used thread will obtain exactly one instance of the variable.
615 ///
616 /// Each used thread receives a clone of the provided `value`.
617 /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
618 ///
619 /// Please see [`using`] for examples.
620 ///
621 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
622 ///
623 /// [`using`]: crate::ParIter::using
624 fn using_clone<U>(
625 self,
626 value: U,
627 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
628 where
629 U: Clone + 'static;
630
631 // transformations into fallible computations
632
633 /// Transforms a parallel iterator where elements are of the result type; i.e., `ParIter<R, Item = Result<T, E>>`,
634 /// into fallible parallel iterator with item type `T` and error type `E`; i.e., into `ParIterResult<R, Item = T, Err = E>`.
635 ///
636 /// `ParIterResult` is also a parallel iterator; however, with methods specialized for handling fallible computations
637 /// as follows:
638 ///
639 /// * All of its methods are based on the success path with item type of `T`.
640 /// * However, computations short-circuit and immediately return the observed error if any of the items
641 /// is of the `Err` variant of the result enum.
642 ///
643 /// See [`ParIterResult`] for details.
644 ///
645 /// # Examples
646 ///
647 /// ```
648 /// use orx_parallel::*;
649 ///
650 /// // all succeeds
651 ///
652 /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
653 /// .into_par()
654 /// .map(|x| x.parse::<i32>()) // ParIter with Item=Result<i32, ParseIntError>
655 /// .into_fallible_result() // ParIterResult with Item=i32 and Err=ParseIntError
656 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
657 /// .collect(); // methods return Result<_, Err>
658 /// // where the Ok variant depends on the computation
659 ///
660 /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
661 ///
662 /// // at least one fails
663 ///
664 /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
665 /// .into_par()
666 /// .map(|x| x.parse::<i32>())
667 /// .into_fallible_result()
668 /// .map(|x| x * 2)
669 /// .collect();
670 ///
671 /// assert!(result_doubled.is_err());
672 /// ```
673 fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
674 where
675 Self::Item: IntoResult<T, E>;
676
677 /// Transforms a parallel iterator where elements are of the option type; i.e., `ParIter<R, Item = Option<T>>`,
678 /// into fallible parallel iterator with item type `T`; i.e., into `ParIterOption<R, Item = T>`.
679 ///
680 /// `ParIterOption` is also a parallel iterator; however, with methods specialized for handling fallible computations
681 /// as follows:
682 ///
683 /// * All of its methods are based on the success path with item type of `T`.
684 /// * However, computations short-circuit and immediately return None if any of the items
685 /// is of the `None` variant of the option enum.
686 ///
687 /// See [`ParIterResult`] for details.
688 ///
689 /// # Examples
690 ///
691 /// ```
692 /// use orx_parallel::*;
693 ///
694 /// // all succeeds
695 ///
696 /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
697 /// .into_par()
698 /// .map(|x| x.parse::<i32>().ok()) // ParIter with Item=Option<i32>
699 /// .into_fallible_option() // ParIterOption with Item=i32
700 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
701 /// .collect(); // methods return Option<T>
702 /// // where T depends on the computation
703 ///
704 /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
705 ///
706 /// // at least one fails
707 ///
708 /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
709 /// .into_par()
710 /// .map(|x| x.parse::<i32>().ok())
711 /// .into_fallible_option()
712 /// .map(|x| x * 2)
713 /// .collect();
714 ///
715 /// assert_eq!(result_doubled, None);
716 /// ```
717 fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
718 where
719 Self::Item: IntoOption<T>,
720 {
721 ParOption::new(
722 self.map(|x| x.into_result_with_unit_err())
723 .into_fallible_result(),
724 )
725 }
726
727 // computation transformations
728
729 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
730 ///
731 /// # Examples
732 ///
733 /// ```
734 /// use orx_parallel::*;
735 ///
736 /// let a = [1, 2, 3];
737 ///
738 /// let iter = a.into_par().map(|x| 2 * x);
739 ///
740 /// let b: Vec<_> = iter.collect();
741 /// assert_eq!(b, &[2, 4, 6]);
742 /// ```
743 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
744 where
745 Map: Fn(Self::Item) -> Out + Sync + Clone;
746
747 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
748 ///
749 /// # Examples
750 ///
751 /// ```
752 /// use orx_parallel::*;
753 ///
754 /// let a = [1, 2, 3];
755 ///
756 /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
757 ///
758 /// let b: Vec<_> = iter.collect();
759 /// assert_eq!(b, &[1, 3]);
760 /// ```
761 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
762 where
763 Filter: Fn(&Self::Item) -> bool + Sync + Clone;
764
765 /// Creates an iterator that works like map, but flattens nested structure.
766 ///
767 /// # Examples
768 ///
769 /// ```
770 /// use orx_parallel::*;
771 ///
772 /// let words = ["alpha", "beta", "gamma"];
773 ///
774 /// // chars() returns an iterator
775 /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
776 ///
777 /// let merged: String = all_chars.iter().collect();
778 /// assert_eq!(merged, "alphabetagamma");
779 /// ```
780 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
781 where
782 IOut: IntoIterator,
783 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
784
785 /// Creates an iterator that both filters and maps.
786 ///
787 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
788 ///
789 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
790 /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
791 ///
792 /// # Examples
793 ///
794 /// ```
795 /// use orx_parallel::*;
796 ///
797 /// let a = ["1", "two", "NaN", "four", "5"];
798 ///
799 /// let numbers: Vec<_> = a
800 /// .into_par()
801 /// .filter_map(|s| s.parse::<usize>().ok())
802 /// .collect();
803 ///
804 /// assert_eq!(numbers, [1, 5]);
805 /// ```
806 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
807 where
808 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
809
810 /// Does something with each element of an iterator, passing the value on.
811 ///
812 /// When using iterators, you’ll often chain several of them together.
813 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
814 /// To do that, insert a call to `inspect()`.
815 ///
816 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
817 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
818 ///
819 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
820 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
821 /// collect some intermediate values during parallel execution for further inspection.
822 /// The following example demonstrates such a use case.
823 ///
824 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
825 ///
826 /// # Examples
827 ///
828 /// ```
829 /// use orx_parallel::*;
830 /// use orx_concurrent_bag::*;
831 ///
832 /// let a = vec![1, 4, 2, 3];
833 ///
834 /// // let's add some inspect() calls to investigate what's happening
835 /// // - log some events
836 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
837 /// let bag = ConcurrentBag::new();
838 ///
839 /// let sum = a
840 /// .par()
841 /// .copied()
842 /// .inspect(|x| println!("about to filter: {x}"))
843 /// .filter(|x| x % 2 == 0)
844 /// .inspect(|x| {
845 /// bag.push(*x);
846 /// println!("made it through filter: {x}");
847 /// })
848 /// .sum();
849 /// println!("{sum}");
850 ///
851 /// let mut values_made_through = bag.into_inner();
852 /// values_made_through.sort();
853 /// assert_eq!(values_made_through, [2, 4]);
854 /// ```
855 ///
856 /// This will print:
857 ///
858 /// ```console
859 /// about to filter: 1
860 /// about to filter: 4
861 /// made it through filter: 4
862 /// about to filter: 2
863 /// made it through filter: 2
864 /// about to filter: 3
865 /// 6
866 /// ```
867 fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
868 where
869 Operation: Fn(&Self::Item) + Sync + Clone,
870 {
871 let map = move |x| {
872 operation(&x);
873 x
874 };
875 self.map(map)
876 }
877
878 // while transformations
879
880 /// Creates an iterator that yields elements based on the predicate `take_while`.
881 ///
882 /// It will call this closure on each element of the iterator, and yield elements while it returns true.
883 ///
884 /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
885 ///
886 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
887 ///
888 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
889 /// elements before the predicate returns false are collected in the correct order; and hence, the result
890 /// is deterministic.
891 ///
892 /// # Examples
893 ///
894 /// ```
895 /// use orx_parallel::*;
896 ///
897 /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
898 /// let b: Vec<_> = iter.collect();
899 ///
900 /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
901 /// ```
902 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
903 where
904 While: Fn(&Self::Item) -> bool + Sync + Clone;
905
906 /// Creates an iterator that both yields elements based on the predicate `map_while` and maps.
907 ///
908 /// `map_while` takes a closure as an argument.
909 /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
910 ///
911 /// After None is returned, map_while job is over, and the rest of the elements are ignored.
912 ///
913 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
914 ///
915 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
916 /// elements before the predicate returns None are collected in the correct order; and hence, the result
917 /// is deterministic.
918 ///
919 /// ```
920 /// use orx_parallel::*;
921 ///
922 /// let iter = (0..10_000)
923 /// .par()
924 /// .map(|x| x as i32 - 5_000) // -5_000..10_000
925 /// .map_while(|x| 16i32.checked_div(x));
926 /// let b: Vec<_> = iter.collect();
927 ///
928 /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
929 /// ```
930 fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
931 where
932 MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
933 {
934 self.map(map_while).take_while(|x| x.is_some()).map(|x| {
935 // SAFETY: since x passed the whilst(is-some) check, unwrap_unchecked
936 unsafe { x.unwrap_unchecked() }
937 })
938 }
939
940 // special item transformations
941
942 /// Creates an iterator which copies all of its elements.
943 ///
944 /// # Examples
945 ///
946 /// ```
947 /// use orx_parallel::*;
948 ///
949 /// let a = vec![1, 2, 3];
950 ///
951 /// let v_copied: Vec<_> = a.par().copied().collect();
952 ///
953 /// // copied is the same as .map(|&x| x)
954 /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
955 ///
956 /// assert_eq!(v_copied, vec![1, 2, 3]);
957 /// assert_eq!(v_map, vec![1, 2, 3]);
958 /// ```
959 fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
960 where
961 T: 'a + Copy,
962 Self: ParIter<R, Item = &'a T>,
963 {
964 self.map(map_copy)
965 }
966
967 /// Creates an iterator which clones all of its elements.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// use orx_parallel::*;
973 ///
974 /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
975 ///
976 /// let v_cloned: Vec<_> = a.par().cloned().collect();
977 ///
978 /// // cloned is the same as .map(|x| x.clone())
979 /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
980 ///
981 /// assert_eq!(
982 /// v_cloned,
983 /// vec![String::from("1"), String::from("2"), String::from("3")]
984 /// );
985 /// assert_eq!(
986 /// v_map,
987 /// vec![String::from("1"), String::from("2"), String::from("3")]
988 /// );
989 /// ```
990 fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
991 where
992 T: 'a + Clone,
993 Self: ParIter<R, Item = &'a T>,
994 {
995 self.map(map_clone)
996 }
997
998 /// Creates an iterator that flattens nested structure.
999 ///
1000 /// This is useful when you have an iterator of iterators or an iterator of things that can be
1001 /// turned into iterators and you want to remove one level of indirection.
1002 ///
1003 /// # Examples
1004 ///
1005 /// Basic usage.
1006 ///
1007 /// ```
1008 /// use orx_parallel::*;
1009 ///
1010 /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
1011 /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
1012 /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
1013 /// ```
1014 ///
1015 /// Mapping and then flattening:
1016 ///
1017 /// ```
1018 /// use orx_parallel::*;
1019 ///
1020 /// let words = vec!["alpha", "beta", "gamma"];
1021 ///
1022 /// // chars() returns an iterator
1023 /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
1024 /// let merged: String = all_characters.into_iter().collect();
1025 /// assert_eq!(merged, "alphabetagamma");
1026 /// ```
1027 ///
1028 /// But actually, you can write this in terms of `flat_map`,
1029 /// which is preferable in this case since it conveys intent more clearly:
1030 ///
1031 /// ```
1032 /// use orx_parallel::*;
1033 ///
1034 /// let words = vec!["alpha", "beta", "gamma"];
1035 ///
1036 /// // chars() returns an iterator
1037 /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
1038 /// let merged: String = all_characters.into_iter().collect();
1039 /// assert_eq!(merged, "alphabetagamma");
1040 /// ```
1041 fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
1042 where
1043 Self::Item: IntoIterator,
1044 {
1045 let map = |e: Self::Item| e.into_iter();
1046 self.flat_map(map)
1047 }
1048
1049 // collect
1050
1051 /// Collects all the items from an iterator into a collection.
1052 ///
1053 /// This is useful when you already have a collection and want to add the iterator items to it.
1054 ///
1055 /// The collection is passed in as owned value, and returned back with the additional elements.
1056 ///
1057 /// All collections implementing [`ParCollectInto`] can be used to collect into.
1058 ///
1059 /// [`ParCollectInto`]: crate::ParCollectInto
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use orx_parallel::*;
1065 ///
1066 /// let a = vec![1, 2, 3];
1067 ///
1068 /// let vec: Vec<i32> = vec![0, 1];
1069 /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
1070 /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
1071 ///
1072 /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
1073 /// ```
1074 fn collect_into<C>(self, output: C) -> C
1075 where
1076 C: ParCollectInto<Self::Item>;
1077
1078 /// Transforms an iterator into a collection.
1079 ///
1080 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
1081 /// the type of the result collection; or turbofish annotation can be used.
1082 ///
1083 /// All collections implementing [`ParCollectInto`] can be used to collect into.
1084 ///
1085 /// [`ParCollectInto`]: crate::ParCollectInto
1086 ///
1087 /// # Examples
1088 ///
1089 /// ```
1090 /// use orx_parallel::*;
1091 ///
1092 /// let a = vec![1, 2, 3];
1093 ///
1094 /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
1095 ///
1096 /// assert_eq!(vec![2, 4, 6], doubled);
1097 /// ```
1098 fn collect<C>(self) -> C
1099 where
1100 C: ParCollectInto<Self::Item>,
1101 {
1102 let output = C::empty(self.con_iter().try_get_len());
1103 self.collect_into(output)
1104 }
1105
1106 // reduce
1107
1108 /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
1109 ///
1110 /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
1111 ///
1112 /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
1113 ///
1114 /// # Example
1115 ///
1116 /// ```
1117 /// use orx_parallel::*;
1118 ///
1119 /// let inputs = 1..10;
1120 /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
1121 /// assert_eq!(reduced, 45);
1122 /// ```
1123 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
1124 where
1125 Self::Item: Send,
1126 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
1127
1128 /// Tests if every element of the iterator matches a predicate.
1129 ///
1130 /// `all` takes a `predicate` that returns true or false.
1131 /// It applies this closure to each element of the iterator,
1132 /// and if they all return true, then so does `all`.
1133 /// If any of them returns false, it returns false.
1134 ///
1135 /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1136 /// given that no matter what else happens, the result will also be false.
1137 ///
1138 /// An empty iterator returns true.
1139 ///
1140 /// # Examples
1141 ///
1142 /// ```
1143 /// use orx_parallel::*;
1144 ///
1145 /// let mut a = vec![1, 2, 3];
1146 /// assert!(a.par().all(|x| **x > 0));
1147 /// assert!(!a.par().all(|x| **x > 2));
1148 ///
1149 /// a.clear();
1150 /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1151 /// ```
1152 fn all<Predicate>(self, predicate: Predicate) -> bool
1153 where
1154 Self::Item: Send,
1155 Predicate: Fn(&Self::Item) -> bool + Sync,
1156 {
1157 let violates = |x: &Self::Item| !predicate(x);
1158 self.find(violates).is_none()
1159 }
1160
1161 /// Tests if any element of the iterator matches a predicate.
1162 ///
1163 /// `any` takes a `predicate` that returns true or false.
1164 /// It applies this closure to each element of the iterator,
1165 /// and if any of the elements returns true, then so does `any`.
1166 /// If all of them return false, it returns false.
1167 ///
1168 /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1169 /// given that no matter what else happens, the result will also be true.
1170 ///
1171 /// An empty iterator returns false.
1172 ///
1173 /// # Examples
1174 ///
1175 /// ```
1176 /// use orx_parallel::*;
1177 ///
1178 /// let mut a = vec![1, 2, 3];
1179 /// assert!(a.par().any(|x| **x > 0));
1180 /// assert!(!a.par().any(|x| **x > 5));
1181 ///
1182 /// a.clear();
1183 /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1184 /// ```
1185 fn any<Predicate>(self, predicate: Predicate) -> bool
1186 where
1187 Self::Item: Send,
1188 Predicate: Fn(&Self::Item) -> bool + Sync,
1189 {
1190 self.find(predicate).is_some()
1191 }
1192
1193 /// Consumes the iterator, counting the number of iterations and returning it.
1194 ///
1195 /// # Examples
1196 ///
1197 /// ```
1198 /// use orx_parallel::*;
1199 ///
1200 /// let a = vec![1, 2, 3];
1201 /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1202 /// ```
1203 fn count(self) -> usize {
1204 self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1205 }
1206
1207 /// Calls a closure on each element of an iterator.
1208 ///
1209 /// # Examples
1210 ///
1211 /// Basic usage:
1212 ///
1213 /// ```
1214 /// use orx_parallel::*;
1215 /// use std::sync::mpsc::channel;
1216 ///
1217 /// let (tx, rx) = channel();
1218 /// (0..5)
1219 /// .par()
1220 /// .map(|x| x * 2 + 1)
1221 /// .for_each(move |x| tx.send(x).unwrap());
1222 ///
1223 /// let mut v: Vec<_> = rx.iter().collect();
1224 /// v.sort(); // order can be mixed, since messages will be sent in parallel
1225 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1226 /// ```
1227 ///
1228 /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
1229 /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
1230 /// In the following example, we log every element that satisfies a predicate in parallel.
1231 ///
1232 /// ```
1233 /// use orx_parallel::*;
1234 ///
1235 /// (0..5)
1236 /// .par()
1237 /// .flat_map(|x| x * 100..x * 110)
1238 /// .filter(|&x| x % 3 == 0)
1239 /// .for_each(|x| println!("{x}"));
1240 /// ```
1241 fn for_each<Operation>(self, operation: Operation)
1242 where
1243 Operation: Fn(Self::Item) + Sync,
1244 {
1245 let map = |x| operation(x);
1246 let _ = self.map(map).reduce(reduce_unit);
1247 }
1248
1249 /// Returns the maximum element of an iterator.
1250 ///
1251 /// If the iterator is empty, None is returned.
1252 ///
1253 /// # Examples
1254 ///
1255 /// ```
1256 /// use orx_parallel::*;
1257 ///
1258 /// let a = vec![1, 2, 3];
1259 /// let b: Vec<u32> = Vec::new();
1260 ///
1261 /// assert_eq!(a.par().max(), Some(&3));
1262 /// assert_eq!(b.par().max(), None);
1263 /// ```
1264 fn max(self) -> Option<Self::Item>
1265 where
1266 Self::Item: Ord + Send,
1267 {
1268 self.reduce(Ord::max)
1269 }
1270
1271 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1272 ///
1273 /// If the iterator is empty, None is returned.
1274 ///
1275 /// # Examples
1276 ///
1277 /// ```
1278 /// use orx_parallel::*;
1279 ///
1280 /// let a = vec![-3_i32, 0, 1, 5, -10];
1281 /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1282 /// ```
1283 fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1284 where
1285 Self::Item: Send,
1286 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1287 {
1288 let reduce = |x, y| match compare(&x, &y) {
1289 Ordering::Greater | Ordering::Equal => x,
1290 Ordering::Less => y,
1291 };
1292 self.reduce(reduce)
1293 }
1294
1295 /// Returns the element that gives the maximum value from the specified function.
1296 ///
1297 /// If the iterator is empty, None is returned.
1298 ///
1299 /// # Examples
1300 ///
1301 /// ```
1302 /// use orx_parallel::*;
1303 ///
1304 /// let a = vec![-3_i32, 0, 1, 5, -10];
1305 /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1306 /// ```
1307 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1308 where
1309 Self::Item: Send,
1310 Key: Ord,
1311 GetKey: Fn(&Self::Item) -> Key + Sync,
1312 {
1313 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1314 Ordering::Greater | Ordering::Equal => x,
1315 Ordering::Less => y,
1316 };
1317 self.reduce(reduce)
1318 }
1319
1320 /// Returns the minimum element of an iterator.
1321 ///
1322 /// If the iterator is empty, None is returned.
1323 ///
1324 /// # Examples
1325 ///
1326 /// ```
1327 /// use orx_parallel::*;
1328 ///
1329 /// let a = vec![1, 2, 3];
1330 /// let b: Vec<u32> = Vec::new();
1331 ///
1332 /// assert_eq!(a.par().min(), Some(&1));
1333 /// assert_eq!(b.par().min(), None);
1334 /// ```
1335 fn min(self) -> Option<Self::Item>
1336 where
1337 Self::Item: Ord + Send,
1338 {
1339 self.reduce(Ord::min)
1340 }
1341
1342 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1343 ///
1344 /// If the iterator is empty, None is returned.
1345 ///
1346 /// # Examples
1347 ///
1348 /// ```
1349 /// use orx_parallel::*;
1350 ///
1351 /// let a = vec![-3_i32, 0, 1, 5, -10];
1352 /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1353 /// ```
1354 fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1355 where
1356 Self::Item: Send,
1357 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1358 {
1359 let reduce = |x, y| match compare(&x, &y) {
1360 Ordering::Less | Ordering::Equal => x,
1361 Ordering::Greater => y,
1362 };
1363 self.reduce(reduce)
1364 }
1365
1366 /// Returns the element that gives the minimum value from the specified function.
1367 ///
1368 /// If the iterator is empty, None is returned.
1369 ///
1370 /// # Examples
1371 ///
1372 /// ```
1373 /// use orx_parallel::*;
1374 ///
1375 /// let a = vec![-3_i32, 0, 1, 5, -10];
1376 /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1377 /// ```
1378 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1379 where
1380 Self::Item: Send,
1381 Key: Ord,
1382 GetKey: Fn(&Self::Item) -> Key + Sync,
1383 {
1384 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1385 Ordering::Less | Ordering::Equal => x,
1386 Ordering::Greater => y,
1387 };
1388 self.reduce(reduce)
1389 }
1390
1391 /// Sums the elements of an iterator.
1392 ///
1393 /// Takes each element, adds them together, and returns the result.
1394 ///
1395 /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1396 ///
1397 /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1398 ///
1399 /// [`Sum<Out>`]: crate::Sum
1400 ///
1401 /// # Examples
1402 ///
1403 /// ```
1404 /// use orx_parallel::*;
1405 ///
1406 /// let a = vec![1, 2, 3];
1407 /// let sum: i32 = a.par().sum();
1408 ///
1409 /// assert_eq!(sum, 6);
1410 /// ```
1411 fn sum<Out>(self) -> Out
1412 where
1413 Self::Item: Sum<Out>,
1414 Out: Send,
1415 {
1416 self.map(Self::Item::map)
1417 .reduce(Self::Item::reduce)
1418 .unwrap_or(Self::Item::zero())
1419 }
1420
1421 // early exit
1422
1423 /// Returns the first (or any) element of the iterator; returns None if it is empty.
1424 ///
1425 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1426 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1427 ///
1428 /// # Examples
1429 ///
1430 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1431 /// This guarantees that the first element with respect to position in the input sequence
1432 /// is returned.
1433 ///
1434 /// ```
1435 /// use orx_parallel::*;
1436 ///
1437 /// let a: Vec<usize> = vec![];
1438 /// assert_eq!(a.par().copied().first(), None);
1439 ///
1440 /// let a = vec![1, 2, 3];
1441 /// assert_eq!(a.par().copied().first(), Some(1));
1442 ///
1443 /// let a = 1..10_000;
1444 /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1445 /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1446 ///
1447 /// // or equivalently,
1448 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1449 /// ```
1450 ///
1451 /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1452 /// whichever is visited first depending on the parallel execution.
1453 ///
1454 /// ```
1455 /// use orx_parallel::*;
1456 ///
1457 /// let a = 1..10_000;
1458 ///
1459 /// // might return either of 3421 or 2*3421
1460 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1461 /// assert!([3421, 2 * 3421].contains(&any));
1462 ///
1463 /// // or equivalently,
1464 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1465 /// assert!([3421, 2 * 3421].contains(&any));
1466 /// ```
1467 fn first(self) -> Option<Self::Item>
1468 where
1469 Self::Item: Send;
1470
1471 /// Searches for an element of an iterator that satisfies a `predicate`.
1472 ///
1473 /// Depending on the set iteration order of the parallel iterator, returns
1474 ///
1475 /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1476 /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1477 ///
1478 /// `find` takes a closure that returns true or false.
1479 /// It applies this closure to each element of the iterator,
1480 /// and returns `Some(x)` where `x` is the first element that returns true.
1481 /// If they all return false, it returns None.
1482 ///
1483 /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1484 ///
1485 /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1486 ///
1487 /// # Examples
1488 ///
1489 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1490 /// This guarantees that the first element with respect to position in the input sequence
1491 /// is returned.
1492 ///
1493 /// ```
1494 /// use orx_parallel::*;
1495 ///
1496 /// let a = 1..10_000;
1497 /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1498 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1499 /// ```
1500 ///
1501 /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1502 /// whichever is found first depending on the parallel execution.
1503 ///
1504 /// ```
1505 /// use orx_parallel::*;
1506 ///
1507 /// let a = 1..10_000;
1508 ///
1509 /// // might return either of 3421 or 2*3421
1510 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1511 /// assert!([3421, 2 * 3421].contains(&any));
1512 /// ```
1513 fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1514 where
1515 Self::Item: Send,
1516 Predicate: Fn(&Self::Item) -> bool + Sync,
1517 {
1518 self.filter(&predicate).first()
1519 }
1520}