orx_parallel/par_iter.rs
1use crate::computational_variants::fallible_option::ParOption;
2use crate::par_iter_option::{IntoOption, ParIterOption};
3use crate::par_iter_result::IntoResult;
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::using::{UsingClone, UsingFun};
6use crate::{ParIterResult, ParThreadPool, RunnerWithPool};
7use crate::{
8 ParIterUsing, Params,
9 collect_into::ParCollectInto,
10 default_fns::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
11 parameters::{ChunkSize, IterationOrder, NumThreads},
12 special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20 R: ParallelRunner,
21{
22 /// Element type of the parallel iterator.
23 type Item;
24
25 /// Returns a reference to the input concurrent iterator.
26 fn con_iter(&self) -> &impl ConcurrentIter;
27
28 /// Parameters of the parallel iterator.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use orx_parallel::*;
34 /// use std::num::NonZero;
35 ///
36 /// let vec = vec![1, 2, 3, 4];
37 ///
38 /// assert_eq!(
39 /// vec.par().params(),
40 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41 /// );
42 ///
43 /// assert_eq!(
44 /// vec.par().num_threads(0).chunk_size(0).params(),
45 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46 /// );
47 ///
48 /// assert_eq!(
49 /// vec.par().num_threads(1).params(),
50 /// Params::new(
51 /// NumThreads::Max(NonZero::new(1).unwrap()),
52 /// ChunkSize::Auto,
53 /// IterationOrder::Ordered
54 /// )
55 /// );
56 ///
57 /// assert_eq!(
58 /// vec.par().num_threads(4).chunk_size(64).params(),
59 /// Params::new(
60 /// NumThreads::Max(NonZero::new(4).unwrap()),
61 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
62 /// IterationOrder::Ordered
63 /// )
64 /// );
65 ///
66 /// assert_eq!(
67 /// vec.par()
68 /// .num_threads(8)
69 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70 /// .iteration_order(IterationOrder::Arbitrary)
71 /// .params(),
72 /// Params::new(
73 /// NumThreads::Max(NonZero::new(8).unwrap()),
74 /// ChunkSize::Min(NonZero::new(16).unwrap()),
75 /// IterationOrder::Arbitrary
76 /// )
77 /// );
78 /// ```
79 fn params(&self) -> Params;
80
81 // params transformations
82
83 /// Sets the number of threads to be used in the parallel execution.
84 /// Integers can be used as the argument with the following mapping:
85 ///
86 /// * `0` -> `NumThreads::Auto`
87 /// * `1` -> `NumThreads::sequential()`
88 /// * `n > 0` -> `NumThreads::Max(n)`
89 ///
90 /// See [`NumThreads`] for details.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use orx_parallel::*;
96 /// use std::num::NonZero;
97 ///
98 /// let vec = vec![1, 2, 3, 4];
99 ///
100 /// // all available threads can be used
101 ///
102 /// assert_eq!(
103 /// vec.par().params(),
104 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105 /// );
106 ///
107 /// assert_eq!(
108 /// vec.par().num_threads(0).chunk_size(0).params(),
109 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110 /// );
111 ///
112 /// // computation will be executed sequentially on the main thread, no parallelization
113 ///
114 /// assert_eq!(
115 /// vec.par().num_threads(1).params(),
116 /// Params::new(
117 /// NumThreads::Max(NonZero::new(1).unwrap()),
118 /// ChunkSize::Auto,
119 /// IterationOrder::Ordered
120 /// )
121 /// );
122 ///
123 /// // maximum 4 threads can be used
124 /// assert_eq!(
125 /// vec.par().num_threads(4).chunk_size(64).params(),
126 /// Params::new(
127 /// NumThreads::Max(NonZero::new(4).unwrap()),
128 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
129 /// IterationOrder::Ordered
130 /// )
131 /// );
132 ///
133 /// // maximum 8 threads can be used
134 /// assert_eq!(
135 /// vec.par()
136 /// .num_threads(8)
137 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138 /// .iteration_order(IterationOrder::Arbitrary)
139 /// .params(),
140 /// Params::new(
141 /// NumThreads::Max(NonZero::new(8).unwrap()),
142 /// ChunkSize::Min(NonZero::new(16).unwrap()),
143 /// IterationOrder::Arbitrary
144 /// )
145 /// );
146 /// ```
147 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149 /// Sets the number of elements to be pulled from the concurrent iterator during the
150 /// parallel execution. When integers are used as argument, the following mapping applies:
151 ///
152 /// * `0` -> `ChunkSize::Auto`
153 /// * `n > 0` -> `ChunkSize::Exact(n)`
154 ///
155 /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
156 ///
157 /// See [`ChunkSize`] for details.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// use orx_parallel::*;
163 /// use std::num::NonZero;
164 ///
165 /// let vec = vec![1, 2, 3, 4];
166 ///
167 /// // chunk sizes will be dynamically decided by the parallel runner
168 ///
169 /// assert_eq!(
170 /// vec.par().params(),
171 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172 /// );
173 ///
174 /// assert_eq!(
175 /// vec.par().num_threads(0).chunk_size(0).params(),
176 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177 /// );
178 ///
179 /// assert_eq!(
180 /// vec.par().num_threads(1).params(),
181 /// Params::new(
182 /// NumThreads::Max(NonZero::new(1).unwrap()),
183 /// ChunkSize::Auto,
184 /// IterationOrder::Ordered
185 /// )
186 /// );
187 ///
188 /// // chunk size will always be 64, parallel runner cannot change
189 ///
190 /// assert_eq!(
191 /// vec.par().num_threads(4).chunk_size(64).params(),
192 /// Params::new(
193 /// NumThreads::Max(NonZero::new(4).unwrap()),
194 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
195 /// IterationOrder::Ordered
196 /// )
197 /// );
198 ///
199 /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200 ///
201 /// assert_eq!(
202 /// vec.par()
203 /// .num_threads(8)
204 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205 /// .iteration_order(IterationOrder::Arbitrary)
206 /// .params(),
207 /// Params::new(
208 /// NumThreads::Max(NonZero::new(8).unwrap()),
209 /// ChunkSize::Min(NonZero::new(16).unwrap()),
210 /// IterationOrder::Arbitrary
211 /// )
212 /// );
213 /// ```
214 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216 /// Sets the iteration order of the parallel computation.
217 ///
218 /// See [`IterationOrder`] for details.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use orx_parallel::*;
224 ///
225 /// let vec = vec![1, 2, 3, 4];
226 ///
227 /// // results are collected in order consistent to the input order,
228 /// // or find returns the first element satisfying the predicate
229 ///
230 /// assert_eq!(
231 /// vec.par().params(),
232 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233 /// );
234 ///
235 /// assert_eq!(
236 /// vec.par().iteration_order(IterationOrder::Ordered).params(),
237 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238 /// );
239 ///
240 /// // results might be collected in arbitrary order
241 /// // or find returns the any of the elements satisfying the predicate
242 ///
243 /// assert_eq!(
244 /// vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246 /// );
247 /// ```
248 fn iteration_order(self, order: IterationOrder) -> Self;
249
250 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251 ///
252 /// See also [`with_pool`].
253 ///
254 /// Parallel runner of each computation can be independently specified using `with_runner`.
255 ///
256 /// When not specified the default runner is used, which is:
257 /// * [`RunnerWithPool`] with [`StdDefaultPool`] when "std" feature is enabled,
258 /// * [`RunnerWithPool`] with [`SequentialPool`] when "std" feature is disabled.
259 ///
260 /// Note that [`StdDefaultPool`] uses standard native threads.
261 ///
262 /// When working in a no-std environment, the default runner falls back to sequential.
263 /// In this case, `RunnerWithPool` using a particular thread pool must be passed in using `with_runner`
264 /// transformation to achieve parallel computation.
265 ///
266 /// [`RunnerWithPool`]: crate::RunnerWithPool
267 /// [`StdDefaultPool`]: crate::StdDefaultPool
268 /// [`SequentialPool`]: crate::SequentialPool
269 /// [`with_pool`]: crate::ParIter::with_pool
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use orx_parallel::*;
275 ///
276 /// let inputs: Vec<_> = (0..42).collect();
277 ///
278 /// // uses the DefaultRunner
279 /// // assuming "std" enabled, RunnerWithPool<StdDefaultPool> will be used; i.e., native threads
280 /// let sum = inputs.par().sum();
281 ///
282 /// // equivalent to:
283 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(StdDefaultPool::default())).sum();
284 /// assert_eq!(sum, sum2);
285 ///
286 /// #[cfg(feature = "scoped_threadpool")]
287 /// {
288 /// let mut pool = scoped_threadpool::Pool::new(8);
289 ///
290 /// // uses the scoped_threadpool::Pool created with 8 threads
291 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(&mut pool)).sum();
292 /// assert_eq!(sum, sum2);
293 /// }
294 ///
295 /// #[cfg(feature = "rayon-core")]
296 /// {
297 /// let pool = rayon_core::ThreadPoolBuilder::new()
298 /// .num_threads(8)
299 /// .build()
300 /// .unwrap();
301 ///
302 /// // uses the rayon-core::ThreadPool created with 8 threads
303 /// let sum2 = inputs.par().with_runner(RunnerWithPool::from(&pool)).sum();
304 /// assert_eq!(sum, sum2);
305 /// }
306 /// ```
307 fn with_runner<Q: ParallelRunner>(self, runner: Q) -> impl ParIter<Q, Item = Self::Item>;
308
309 /// Rather than [`DefaultPool`], uses the parallel runner with the given `pool` implementing
310 /// [`ParThreadPool`].
311 ///
312 /// See also [`with_runner`].
313 ///
314 /// Thread pool of each computation can be independently specified using `with_pool`.
315 ///
316 /// When not specified the default pool is used, which is:
317 /// * [`StdDefaultPool`] when "std" feature is enabled,
318 /// * [`SequentialPool`] when "std" feature is disabled.
319 ///
320 /// Note that [`StdDefaultPool`] uses standard native threads.
321 ///
322 /// When working in a no-std environment, the default pool falls back to sequential.
323 /// In this case, a thread pool must be passed in using `with_pool` transformation to achieve parallel computation.
324 ///
325 /// Note that if a thread pool, say `pool`, is of a type that implements [`ParThreadPool`]; then:
326 /// * `with_pool` can be called with owned value `with_pool(pool)` for all implementors; but also,
327 /// * with a shared reference `with_pool(&pool)` for most of the implementations (eg: rayon-core, yastl), and
328 /// * with a mutable reference `with_pool(&mut pool)` for others (eg: scoped_threadpool).
329 ///
330 /// [`DefaultPool`]: crate::DefaultPool
331 /// [`RunnerWithPool`]: crate::RunnerWithPool
332 /// [`StdDefaultPool`]: crate::StdDefaultPool
333 /// [`SequentialPool`]: crate::SequentialPool
334 /// [`with_runner`]: crate::ParIter::with_runner
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use orx_parallel::*;
340 ///
341 /// let inputs: Vec<_> = (0..42).collect();
342 ///
343 /// // uses the DefaultPool
344 /// // assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
345 /// let sum = inputs.par().sum();
346 ///
347 /// // equivalent to:
348 /// let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
349 /// assert_eq!(sum, sum2);
350 ///
351 /// #[cfg(feature = "scoped_threadpool")]
352 /// {
353 /// let mut pool = scoped_threadpool::Pool::new(8);
354 ///
355 /// // uses the scoped_threadpool::Pool created with 8 threads
356 /// let sum2 = inputs.par().with_pool(&mut pool).sum();
357 /// assert_eq!(sum, sum2);
358 /// }
359 ///
360 /// #[cfg(feature = "rayon-core")]
361 /// {
362 /// let pool = rayon_core::ThreadPoolBuilder::new()
363 /// .num_threads(8)
364 /// .build()
365 /// .unwrap();
366 ///
367 /// // uses the rayon-core::ThreadPool created with 8 threads
368 /// let sum2 = inputs.par().with_pool(&pool).sum();
369 /// assert_eq!(sum, sum2);
370 /// }
371 /// ```
372 fn with_pool<P: ParThreadPool>(
373 self,
374 pool: P,
375 ) -> impl ParIter<RunnerWithPool<P, R::Executor>, Item = Self::Item> {
376 let runner = RunnerWithPool::from(pool).with_executor::<R::Executor>();
377 self.with_runner(runner)
378 }
379
380 // using transformations
381
382 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
383 /// used variable throughout the computation.
384 ///
385 /// Note that each used thread will obtain exactly one instance of the variable.
386 ///
387 /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
388 /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
389 /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
390 ///
391 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
392 ///
393 /// # Examples
394 ///
395 /// ## Example 1: Channels
396 ///
397 /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
398 ///
399 /// ```ignore
400 /// use orx_parallel::*;
401 /// use std::sync::mpsc::channel;
402 ///
403 /// let (sender, receiver) = channel();
404 ///
405 /// (0..5)
406 /// .into_par()
407 /// .using(|_thread_idx| sender.clone())
408 /// .for_each(|s, x| s.send(x).unwrap());
409 ///
410 /// let mut res: Vec<_> = receiver.iter().collect();
411 ///
412 /// res.sort();
413 ///
414 /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
415 /// ```
416 ///
417 /// ## Example 2: Random Number Generator
418 ///
419 /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
420 ///
421 /// The following example demonstrates how to safely generate random numbers through mutable references within
422 /// a parallel computation.
423 ///
424 /// Notice the differences between sequential and parallel computation.
425 /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
426 /// explicitly define that we will be `using` a random number generator.
427 /// * Parallel iterator does not mutable capture any variable from the scope; however, using transformation
428 /// converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
429 ///
430 /// ```
431 /// use orx_parallel::*;
432 /// use rand::{Rng, SeedableRng};
433 /// use rand_chacha::ChaCha20Rng;
434 ///
435 /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
436 /// (0..num_steps).fold(position, |p, _| random_step(rng, p))
437 /// }
438 ///
439 /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
440 /// match rng.random_bool(0.5) {
441 /// true => position + 1, // to right
442 /// false => position - 1, // to left
443 /// }
444 /// }
445 ///
446 /// fn input_positions() -> Vec<i64> {
447 /// (-100..=100).collect()
448 /// }
449 ///
450 /// fn sequential() {
451 /// let positions = input_positions();
452 ///
453 /// let mut rng = ChaCha20Rng::seed_from_u64(42);
454 /// let final_positions: Vec<_> = positions
455 /// .iter()
456 /// .copied()
457 /// .map(|position| random_walk(&mut rng, position, 10))
458 /// .collect();
459 /// let sum_final_positions = final_positions.iter().sum::<i64>();
460 /// }
461 ///
462 /// fn parallel() {
463 /// let positions = input_positions();
464 ///
465 /// let final_positions: Vec<_> = positions
466 /// .par()
467 /// .copied()
468 /// .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
469 /// .map(|rng, position| random_walk(rng, position, 10))
470 /// .collect();
471 /// let sum_final_positions = final_positions.iter().sum::<i64>();
472 /// }
473 ///
474 /// sequential();
475 /// parallel();
476 /// ```
477 ///
478 /// ## Example 3: Metrics Collection
479 ///
480 /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
481 /// some `unsafe` help with interior mutability.
482 ///
483 /// ```
484 /// use orx_parallel::*;
485 /// use std::cell::UnsafeCell;
486 ///
487 /// const N: u64 = 1_000;
488 /// const MAX_NUM_THREADS: usize = 4;
489 ///
490 /// // just some work
491 /// fn fibonacci(n: u64) -> u64 {
492 /// let mut a = 0;
493 /// let mut b = 1;
494 /// for _ in 0..n {
495 /// let c = a + b;
496 /// a = b;
497 /// b = c;
498 /// }
499 /// a
500 /// }
501 ///
502 /// #[derive(Default, Debug)]
503 /// struct ThreadMetrics {
504 /// thread_idx: usize,
505 /// num_items_handled: usize,
506 /// handled_42: bool,
507 /// num_filtered_out: usize,
508 /// }
509 ///
510 /// struct ThreadMetricsWriter<'a> {
511 /// metrics_ref: &'a mut ThreadMetrics,
512 /// }
513 ///
514 /// struct ComputationMetrics {
515 /// thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
516 /// }
517 /// unsafe impl Sync for ComputationMetrics {}
518 /// impl ComputationMetrics {
519 /// fn new() -> Self {
520 /// let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
521 /// for i in 0..MAX_NUM_THREADS {
522 /// thread_metrics[i].thread_idx = i;
523 /// }
524 /// Self {
525 /// thread_metrics: UnsafeCell::new(thread_metrics),
526 /// }
527 /// }
528 /// }
529 ///
530 /// impl ComputationMetrics {
531 /// unsafe fn create_for_thread<'a>(&self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
532 /// // SAFETY: here we create a mutable variable to the thread_idx-th metrics
533 /// // * If we call this method multiple times with the same index,
534 /// // we create multiple mutable references to the same ThreadMetrics,
535 /// // which would lead to a race condition.
536 /// // * We must make sure that `create_for_thread` is called only once per thread.
537 /// // * If we use `create_for_thread` within the `using` call to create mutable values
538 /// // used by the threads, we are certain that the parallel computation
539 /// // will only call this method once per thread; hence, it will not
540 /// // cause the race condition.
541 /// // * On the other hand, we must ensure that we do not call this method
542 /// // externally.
543 /// let array = unsafe { &mut *self.thread_metrics.get() };
544 /// ThreadMetricsWriter {
545 /// metrics_ref: &mut array[thread_idx],
546 /// }
547 /// }
548 /// }
549 ///
550 /// let mut metrics = ComputationMetrics::new();
551 ///
552 /// let input: Vec<u64> = (0..N).collect();
553 ///
554 /// let sum = input
555 /// .par()
556 /// // SAFETY: we do not call `create_for_thread` externally;
557 /// // it is safe if it is called only by the parallel computation.
558 /// // Since we unsafely implement Sync for ComputationMetrics,
559 /// // we must ensure that ComputationMetrics is not used elsewhere.
560 /// .using(|t| unsafe { metrics.create_for_thread(t) })
561 /// .map(|m: &mut ThreadMetricsWriter<'_>, i| {
562 /// // collect some useful metrics
563 /// m.metrics_ref.num_items_handled += 1;
564 /// m.metrics_ref.handled_42 |= *i == 42;
565 ///
566 /// // actual work
567 /// fibonacci((*i % 20) + 1) % 100
568 /// })
569 /// .filter(|m, i| {
570 /// let is_even = i % 2 == 0;
571 ///
572 /// if !is_even {
573 /// m.metrics_ref.num_filtered_out += 1;
574 /// }
575 ///
576 /// is_even
577 /// })
578 /// .num_threads(MAX_NUM_THREADS)
579 /// .sum();
580 ///
581 /// assert_eq!(sum, 9100);
582 ///
583 /// let total_by_metrics: usize = metrics
584 /// .thread_metrics
585 /// .get_mut()
586 /// .iter()
587 /// .map(|x| x.num_items_handled)
588 /// .sum();
589 ///
590 /// assert_eq!(N as usize, total_by_metrics);
591 /// ```
592 ///
593 fn using<U, F>(
594 self,
595 using: F,
596 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
597 where
598 U: 'static,
599 F: Fn(usize) -> U + Sync;
600
601 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
602 /// used variable throughout the computation.
603 ///
604 /// Note that each used thread will obtain exactly one instance of the variable.
605 ///
606 /// Each used thread receives a clone of the provided `value`.
607 /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
608 ///
609 /// Please see [`using`] for examples.
610 ///
611 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
612 ///
613 /// [`using`]: crate::ParIter::using
614 fn using_clone<U>(
615 self,
616 value: U,
617 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
618 where
619 U: Clone + 'static;
620
621 // transformations into fallible computations
622
623 /// Transforms a parallel iterator where elements are of the result type; i.e., `ParIter<R, Item = Result<T, E>>`,
624 /// into fallible parallel iterator with item type `T` and error type `E`; i.e., into `ParIterResult<R, Item = T, Err = E>`.
625 ///
626 /// `ParIterResult` is also a parallel iterator; however, with methods specialized for handling fallible computations
627 /// as follows:
628 ///
629 /// * All of its methods are based on the success path with item type of `T`.
630 /// * However, computations short-circuit and immediately return the observed error if any of the items
631 /// is of the `Err` variant of the result enum.
632 ///
633 /// See [`ParIterResult`] for details.
634 ///
635 /// # Examples
636 ///
637 /// ```
638 /// use orx_parallel::*;
639 ///
640 /// // all succeeds
641 ///
642 /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
643 /// .into_par()
644 /// .map(|x| x.parse::<i32>()) // ParIter with Item=Result<i32, ParseIntError>
645 /// .into_fallible_result() // ParIterResult with Item=i32 and Err=ParseIntError
646 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
647 /// .collect(); // methods return Result<_, Err>
648 /// // where the Ok variant depends on the computation
649 ///
650 /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
651 ///
652 /// // at least one fails
653 ///
654 /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
655 /// .into_par()
656 /// .map(|x| x.parse::<i32>())
657 /// .into_fallible_result()
658 /// .map(|x| x * 2)
659 /// .collect();
660 ///
661 /// assert!(result_doubled.is_err());
662 /// ```
663 fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
664 where
665 Self::Item: IntoResult<T, E>;
666
667 /// Transforms a parallel iterator where elements are of the option type; i.e., `ParIter<R, Item = Option<T>>`,
668 /// into fallible parallel iterator with item type `T`; i.e., into `ParIterOption<R, Item = T>`.
669 ///
670 /// `ParIterOption` is also a parallel iterator; however, with methods specialized for handling fallible computations
671 /// as follows:
672 ///
673 /// * All of its methods are based on the success path with item type of `T`.
674 /// * However, computations short-circuit and immediately return None if any of the items
675 /// is of the `None` variant of the option enum.
676 ///
677 /// See [`ParIterResult`] for details.
678 ///
679 /// # Examples
680 ///
681 /// ```
682 /// use orx_parallel::*;
683 ///
684 /// // all succeeds
685 ///
686 /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
687 /// .into_par()
688 /// .map(|x| x.parse::<i32>().ok()) // ParIter with Item=Option<i32>
689 /// .into_fallible_option() // ParIterOption with Item=i32
690 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
691 /// .collect(); // methods return Option<T>
692 /// // where T depends on the computation
693 ///
694 /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
695 ///
696 /// // at least one fails
697 ///
698 /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
699 /// .into_par()
700 /// .map(|x| x.parse::<i32>().ok())
701 /// .into_fallible_option()
702 /// .map(|x| x * 2)
703 /// .collect();
704 ///
705 /// assert_eq!(result_doubled, None);
706 /// ```
707 fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
708 where
709 Self::Item: IntoOption<T>,
710 {
711 ParOption::new(
712 self.map(|x| x.into_result_with_unit_err())
713 .into_fallible_result(),
714 )
715 }
716
717 // computation transformations
718
719 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
720 ///
721 /// # Examples
722 ///
723 /// ```
724 /// use orx_parallel::*;
725 ///
726 /// let a = [1, 2, 3];
727 ///
728 /// let iter = a.into_par().map(|x| 2 * x);
729 ///
730 /// let b: Vec<_> = iter.collect();
731 /// assert_eq!(b, &[2, 4, 6]);
732 /// ```
733 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
734 where
735 Map: Fn(Self::Item) -> Out + Sync + Clone;
736
737 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
738 ///
739 /// # Examples
740 ///
741 /// ```
742 /// use orx_parallel::*;
743 ///
744 /// let a = [1, 2, 3];
745 ///
746 /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
747 ///
748 /// let b: Vec<_> = iter.collect();
749 /// assert_eq!(b, &[1, 3]);
750 /// ```
751 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
752 where
753 Filter: Fn(&Self::Item) -> bool + Sync + Clone;
754
755 /// Creates an iterator that works like map, but flattens nested structure.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// use orx_parallel::*;
761 ///
762 /// let words = ["alpha", "beta", "gamma"];
763 ///
764 /// // chars() returns an iterator
765 /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
766 ///
767 /// let merged: String = all_chars.iter().collect();
768 /// assert_eq!(merged, "alphabetagamma");
769 /// ```
770 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
771 where
772 IOut: IntoIterator,
773 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
774
775 /// Creates an iterator that both filters and maps.
776 ///
777 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
778 ///
779 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
780 /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
781 ///
782 /// # Examples
783 ///
784 /// ```
785 /// use orx_parallel::*;
786 ///
787 /// let a = ["1", "two", "NaN", "four", "5"];
788 ///
789 /// let numbers: Vec<_> = a
790 /// .into_par()
791 /// .filter_map(|s| s.parse::<usize>().ok())
792 /// .collect();
793 ///
794 /// assert_eq!(numbers, [1, 5]);
795 /// ```
796 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
797 where
798 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
799
800 /// Does something with each element of an iterator, passing the value on.
801 ///
802 /// When using iterators, you’ll often chain several of them together.
803 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
804 /// To do that, insert a call to `inspect()`.
805 ///
806 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
807 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
808 ///
809 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
810 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
811 /// collect some intermediate values during parallel execution for further inspection.
812 /// The following example demonstrates such a use case.
813 ///
814 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
815 ///
816 /// # Examples
817 ///
818 /// ```
819 /// use orx_parallel::*;
820 /// use orx_concurrent_bag::*;
821 ///
822 /// let a = vec![1, 4, 2, 3];
823 ///
824 /// // let's add some inspect() calls to investigate what's happening
825 /// // - log some events
826 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
827 /// let bag = ConcurrentBag::new();
828 ///
829 /// let sum = a
830 /// .par()
831 /// .copied()
832 /// .inspect(|x| println!("about to filter: {x}"))
833 /// .filter(|x| x % 2 == 0)
834 /// .inspect(|x| {
835 /// bag.push(*x);
836 /// println!("made it through filter: {x}");
837 /// })
838 /// .sum();
839 /// println!("{sum}");
840 ///
841 /// let mut values_made_through = bag.into_inner();
842 /// values_made_through.sort();
843 /// assert_eq!(values_made_through, [2, 4]);
844 /// ```
845 ///
846 /// This will print:
847 ///
848 /// ```console
849 /// about to filter: 1
850 /// about to filter: 4
851 /// made it through filter: 4
852 /// about to filter: 2
853 /// made it through filter: 2
854 /// about to filter: 3
855 /// 6
856 /// ```
857 fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
858 where
859 Operation: Fn(&Self::Item) + Sync + Clone,
860 {
861 let map = move |x| {
862 operation(&x);
863 x
864 };
865 self.map(map)
866 }
867
868 // while transformations
869
870 /// Creates an iterator that yields elements based on the predicate `take_while`.
871 ///
872 /// It will call this closure on each element of the iterator, and yield elements while it returns true.
873 ///
874 /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
875 ///
876 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
877 ///
878 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
879 /// elements before the predicate returns false are collected in the correct order; and hence, the result
880 /// is deterministic.
881 ///
882 /// # Examples
883 ///
884 /// ```
885 /// use orx_parallel::*;
886 ///
887 /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
888 /// let b: Vec<_> = iter.collect();
889 ///
890 /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
891 /// ```
892 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
893 where
894 While: Fn(&Self::Item) -> bool + Sync + Clone;
895
896 /// Creates an iterator that both yields elements based on the predicate `map_while` and maps.
897 ///
898 /// `map_while` takes a closure as an argument.
899 /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
900 ///
901 /// After None is returned, map_while job is over, and the rest of the elements are ignored.
902 ///
903 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
904 ///
905 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
906 /// elements before the predicate returns None are collected in the correct order; and hence, the result
907 /// is deterministic.
908 ///
909 /// ```
910 /// use orx_parallel::*;
911 ///
912 /// let iter = (0..10_000)
913 /// .par()
914 /// .map(|x| x as i32 - 5_000) // -5_000..10_000
915 /// .map_while(|x| 16i32.checked_div(x));
916 /// let b: Vec<_> = iter.collect();
917 ///
918 /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
919 /// ```
920 fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
921 where
922 MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
923 {
924 self.map(map_while).take_while(|x| x.is_some()).map(|x| {
925 // SAFETY: since x passed the whilst(is-some) check, unwrap_unchecked
926 unsafe { x.unwrap_unchecked() }
927 })
928 }
929
930 // special item transformations
931
932 /// Creates an iterator which copies all of its elements.
933 ///
934 /// # Examples
935 ///
936 /// ```
937 /// use orx_parallel::*;
938 ///
939 /// let a = vec![1, 2, 3];
940 ///
941 /// let v_copied: Vec<_> = a.par().copied().collect();
942 ///
943 /// // copied is the same as .map(|&x| x)
944 /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
945 ///
946 /// assert_eq!(v_copied, vec![1, 2, 3]);
947 /// assert_eq!(v_map, vec![1, 2, 3]);
948 /// ```
949 fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
950 where
951 T: 'a + Copy,
952 Self: ParIter<R, Item = &'a T>,
953 {
954 self.map(map_copy)
955 }
956
957 /// Creates an iterator which clones all of its elements.
958 ///
959 /// # Examples
960 ///
961 /// ```
962 /// use orx_parallel::*;
963 ///
964 /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
965 ///
966 /// let v_cloned: Vec<_> = a.par().cloned().collect();
967 ///
968 /// // cloned is the same as .map(|x| x.clone())
969 /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
970 ///
971 /// assert_eq!(
972 /// v_cloned,
973 /// vec![String::from("1"), String::from("2"), String::from("3")]
974 /// );
975 /// assert_eq!(
976 /// v_map,
977 /// vec![String::from("1"), String::from("2"), String::from("3")]
978 /// );
979 /// ```
980 fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
981 where
982 T: 'a + Clone,
983 Self: ParIter<R, Item = &'a T>,
984 {
985 self.map(map_clone)
986 }
987
988 /// Creates an iterator that flattens nested structure.
989 ///
990 /// This is useful when you have an iterator of iterators or an iterator of things that can be
991 /// turned into iterators and you want to remove one level of indirection.
992 ///
993 /// # Examples
994 ///
995 /// Basic usage.
996 ///
997 /// ```
998 /// use orx_parallel::*;
999 ///
1000 /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
1001 /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
1002 /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
1003 /// ```
1004 ///
1005 /// Mapping and then flattening:
1006 ///
1007 /// ```
1008 /// use orx_parallel::*;
1009 ///
1010 /// let words = vec!["alpha", "beta", "gamma"];
1011 ///
1012 /// // chars() returns an iterator
1013 /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
1014 /// let merged: String = all_characters.into_iter().collect();
1015 /// assert_eq!(merged, "alphabetagamma");
1016 /// ```
1017 ///
1018 /// But actually, you can write this in terms of `flat_map`,
1019 /// which is preferable in this case since it conveys intent more clearly:
1020 ///
1021 /// ```
1022 /// use orx_parallel::*;
1023 ///
1024 /// let words = vec!["alpha", "beta", "gamma"];
1025 ///
1026 /// // chars() returns an iterator
1027 /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
1028 /// let merged: String = all_characters.into_iter().collect();
1029 /// assert_eq!(merged, "alphabetagamma");
1030 /// ```
1031 fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
1032 where
1033 Self::Item: IntoIterator,
1034 {
1035 let map = |e: Self::Item| e.into_iter();
1036 self.flat_map(map)
1037 }
1038
1039 // collect
1040
1041 /// Collects all the items from an iterator into a collection.
1042 ///
1043 /// This is useful when you already have a collection and want to add the iterator items to it.
1044 ///
1045 /// The collection is passed in as owned value, and returned back with the additional elements.
1046 ///
1047 /// All collections implementing [`ParCollectInto`] can be used to collect into.
1048 ///
1049 /// [`ParCollectInto`]: crate::ParCollectInto
1050 ///
1051 /// # Examples
1052 ///
1053 /// ```
1054 /// use orx_parallel::*;
1055 ///
1056 /// let a = vec![1, 2, 3];
1057 ///
1058 /// let vec: Vec<i32> = vec![0, 1];
1059 /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
1060 /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
1061 ///
1062 /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
1063 /// ```
1064 fn collect_into<C>(self, output: C) -> C
1065 where
1066 C: ParCollectInto<Self::Item>;
1067
1068 /// Transforms an iterator into a collection.
1069 ///
1070 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
1071 /// the type of the result collection; or turbofish annotation can be used.
1072 ///
1073 /// All collections implementing [`ParCollectInto`] can be used to collect into.
1074 ///
1075 /// [`ParCollectInto`]: crate::ParCollectInto
1076 ///
1077 /// # Examples
1078 ///
1079 /// ```
1080 /// use orx_parallel::*;
1081 ///
1082 /// let a = vec![1, 2, 3];
1083 ///
1084 /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
1085 ///
1086 /// assert_eq!(vec![2, 4, 6], doubled);
1087 /// ```
1088 fn collect<C>(self) -> C
1089 where
1090 C: ParCollectInto<Self::Item>,
1091 {
1092 let output = C::empty(self.con_iter().try_get_len());
1093 self.collect_into(output)
1094 }
1095
1096 // reduce
1097
1098 /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
1099 ///
1100 /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
1101 ///
1102 /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
1103 ///
1104 /// # Example
1105 ///
1106 /// ```
1107 /// use orx_parallel::*;
1108 ///
1109 /// let inputs = 1..10;
1110 /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
1111 /// assert_eq!(reduced, 45);
1112 /// ```
1113 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
1114 where
1115 Self::Item: Send,
1116 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
1117
1118 /// Tests if every element of the iterator matches a predicate.
1119 ///
1120 /// `all` takes a `predicate` that returns true or false.
1121 /// It applies this closure to each element of the iterator,
1122 /// and if they all return true, then so does `all`.
1123 /// If any of them returns false, it returns false.
1124 ///
1125 /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1126 /// given that no matter what else happens, the result will also be false.
1127 ///
1128 /// An empty iterator returns true.
1129 ///
1130 /// # Examples
1131 ///
1132 /// ```
1133 /// use orx_parallel::*;
1134 ///
1135 /// let mut a = vec![1, 2, 3];
1136 /// assert!(a.par().all(|x| **x > 0));
1137 /// assert!(!a.par().all(|x| **x > 2));
1138 ///
1139 /// a.clear();
1140 /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1141 /// ```
1142 fn all<Predicate>(self, predicate: Predicate) -> bool
1143 where
1144 Self::Item: Send,
1145 Predicate: Fn(&Self::Item) -> bool + Sync,
1146 {
1147 let violates = |x: &Self::Item| !predicate(x);
1148 self.find(violates).is_none()
1149 }
1150
1151 /// Tests if any element of the iterator matches a predicate.
1152 ///
1153 /// `any` takes a `predicate` that returns true or false.
1154 /// It applies this closure to each element of the iterator,
1155 /// and if any of the elements returns true, then so does `any`.
1156 /// If all of them return false, it returns false.
1157 ///
1158 /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1159 /// given that no matter what else happens, the result will also be true.
1160 ///
1161 /// An empty iterator returns false.
1162 ///
1163 /// # Examples
1164 ///
1165 /// ```
1166 /// use orx_parallel::*;
1167 ///
1168 /// let mut a = vec![1, 2, 3];
1169 /// assert!(a.par().any(|x| **x > 0));
1170 /// assert!(!a.par().any(|x| **x > 5));
1171 ///
1172 /// a.clear();
1173 /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1174 /// ```
1175 fn any<Predicate>(self, predicate: Predicate) -> bool
1176 where
1177 Self::Item: Send,
1178 Predicate: Fn(&Self::Item) -> bool + Sync,
1179 {
1180 self.find(predicate).is_some()
1181 }
1182
1183 /// Consumes the iterator, counting the number of iterations and returning it.
1184 ///
1185 /// # Examples
1186 ///
1187 /// ```
1188 /// use orx_parallel::*;
1189 ///
1190 /// let a = vec![1, 2, 3];
1191 /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1192 /// ```
1193 fn count(self) -> usize {
1194 self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1195 }
1196
1197 /// Calls a closure on each element of an iterator.
1198 ///
1199 /// # Examples
1200 ///
1201 /// Basic usage:
1202 ///
1203 /// ```
1204 /// use orx_parallel::*;
1205 /// use std::sync::mpsc::channel;
1206 ///
1207 /// let (tx, rx) = channel();
1208 /// (0..5)
1209 /// .par()
1210 /// .map(|x| x * 2 + 1)
1211 /// .for_each(move |x| tx.send(x).unwrap());
1212 ///
1213 /// let mut v: Vec<_> = rx.iter().collect();
1214 /// v.sort(); // order can be mixed, since messages will be sent in parallel
1215 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1216 /// ```
1217 ///
1218 /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
1219 /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
1220 /// In the following example, we log every element that satisfies a predicate in parallel.
1221 ///
1222 /// ```
1223 /// use orx_parallel::*;
1224 ///
1225 /// (0..5)
1226 /// .par()
1227 /// .flat_map(|x| x * 100..x * 110)
1228 /// .filter(|&x| x % 3 == 0)
1229 /// .for_each(|x| println!("{x}"));
1230 /// ```
1231 fn for_each<Operation>(self, operation: Operation)
1232 where
1233 Operation: Fn(Self::Item) + Sync,
1234 {
1235 let map = |x| operation(x);
1236 let _ = self.map(map).reduce(reduce_unit);
1237 }
1238
1239 /// Returns the maximum element of an iterator.
1240 ///
1241 /// If the iterator is empty, None is returned.
1242 ///
1243 /// # Examples
1244 ///
1245 /// ```
1246 /// use orx_parallel::*;
1247 ///
1248 /// let a = vec![1, 2, 3];
1249 /// let b: Vec<u32> = Vec::new();
1250 ///
1251 /// assert_eq!(a.par().max(), Some(&3));
1252 /// assert_eq!(b.par().max(), None);
1253 /// ```
1254 fn max(self) -> Option<Self::Item>
1255 where
1256 Self::Item: Ord + Send,
1257 {
1258 self.reduce(Ord::max)
1259 }
1260
1261 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1262 ///
1263 /// If the iterator is empty, None is returned.
1264 ///
1265 /// # Examples
1266 ///
1267 /// ```
1268 /// use orx_parallel::*;
1269 ///
1270 /// let a = vec![-3_i32, 0, 1, 5, -10];
1271 /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1272 /// ```
1273 fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1274 where
1275 Self::Item: Send,
1276 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1277 {
1278 let reduce = |x, y| match compare(&x, &y) {
1279 Ordering::Greater | Ordering::Equal => x,
1280 Ordering::Less => y,
1281 };
1282 self.reduce(reduce)
1283 }
1284
1285 /// Returns the element that gives the maximum value from the specified function.
1286 ///
1287 /// If the iterator is empty, None is returned.
1288 ///
1289 /// # Examples
1290 ///
1291 /// ```
1292 /// use orx_parallel::*;
1293 ///
1294 /// let a = vec![-3_i32, 0, 1, 5, -10];
1295 /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1296 /// ```
1297 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1298 where
1299 Self::Item: Send,
1300 Key: Ord,
1301 GetKey: Fn(&Self::Item) -> Key + Sync,
1302 {
1303 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1304 Ordering::Greater | Ordering::Equal => x,
1305 Ordering::Less => y,
1306 };
1307 self.reduce(reduce)
1308 }
1309
1310 /// Returns the minimum element of an iterator.
1311 ///
1312 /// If the iterator is empty, None is returned.
1313 ///
1314 /// # Examples
1315 ///
1316 /// ```
1317 /// use orx_parallel::*;
1318 ///
1319 /// let a = vec![1, 2, 3];
1320 /// let b: Vec<u32> = Vec::new();
1321 ///
1322 /// assert_eq!(a.par().min(), Some(&1));
1323 /// assert_eq!(b.par().min(), None);
1324 /// ```
1325 fn min(self) -> Option<Self::Item>
1326 where
1327 Self::Item: Ord + Send,
1328 {
1329 self.reduce(Ord::min)
1330 }
1331
1332 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1333 ///
1334 /// If the iterator is empty, None is returned.
1335 ///
1336 /// # Examples
1337 ///
1338 /// ```
1339 /// use orx_parallel::*;
1340 ///
1341 /// let a = vec![-3_i32, 0, 1, 5, -10];
1342 /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1343 /// ```
1344 fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1345 where
1346 Self::Item: Send,
1347 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1348 {
1349 let reduce = |x, y| match compare(&x, &y) {
1350 Ordering::Less | Ordering::Equal => x,
1351 Ordering::Greater => y,
1352 };
1353 self.reduce(reduce)
1354 }
1355
1356 /// Returns the element that gives the minimum value from the specified function.
1357 ///
1358 /// If the iterator is empty, None is returned.
1359 ///
1360 /// # Examples
1361 ///
1362 /// ```
1363 /// use orx_parallel::*;
1364 ///
1365 /// let a = vec![-3_i32, 0, 1, 5, -10];
1366 /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1367 /// ```
1368 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1369 where
1370 Self::Item: Send,
1371 Key: Ord,
1372 GetKey: Fn(&Self::Item) -> Key + Sync,
1373 {
1374 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1375 Ordering::Less | Ordering::Equal => x,
1376 Ordering::Greater => y,
1377 };
1378 self.reduce(reduce)
1379 }
1380
1381 /// Sums the elements of an iterator.
1382 ///
1383 /// Takes each element, adds them together, and returns the result.
1384 ///
1385 /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1386 ///
1387 /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1388 ///
1389 /// [`Sum<Out>`]: crate::Sum
1390 ///
1391 /// # Examples
1392 ///
1393 /// ```
1394 /// use orx_parallel::*;
1395 ///
1396 /// let a = vec![1, 2, 3];
1397 /// let sum: i32 = a.par().sum();
1398 ///
1399 /// assert_eq!(sum, 6);
1400 /// ```
1401 fn sum<Out>(self) -> Out
1402 where
1403 Self::Item: Sum<Out>,
1404 Out: Send,
1405 {
1406 self.map(Self::Item::map)
1407 .reduce(Self::Item::reduce)
1408 .unwrap_or(Self::Item::zero())
1409 }
1410
1411 // early exit
1412
1413 /// Returns the first (or any) element of the iterator; returns None if it is empty.
1414 ///
1415 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1416 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1417 ///
1418 /// # Examples
1419 ///
1420 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1421 /// This guarantees that the first element with respect to position in the input sequence
1422 /// is returned.
1423 ///
1424 /// ```
1425 /// use orx_parallel::*;
1426 ///
1427 /// let a: Vec<usize> = vec![];
1428 /// assert_eq!(a.par().copied().first(), None);
1429 ///
1430 /// let a = vec![1, 2, 3];
1431 /// assert_eq!(a.par().copied().first(), Some(1));
1432 ///
1433 /// let a = 1..10_000;
1434 /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1435 /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1436 ///
1437 /// // or equivalently,
1438 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1439 /// ```
1440 ///
1441 /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1442 /// whichever is visited first depending on the parallel execution.
1443 ///
1444 /// ```
1445 /// use orx_parallel::*;
1446 ///
1447 /// let a = 1..10_000;
1448 ///
1449 /// // might return either of 3421 or 2*3421
1450 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1451 /// assert!([3421, 2 * 3421].contains(&any));
1452 ///
1453 /// // or equivalently,
1454 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1455 /// assert!([3421, 2 * 3421].contains(&any));
1456 /// ```
1457 fn first(self) -> Option<Self::Item>
1458 where
1459 Self::Item: Send;
1460
1461 /// Searches for an element of an iterator that satisfies a `predicate`.
1462 ///
1463 /// Depending on the set iteration order of the parallel iterator, returns
1464 ///
1465 /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1466 /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1467 ///
1468 /// `find` takes a closure that returns true or false.
1469 /// It applies this closure to each element of the iterator,
1470 /// and returns `Some(x)` where `x` is the first element that returns true.
1471 /// If they all return false, it returns None.
1472 ///
1473 /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1474 ///
1475 /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1476 ///
1477 /// # Examples
1478 ///
1479 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1480 /// This guarantees that the first element with respect to position in the input sequence
1481 /// is returned.
1482 ///
1483 /// ```
1484 /// use orx_parallel::*;
1485 ///
1486 /// let a = 1..10_000;
1487 /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1488 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1489 /// ```
1490 ///
1491 /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1492 /// whichever is found first depending on the parallel execution.
1493 ///
1494 /// ```
1495 /// use orx_parallel::*;
1496 ///
1497 /// let a = 1..10_000;
1498 ///
1499 /// // might return either of 3421 or 2*3421
1500 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1501 /// assert!([3421, 2 * 3421].contains(&any));
1502 /// ```
1503 fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1504 where
1505 Self::Item: Send,
1506 Predicate: Fn(&Self::Item) -> bool + Sync,
1507 {
1508 self.filter(&predicate).first()
1509 }
1510}