orx_parallel/par_iter.rs
1use crate::ParIterResult;
2use crate::computational_variants::fallible_option::ParOption;
3use crate::par_iter_option::{IntoOption, ParIterOption};
4use crate::par_iter_result::IntoResult;
5use crate::using::{UsingClone, UsingFun};
6use crate::{
7 ParIterUsing, Params,
8 collect_into::ParCollectInto,
9 computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
10 parameters::{ChunkSize, IterationOrder, NumThreads},
11 runner::{DefaultRunner, ParallelRunner},
12 special_type_sets::Sum,
13};
14use core::cmp::Ordering;
15use orx_concurrent_iter::ConcurrentIter;
16
17/// Parallel iterator.
18pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
19where
20 R: ParallelRunner,
21{
22 /// Element type of the parallel iterator.
23 type Item;
24
25 /// Returns a reference to the input concurrent iterator.
// Required method. Within this trait, the provided `collect` calls
// `con_iter().try_get_len()` and passes the result to `C::empty` when it
// creates the output collection.
26 fn con_iter(&self) -> &impl ConcurrentIter;
27
28 /// Parameters of the parallel iterator.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use orx_parallel::*;
34 /// use std::num::NonZero;
35 ///
36 /// let vec = vec![1, 2, 3, 4];
37 ///
38 /// assert_eq!(
39 /// vec.par().params(),
40 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
41 /// );
42 ///
43 /// assert_eq!(
44 /// vec.par().num_threads(0).chunk_size(0).params(),
45 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
46 /// );
47 ///
48 /// assert_eq!(
49 /// vec.par().num_threads(1).params(),
50 /// Params::new(
51 /// NumThreads::Max(NonZero::new(1).unwrap()),
52 /// ChunkSize::Auto,
53 /// IterationOrder::Ordered
54 /// )
55 /// );
56 ///
57 /// assert_eq!(
58 /// vec.par().num_threads(4).chunk_size(64).params(),
59 /// Params::new(
60 /// NumThreads::Max(NonZero::new(4).unwrap()),
61 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
62 /// IterationOrder::Ordered
63 /// )
64 /// );
65 ///
66 /// assert_eq!(
67 /// vec.par()
68 /// .num_threads(8)
69 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
70 /// .iteration_order(IterationOrder::Arbitrary)
71 /// .params(),
72 /// Params::new(
73 /// NumThreads::Max(NonZero::new(8).unwrap()),
74 /// ChunkSize::Min(NonZero::new(16).unwrap()),
75 /// IterationOrder::Arbitrary
76 /// )
77 /// );
78 /// ```
// Required method. Takes `&self`, so reading the parameters does not consume
// the iterator; the examples above compare the returned `Params` by value.
79 fn params(&self) -> Params;
80
81 // params transformations
82
83 /// Sets the number of threads to be used in the parallel execution.
84 /// Integers can be used as the argument with the following mapping:
85 ///
86 /// * `0` -> `NumThreads::Auto`
87 /// * `1` -> `NumThreads::sequential()`
88 /// * `n > 0` -> `NumThreads::Max(n)`
89 ///
90 /// See [`NumThreads`] for details.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use orx_parallel::*;
96 /// use std::num::NonZero;
97 ///
98 /// let vec = vec![1, 2, 3, 4];
99 ///
100 /// // all available threads can be used
101 ///
102 /// assert_eq!(
103 /// vec.par().params(),
104 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
105 /// );
106 ///
107 /// assert_eq!(
108 /// vec.par().num_threads(0).chunk_size(0).params(),
109 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
110 /// );
111 ///
112 /// // computation will be executed sequentially on the main thread, no parallelization
113 ///
114 /// assert_eq!(
115 /// vec.par().num_threads(1).params(),
116 /// Params::new(
117 /// NumThreads::Max(NonZero::new(1).unwrap()),
118 /// ChunkSize::Auto,
119 /// IterationOrder::Ordered
120 /// )
121 /// );
122 ///
123 /// // maximum 4 threads can be used
124 /// assert_eq!(
125 /// vec.par().num_threads(4).chunk_size(64).params(),
126 /// Params::new(
127 /// NumThreads::Max(NonZero::new(4).unwrap()),
128 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
129 /// IterationOrder::Ordered
130 /// )
131 /// );
132 ///
133 /// // maximum 8 threads can be used
134 /// assert_eq!(
135 /// vec.par()
136 /// .num_threads(8)
137 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
138 /// .iteration_order(IterationOrder::Arbitrary)
139 /// .params(),
140 /// Params::new(
141 /// NumThreads::Max(NonZero::new(8).unwrap()),
142 /// ChunkSize::Min(NonZero::new(16).unwrap()),
143 /// IterationOrder::Arbitrary
144 /// )
145 /// );
146 /// ```
// Required method. Consumes and returns `Self`, so it chains with the other
// parameter setters as in the examples above (`.num_threads(4).chunk_size(64)`).
// The argument is anything convertible `Into<NumThreads>`.
147 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
148
149 /// Sets the number of elements to be pulled from the concurrent iterator during the
150 /// parallel execution. When integers are used as argument, the following mapping applies:
151 ///
152 /// * `0` -> `ChunkSize::Auto`
153 /// * `n > 0` -> `ChunkSize::Exact(n)`
154 ///
155 /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
156 ///
157 /// See [`ChunkSize`] for details.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// use orx_parallel::*;
163 /// use std::num::NonZero;
164 ///
165 /// let vec = vec![1, 2, 3, 4];
166 ///
167 /// // chunk sizes will be dynamically decided by the parallel runner
168 ///
169 /// assert_eq!(
170 /// vec.par().params(),
171 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
172 /// );
173 ///
174 /// assert_eq!(
175 /// vec.par().num_threads(0).chunk_size(0).params(),
176 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
177 /// );
178 ///
179 /// assert_eq!(
180 /// vec.par().num_threads(1).params(),
181 /// Params::new(
182 /// NumThreads::Max(NonZero::new(1).unwrap()),
183 /// ChunkSize::Auto,
184 /// IterationOrder::Ordered
185 /// )
186 /// );
187 ///
188 /// // chunk size will always be 64, parallel runner cannot change
189 ///
190 /// assert_eq!(
191 /// vec.par().num_threads(4).chunk_size(64).params(),
192 /// Params::new(
193 /// NumThreads::Max(NonZero::new(4).unwrap()),
194 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
195 /// IterationOrder::Ordered
196 /// )
197 /// );
198 ///
199 /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
200 ///
201 /// assert_eq!(
202 /// vec.par()
203 /// .num_threads(8)
204 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
205 /// .iteration_order(IterationOrder::Arbitrary)
206 /// .params(),
207 /// Params::new(
208 /// NumThreads::Max(NonZero::new(8).unwrap()),
209 /// ChunkSize::Min(NonZero::new(16).unwrap()),
210 /// IterationOrder::Arbitrary
211 /// )
212 /// );
213 /// ```
// Required method. Consumes and returns `Self` for chaining. The argument is
// anything convertible `Into<ChunkSize>`; the examples above pass both plain
// integers and an explicit `ChunkSize::Min(..)` value.
214 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
215
216 /// Sets the iteration order of the parallel computation.
217 ///
218 /// See [`IterationOrder`] for details.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use orx_parallel::*;
224 ///
225 /// let vec = vec![1, 2, 3, 4];
226 ///
227 /// // results are collected in order consistent with the input order,
228 /// // or find returns the first element satisfying the predicate
229 ///
230 /// assert_eq!(
231 /// vec.par().params(),
232 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
233 /// );
234 ///
235 /// assert_eq!(
236 /// vec.par().iteration_order(IterationOrder::Ordered).params(),
237 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
238 /// );
239 ///
240 /// // results might be collected in arbitrary order
241 /// // or find returns any of the elements satisfying the predicate
242 ///
243 /// assert_eq!(
244 /// vec.par().iteration_order(IterationOrder::Arbitrary).params(),
245 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
246 /// );
247 /// ```
// Required method. Unlike `num_threads` and `chunk_size`, the argument is the
// `IterationOrder` enum itself rather than an `impl Into<..>` conversion.
248 fn iteration_order(self, order: IterationOrder) -> Self;
249
250 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
251 ///
252 /// # Examples
253 ///
254 /// ```ignore
255 /// use orx_parallel::*;
256 ///
257 /// let inputs = vec![1, 2, 3, 4];
258 ///
259 /// // uses the default runner
260 /// let sum = inputs.par().sum();
261 ///
262 /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
263 /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
264 /// ```
// Required method. Only the runner type parameter changes (`R` -> `Q`); the
// item type is carried over unchanged. `Q` has no value argument, so callers
// select it with a turbofish as in the example above.
265 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
266
267 // using transformations
268
269 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
270 /// used variable throughout the computation.
271 ///
272 /// Note that each used thread will obtain exactly one instance of the variable.
273 ///
274 /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
275 /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
276 /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
277 ///
278 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
279 ///
280 /// # Examples
281 ///
282 /// ## Example 1: Channels
283 ///
284 /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
285 ///
286 /// ```ignore
287 /// use orx_parallel::*;
288 /// use std::sync::mpsc::channel;
289 ///
290 /// let (sender, receiver) = channel();
291 ///
292 /// (0..5)
293 /// .into_par()
294 /// .using(|_thread_idx| sender.clone())
295 /// .for_each(|s, x| s.send(x).unwrap());
296 ///
297 /// let mut res: Vec<_> = receiver.iter().collect();
298 ///
299 /// res.sort();
300 ///
301 /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
302 /// ```
303 ///
304 /// ## Example 2: Random Number Generator
305 ///
306 /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
307 ///
308 /// The following example demonstrates how to safely generate random numbers through mutable references within
309 /// a parallel computation.
310 ///
311 /// Notice the differences between sequential and parallel computation.
312 /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
313 /// explicitly define that we will be `using` a random number generator.
314 /// * Parallel iterator does not mutably capture any variable from the scope; however, using transformation
315 /// converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
316 ///
317 /// ```
318 /// use orx_parallel::*;
319 /// use rand::{Rng, SeedableRng};
320 /// use rand_chacha::ChaCha20Rng;
321 ///
322 /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
323 /// (0..num_steps).fold(position, |p, _| random_step(rng, p))
324 /// }
325 ///
326 /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
327 /// match rng.random_bool(0.5) {
328 /// true => position + 1, // to right
329 /// false => position - 1, // to left
330 /// }
331 /// }
332 ///
333 /// fn input_positions() -> Vec<i64> {
334 /// (-100..=100).collect()
335 /// }
336 ///
337 /// fn sequential() {
338 /// let positions = input_positions();
339 ///
340 /// let mut rng = ChaCha20Rng::seed_from_u64(42);
341 /// let final_positions: Vec<_> = positions
342 /// .iter()
343 /// .copied()
344 /// .map(|position| random_walk(&mut rng, position, 10))
345 /// .collect();
346 /// let sum_final_positions = final_positions.iter().sum::<i64>();
347 /// }
348 ///
349 /// fn parallel() {
350 /// let positions = input_positions();
351 ///
352 /// let final_positions: Vec<_> = positions
353 /// .par()
354 /// .copied()
355 /// .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
356 /// .map(|rng, position| random_walk(rng, position, 10))
357 /// .collect();
358 /// let sum_final_positions = final_positions.iter().sum::<i64>();
359 /// }
360 ///
361 /// sequential();
362 /// parallel();
363 /// ```
364 ///
365 /// ## Example 3: Metrics Collection
366 ///
367 /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
368 /// some `unsafe` help with interior mutability.
369 ///
370 /// ```
371 /// use orx_parallel::*;
372 /// use std::cell::UnsafeCell;
373 ///
374 /// const N: u64 = 1_000;
375 /// const MAX_NUM_THREADS: usize = 4;
376 ///
377 /// // just some work
378 /// fn fibonacci(n: u64) -> u64 {
379 /// let mut a = 0;
380 /// let mut b = 1;
381 /// for _ in 0..n {
382 /// let c = a + b;
383 /// a = b;
384 /// b = c;
385 /// }
386 /// a
387 /// }
388 ///
389 /// #[derive(Default, Debug)]
390 /// struct ThreadMetrics {
391 /// thread_idx: usize,
392 /// num_items_handled: usize,
393 /// handled_42: bool,
394 /// num_filtered_out: usize,
395 /// }
396 ///
397 /// struct ThreadMetricsWriter<'a> {
398 /// metrics_ref: &'a mut ThreadMetrics,
399 /// }
400 ///
401 /// struct ComputationMetrics {
402 /// thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
403 /// }
404 /// impl ComputationMetrics {
405 /// fn new() -> Self {
406 /// let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
407 /// for i in 0..MAX_NUM_THREADS {
408 /// thread_metrics[i].thread_idx = i;
409 /// }
410 /// Self {
411 /// thread_metrics: UnsafeCell::new(thread_metrics),
412 /// }
413 /// }
414 /// }
415 ///
416 /// impl ComputationMetrics {
417 /// unsafe fn create_for_thread<'a>(&mut self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
418 /// // SAFETY: here we create a mutable reference to the thread_idx-th metrics
419 /// // * If we call this method multiple times with the same index,
420 /// // we create multiple mutable references to the same ThreadMetrics,
421 /// // which would lead to a race condition.
422 /// // * We must make sure that `create_for_thread` is called only once per thread.
423 /// // * If we use `create_for_thread` within the `using` call to create mutable values
424 /// // used by the threads, we are certain that the parallel computation
425 /// // will only call this method once per thread; hence, it will not
426 /// // cause the race condition.
427 /// // * On the other hand, we must ensure that we do not call this method
428 /// // externally.
429 /// let array = unsafe { &mut *self.thread_metrics.get() };
430 /// ThreadMetricsWriter {
431 /// metrics_ref: &mut array[thread_idx],
432 /// }
433 /// }
434 /// }
435 ///
436 /// let mut metrics = ComputationMetrics::new();
437 ///
438 /// let input: Vec<u64> = (0..N).collect();
439 ///
440 /// let sum = input
441 /// .par()
442 /// // SAFETY: we do not call `create_for_thread` externally;
443 /// // it is safe if it is called only by the parallel computation.
444 /// .using(|t| unsafe { metrics.create_for_thread(t) })
445 /// .map(|m: &mut ThreadMetricsWriter<'_>, i| {
446 /// // collect some useful metrics
447 /// m.metrics_ref.num_items_handled += 1;
448 /// m.metrics_ref.handled_42 |= *i == 42;
449 ///
450 /// // actual work
451 /// fibonacci((*i % 20) + 1) % 100
452 /// })
453 /// .filter(|m, i| {
454 /// let is_even = i % 2 == 0;
455 ///
456 /// if !is_even {
457 /// m.metrics_ref.num_filtered_out += 1;
458 /// }
459 ///
460 /// is_even
461 /// })
462 /// .num_threads(MAX_NUM_THREADS)
463 /// .sum();
464 ///
465 /// let total_by_metrics: usize = metrics
466 /// .thread_metrics
467 /// .get_mut()
468 /// .iter()
469 /// .map(|x| x.num_items_handled)
470 /// .sum();
471 ///
472 /// assert_eq!(N as usize, total_by_metrics);
473 /// ```
474 ///
// Required method. The returned `ParIterUsing` keeps the runner `R` and the
// item type of this iterator; the closure type shows up in the return type as
// `UsingFun<F, U>`.
475 fn using<U, F>(
476 self,
477 using: F,
478 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
479 where
480 U: Send + 'static,
// `FnMut` (not `Fn`): the factory may mutate what it captures. Example 3 above
// relies on this by calling a `&mut self` method from inside the closure.
481 F: FnMut(usize) -> U;
482
483 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
484 /// used variable throughout the computation.
485 ///
486 /// Note that each used thread will obtain exactly one instance of the variable.
487 ///
488 /// Each used thread receives a clone of the provided `value`.
489 /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
490 ///
491 /// Please see [`using`] for examples.
492 ///
493 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
494 ///
495 /// [`using`]: crate::ParIter::using
// Required method. Same shape as `using`, except that the return type carries
// `UsingClone<U>` instead of `UsingFun<F, U>` and `U` additionally needs `Clone`.
496 fn using_clone<U>(
497 self,
498 value: U,
499 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
500 where
501 U: Clone + Send + 'static;
502
503 // transformations into fallible computations
504
505 /// Transforms a parallel iterator where elements are of the result type; i.e., `ParIter<R, Item = Result<T, E>>`,
506 /// into fallible parallel iterator with item type `T` and error type `E`; i.e., into `ParIterResult<R, Item = T, Err = E>`.
507 ///
508 /// `ParIterResult` is also a parallel iterator; however, with methods specialized for handling fallible computations
509 /// as follows:
510 ///
511 /// * All of its methods are based on the success path with item type of `T`.
512 /// * However, computations short-circuit and immediately return the observed error if any of the items
513 /// is of the `Err` variant of the result enum.
514 ///
515 /// See [`ParIterResult`] for details.
516 ///
517 /// # Examples
518 ///
519 /// ```
520 /// use orx_parallel::*;
521 ///
522 /// // all succeeds
523 ///
524 /// let result_doubled: Result<Vec<i32>, _> = ["1", "2", "3"]
525 /// .into_par()
526 /// .map(|x| x.parse::<i32>()) // ParIter with Item=Result<i32, ParseIntError>
527 /// .into_fallible_result() // ParIterResult with Item=i32 and Err=ParseIntError
528 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
529 /// .collect(); // methods return Result<_, Err>
530 /// // where the Ok variant depends on the computation
531 ///
532 /// assert_eq!(result_doubled, Ok(vec![2, 4, 6]));
533 ///
534 /// // at least one fails
535 ///
536 /// let result_doubled: Result<Vec<i32>, _> = ["1", "x!", "3"]
537 /// .into_par()
538 /// .map(|x| x.parse::<i32>())
539 /// .into_fallible_result()
540 /// .map(|x| x * 2)
541 /// .collect();
542 ///
543 /// assert!(result_doubled.is_err());
544 /// ```
// Required method. The provided `into_fallible_option` in this trait is
// implemented on top of it.
545 fn into_fallible_result<T, E>(self) -> impl ParIterResult<R, Item = T, Err = E>
546 where
// `T` and `E` are tied to the item type only through this bound.
547 Self::Item: IntoResult<T, E>;
548
549 /// Transforms a parallel iterator where elements are of the option type; i.e., `ParIter<R, Item = Option<T>>`,
550 /// into fallible parallel iterator with item type `T`; i.e., into `ParIterOption<R, Item = T>`.
551 ///
552 /// `ParIterOption` is also a parallel iterator; however, with methods specialized for handling fallible computations
553 /// as follows:
554 ///
555 /// * All of its methods are based on the success path with item type of `T`.
556 /// * However, computations short-circuit and immediately return None if any of the items
557 /// is of the `None` variant of the option enum.
558 ///
559 /// See [`ParIterOption`] for details.
560 ///
561 /// # Examples
562 ///
563 /// ```
564 /// use orx_parallel::*;
565 ///
566 /// // all succeeds
567 ///
568 /// let result_doubled: Option<Vec<i32>> = ["1", "2", "3"]
569 /// .into_par()
570 /// .map(|x| x.parse::<i32>().ok()) // ParIter with Item=Option<i32>
571 /// .into_fallible_option() // ParIterOption with Item=i32
572 /// .map(|x| x * 2) // methods focus on the success path with Item=i32
573 /// .collect(); // methods return Option<T>
574 /// // where T depends on the computation
575 ///
576 /// assert_eq!(result_doubled, Some(vec![2, 4, 6]));
577 ///
578 /// // at least one fails
579 ///
580 /// let result_doubled: Option<Vec<i32>> = ["1", "x!", "3"]
581 /// .into_par()
582 /// .map(|x| x.parse::<i32>().ok())
583 /// .into_fallible_option()
584 /// .map(|x| x * 2)
585 /// .collect();
586 ///
587 /// assert_eq!(result_doubled, None);
588 /// ```
589 fn into_fallible_option<T>(self) -> impl ParIterOption<R, Item = T>
590 where
591 Self::Item: IntoOption<T>,
592 {
593 ParOption::new(
594 self.map(|x| x.into_result_with_unit_err())
595 .into_fallible_result(),
596 )
597 }
598
599 // computation transformations
600
601 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// use orx_parallel::*;
607 ///
608 /// let a = [1, 2, 3];
609 ///
610 /// let iter = a.into_par().map(|x| 2 * x);
611 ///
612 /// let b: Vec<_> = iter.collect();
613 /// assert_eq!(b, &[2, 4, 6]);
614 /// ```
// Required method. Several provided methods of this trait are thin wrappers
// around it: `into_fallible_option`, `inspect`, `map_while`, `copied`, `cloned`.
615 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
616 where
617 Map: Fn(Self::Item) -> Out + Sync + Clone;
618
619 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// use orx_parallel::*;
625 ///
626 /// let a = [1, 2, 3];
627 ///
628 /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
629 ///
630 /// let b: Vec<_> = iter.collect();
631 /// assert_eq!(b, &[1, 3]);
632 /// ```
// Required method. The predicate only borrows each element (`&Self::Item`),
// and the returned iterator keeps the same item type.
633 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
634 where
635 Filter: Fn(&Self::Item) -> bool + Sync + Clone;
636
637 /// Creates an iterator that works like map, but flattens nested structure.
638 ///
639 /// # Examples
640 ///
641 /// ```
642 /// use orx_parallel::*;
643 ///
644 /// let words = ["alpha", "beta", "gamma"];
645 ///
646 /// // chars() returns an iterator
647 /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
648 ///
649 /// let merged: String = all_chars.iter().collect();
650 /// assert_eq!(merged, "alphabetagamma");
651 /// ```
// Required method. The provided `flatten` in this trait delegates here.
652 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
653 where
// The closure may return any `IntoIterator`; its `Item` becomes the new item type.
654 IOut: IntoIterator,
655 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone;
656
657 /// Creates an iterator that both filters and maps.
658 ///
659 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
660 ///
661 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
662 /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
663 ///
664 /// # Examples
665 ///
666 /// ```
667 /// use orx_parallel::*;
668 ///
669 /// let a = ["1", "two", "NaN", "four", "5"];
670 ///
671 /// let numbers: Vec<_> = a
672 /// .into_par()
673 /// .filter_map(|s| s.parse::<usize>().ok())
674 /// .collect();
675 ///
676 /// assert_eq!(numbers, [1, 5]);
677 /// ```
// Required method. The closure receives each element by value; the `Out`
// inside the `Option` it returns is the item type of the resulting iterator.
678 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
679 where
680 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone;
681
682 /// Does something with each element of an iterator, passing the value on.
683 ///
684 /// When using iterators, you’ll often chain several of them together.
685 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
686 /// To do that, insert a call to `inspect()`.
687 ///
688 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
689 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
690 ///
691 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
692 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
693 /// collect some intermediate values during parallel execution for further inspection.
694 /// The following example demonstrates such a use case.
695 ///
696 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
697 ///
698 /// # Examples
699 ///
700 /// ```
701 /// use orx_parallel::*;
702 /// use orx_concurrent_bag::*;
703 ///
704 /// let a = vec![1, 4, 2, 3];
705 ///
706 /// // let's add some inspect() calls to investigate what's happening
707 /// // - log some events
708 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
709 /// let bag = ConcurrentBag::new();
710 ///
711 /// let sum = a
712 /// .par()
713 /// .copied()
714 /// .inspect(|x| println!("about to filter: {x}"))
715 /// .filter(|x| x % 2 == 0)
716 /// .inspect(|x| {
717 /// bag.push(*x);
718 /// println!("made it through filter: {x}");
719 /// })
720 /// .sum();
721 /// println!("{sum}");
722 ///
723 /// let mut values_made_through = bag.into_inner();
724 /// values_made_through.sort();
725 /// assert_eq!(values_made_through, [2, 4]);
726 /// ```
727 ///
728 /// This will print:
729 ///
730 /// ```console
731 /// about to filter: 1
732 /// about to filter: 4
733 /// made it through filter: 4
734 /// about to filter: 2
735 /// made it through filter: 2
736 /// about to filter: 3
737 /// 6
738 /// ```
739 fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
740 where
741 Operation: Fn(&Self::Item) + Sync + Clone,
742 {
743 let map = move |x| {
744 operation(&x);
745 x
746 };
747 self.map(map)
748 }
749
750 // while transformations
751
752 /// Creates an iterator that yields elements based on the predicate `take_while`.
753 ///
754 /// It will call this closure on each element of the iterator, and yield elements while it returns true.
755 ///
756 /// After false is returned, take_while’s job is over, and the rest of the elements are ignored.
757 ///
758 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
759 ///
760 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
761 /// elements before the predicate returns false are collected in the correct order; and hence, the result
762 /// is deterministic.
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// use orx_parallel::*;
768 ///
769 /// let iter = (0..10_000).par().take_while(|x| *x != 5_000);
770 /// let b: Vec<_> = iter.collect();
771 ///
772 /// assert_eq!(b, (0..5_000).collect::<Vec<_>>());
773 /// ```
// Required method. The provided `map_while` in this trait is composed as
// `map` + `take_while` + `map`.
774 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
775 where
776 While: Fn(&Self::Item) -> bool + Sync + Clone;
777
778 /// Creates an iterator that both yields elements based on the predicate `map_while` and maps.
779 ///
780 /// `map_while` takes a closure as an argument.
781 /// It will call this closure on each element of the iterator, and yield elements while it returns Some(_).
782 ///
783 /// After None is returned, map_while’s job is over, and the rest of the elements are ignored.
784 ///
785 /// Unlike regular sequential iterators, the result is not always deterministic due to parallel execution.
786 ///
787 /// However, as demonstrated in the example below, `collect` with ordered execution makes sure that all
788 /// elements before the predicate returns None are collected in the correct order; and hence, the result
789 /// is deterministic.
790 ///
791 /// ```
792 /// use orx_parallel::*;
793 ///
794 /// let iter = (0..10_000)
795 /// .par()
796 /// .map(|x| x as i32 - 5_000) // -5_000..5_000
797 /// .map_while(|x| 16i32.checked_div(x));
798 /// let b: Vec<_> = iter.collect();
799 ///
800 /// assert_eq!(b, (-5_000..0).map(|x| 16 / x).collect::<Vec<_>>());
801 /// ```
802 fn map_while<Out, MapWhile>(self, map_while: MapWhile) -> impl ParIter<R, Item = Out>
803 where
804 MapWhile: Fn(Self::Item) -> Option<Out> + Sync + Clone,
805 {
806 self.map(map_while).take_while(|x| x.is_some()).map(|x| {
807 // SAFETY: since x passed the whilst(is-some) check, unwrap_unchecked
808 unsafe { x.unwrap_unchecked() }
809 })
810 }
811
812 // special item transformations
813
814 /// Creates an iterator which copies all of its elements.
815 ///
816 /// # Examples
817 ///
818 /// ```
819 /// use orx_parallel::*;
820 ///
821 /// let a = vec![1, 2, 3];
822 ///
823 /// let v_copied: Vec<_> = a.par().copied().collect();
824 ///
825 /// // copied is the same as .map(|&x| x)
826 /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
827 ///
828 /// assert_eq!(v_copied, vec![1, 2, 3]);
829 /// assert_eq!(v_map, vec![1, 2, 3]);
830 /// ```
831 fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
832 where
833 T: 'a + Copy,
834 Self: ParIter<R, Item = &'a T>,
835 {
836 self.map(map_copy)
837 }
838
839 /// Creates an iterator which clones all of its elements.
840 ///
841 /// # Examples
842 ///
843 /// ```
844 /// use orx_parallel::*;
845 ///
846 /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
847 ///
848 /// let v_cloned: Vec<_> = a.par().cloned().collect();
849 ///
850 /// // cloned is the same as .map(|x| x.clone())
851 /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
852 ///
853 /// assert_eq!(
854 /// v_cloned,
855 /// vec![String::from("1"), String::from("2"), String::from("3")]
856 /// );
857 /// assert_eq!(
858 /// v_map,
859 /// vec![String::from("1"), String::from("2"), String::from("3")]
860 /// );
861 /// ```
862 fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
863 where
864 T: 'a + Clone,
865 Self: ParIter<R, Item = &'a T>,
866 {
867 self.map(map_clone)
868 }
869
870 /// Creates an iterator that flattens nested structure.
871 ///
872 /// This is useful when you have an iterator of iterators or an iterator of things that can be
873 /// turned into iterators and you want to remove one level of indirection.
874 ///
875 /// # Examples
876 ///
877 /// Basic usage.
878 ///
879 /// ```
880 /// use orx_parallel::*;
881 ///
882 /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
883 /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
884 /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
885 /// ```
886 ///
887 /// Mapping and then flattening:
888 ///
889 /// ```
890 /// use orx_parallel::*;
891 ///
892 /// let words = vec!["alpha", "beta", "gamma"];
893 ///
894 /// // chars() returns an iterator
895 /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
896 /// let merged: String = all_characters.into_iter().collect();
897 /// assert_eq!(merged, "alphabetagamma");
898 /// ```
899 ///
900 /// But actually, you can write this in terms of `flat_map`,
901 /// which is preferable in this case since it conveys intent more clearly:
902 ///
903 /// ```
904 /// use orx_parallel::*;
905 ///
906 /// let words = vec!["alpha", "beta", "gamma"];
907 ///
908 /// // chars() returns an iterator
909 /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
910 /// let merged: String = all_characters.into_iter().collect();
911 /// assert_eq!(merged, "alphabetagamma");
912 /// ```
913 fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
914 where
915 Self::Item: IntoIterator,
916 {
917 let map = |e: Self::Item| e.into_iter();
918 self.flat_map(map)
919 }
920
921 // collect
922
923 /// Collects all the items from an iterator into a collection.
924 ///
925 /// This is useful when you already have a collection and want to add the iterator items to it.
926 ///
927 /// The collection is passed in as owned value, and returned back with the additional elements.
928 ///
929 /// All collections implementing [`ParCollectInto`] can be used to collect into.
930 ///
931 /// [`ParCollectInto`]: crate::ParCollectInto
932 ///
933 /// # Examples
934 ///
935 /// ```
936 /// use orx_parallel::*;
937 ///
938 /// let a = vec![1, 2, 3];
939 ///
940 /// let vec: Vec<i32> = vec![0, 1];
941 /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
942 /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
943 ///
944 /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
945 /// ```
// Required method. `output` is taken by value and the same type is handed
// back as the return value. The provided `collect` delegates here after
// creating the collection with `C::empty`.
946 fn collect_into<C>(self, output: C) -> C
947 where
948 C: ParCollectInto<Self::Item>;
949
950 /// Transforms an iterator into a collection.
951 ///
952 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
953 /// the type of the result collection; or turbofish annotation can be used.
954 ///
955 /// All collections implementing [`ParCollectInto`] can be used to collect into.
956 ///
957 /// [`ParCollectInto`]: crate::ParCollectInto
958 ///
959 /// # Examples
960 ///
961 /// ```
962 /// use orx_parallel::*;
963 ///
964 /// let a = vec![1, 2, 3];
965 ///
966 /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
967 ///
968 /// assert_eq!(vec![2, 4, 6], doubled);
969 /// ```
970 fn collect<C>(self) -> C
971 where
972 C: ParCollectInto<Self::Item>,
973 {
974 let output = C::empty(self.con_iter().try_get_len());
975 self.collect_into(output)
976 }
977
978 // reduce
979
/// Reduces the elements to a single one, by repeatedly applying a reducing operation.
///
/// If the iterator is empty, returns `None`; otherwise, returns `Some` of the result of the reduction.
///
/// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
/// Both arguments and the returned value are of type `Self::Item`.
///
/// # Examples
///
/// ```
/// use orx_parallel::*;
///
/// let inputs = 1..10;
/// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
/// assert_eq!(reduced, 45);
/// ```
// NOTE(review): as elements are reduced in parallel, `reduce` presumably has to be associative
// to give a deterministic result — confirm against the implementors.
fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
where
    Self::Item: Send,
    Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync;
999
1000 /// Tests if every element of the iterator matches a predicate.
1001 ///
1002 /// `all` takes a `predicate` that returns true or false.
1003 /// It applies this closure to each element of the iterator,
1004 /// and if they all return true, then so does `all`.
1005 /// If any of them returns false, it returns false.
1006 ///
1007 /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
1008 /// given that no matter what else happens, the result will also be false.
1009 ///
1010 /// An empty iterator returns true.
1011 ///
1012 /// # Examples
1013 ///
1014 /// ```
1015 /// use orx_parallel::*;
1016 ///
1017 /// let mut a = vec![1, 2, 3];
1018 /// assert!(a.par().all(|x| **x > 0));
1019 /// assert!(!a.par().all(|x| **x > 2));
1020 ///
1021 /// a.clear();
1022 /// assert!(a.par().all(|x| **x > 2)); // empty iterator
1023 /// ```
1024 fn all<Predicate>(self, predicate: Predicate) -> bool
1025 where
1026 Self::Item: Send,
1027 Predicate: Fn(&Self::Item) -> bool + Sync,
1028 {
1029 let violates = |x: &Self::Item| !predicate(x);
1030 self.find(violates).is_none()
1031 }
1032
1033 /// Tests if any element of the iterator matches a predicate.
1034 ///
1035 /// `any` takes a `predicate` that returns true or false.
1036 /// It applies this closure to each element of the iterator,
1037 /// and if any of the elements returns true, then so does `any`.
1038 /// If all of them return false, it returns false.
1039 ///
1040 /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
1041 /// given that no matter what else happens, the result will also be true.
1042 ///
1043 /// An empty iterator returns false.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```
1048 /// use orx_parallel::*;
1049 ///
1050 /// let mut a = vec![1, 2, 3];
1051 /// assert!(a.par().any(|x| **x > 0));
1052 /// assert!(!a.par().any(|x| **x > 5));
1053 ///
1054 /// a.clear();
1055 /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
1056 /// ```
1057 fn any<Predicate>(self, predicate: Predicate) -> bool
1058 where
1059 Self::Item: Send,
1060 Predicate: Fn(&Self::Item) -> bool + Sync,
1061 {
1062 self.find(predicate).is_some()
1063 }
1064
1065 /// Consumes the iterator, counting the number of iterations and returning it.
1066 ///
1067 /// # Examples
1068 ///
1069 /// ```
1070 /// use orx_parallel::*;
1071 ///
1072 /// let a = vec![1, 2, 3];
1073 /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
1074 /// ```
1075 fn count(self) -> usize {
1076 self.map(map_count).reduce(reduce_sum).unwrap_or(0)
1077 }
1078
1079 /// Calls a closure on each element of an iterator.
1080 ///
1081 /// # Examples
1082 ///
1083 /// Basic usage:
1084 ///
1085 /// ```
1086 /// use orx_parallel::*;
1087 /// use std::sync::mpsc::channel;
1088 ///
1089 /// let (tx, rx) = channel();
1090 /// (0..5)
1091 /// .par()
1092 /// .map(|x| x * 2 + 1)
1093 /// .for_each(move |x| tx.send(x).unwrap());
1094 ///
1095 /// let mut v: Vec<_> = rx.iter().collect();
1096 /// v.sort(); // order can be mixed, since messages will be sent in parallel
1097 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
1098 /// ```
1099 ///
1100 /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
1101 /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
1102 /// In the following example, we log every element that satisfies a predicate in parallel.
1103 ///
1104 /// ```
1105 /// use orx_parallel::*;
1106 ///
1107 /// (0..5)
1108 /// .par()
1109 /// .flat_map(|x| x * 100..x * 110)
1110 /// .filter(|&x| x % 3 == 0)
1111 /// .for_each(|x| println!("{x}"));
1112 /// ```
1113 fn for_each<Operation>(self, operation: Operation)
1114 where
1115 Operation: Fn(Self::Item) + Sync,
1116 {
1117 let map = |x| operation(x);
1118 let _ = self.map(map).reduce(reduce_unit);
1119 }
1120
1121 /// Returns the maximum element of an iterator.
1122 ///
1123 /// If the iterator is empty, None is returned.
1124 ///
1125 /// # Examples
1126 ///
1127 /// ```
1128 /// use orx_parallel::*;
1129 ///
1130 /// let a = vec![1, 2, 3];
1131 /// let b: Vec<u32> = Vec::new();
1132 ///
1133 /// assert_eq!(a.par().max(), Some(&3));
1134 /// assert_eq!(b.par().max(), None);
1135 /// ```
1136 fn max(self) -> Option<Self::Item>
1137 where
1138 Self::Item: Ord + Send,
1139 {
1140 self.reduce(Ord::max)
1141 }
1142
1143 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
1144 ///
1145 /// If the iterator is empty, None is returned.
1146 ///
1147 /// # Examples
1148 ///
1149 /// ```
1150 /// use orx_parallel::*;
1151 ///
1152 /// let a = vec![-3_i32, 0, 1, 5, -10];
1153 /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
1154 /// ```
1155 fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1156 where
1157 Self::Item: Send,
1158 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1159 {
1160 let reduce = |x, y| match compare(&x, &y) {
1161 Ordering::Greater | Ordering::Equal => x,
1162 Ordering::Less => y,
1163 };
1164 self.reduce(reduce)
1165 }
1166
1167 /// Returns the element that gives the maximum value from the specified function.
1168 ///
1169 /// If the iterator is empty, None is returned.
1170 ///
1171 /// # Examples
1172 ///
1173 /// ```
1174 /// use orx_parallel::*;
1175 ///
1176 /// let a = vec![-3_i32, 0, 1, 5, -10];
1177 /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1178 /// ```
1179 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1180 where
1181 Self::Item: Send,
1182 Key: Ord,
1183 GetKey: Fn(&Self::Item) -> Key + Sync,
1184 {
1185 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1186 Ordering::Greater | Ordering::Equal => x,
1187 Ordering::Less => y,
1188 };
1189 self.reduce(reduce)
1190 }
1191
1192 /// Returns the minimum element of an iterator.
1193 ///
1194 /// If the iterator is empty, None is returned.
1195 ///
1196 /// # Examples
1197 ///
1198 /// ```
1199 /// use orx_parallel::*;
1200 ///
1201 /// let a = vec![1, 2, 3];
1202 /// let b: Vec<u32> = Vec::new();
1203 ///
1204 /// assert_eq!(a.par().min(), Some(&1));
1205 /// assert_eq!(b.par().min(), None);
1206 /// ```
1207 fn min(self) -> Option<Self::Item>
1208 where
1209 Self::Item: Ord + Send,
1210 {
1211 self.reduce(Ord::min)
1212 }
1213
1214 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1215 ///
1216 /// If the iterator is empty, None is returned.
1217 ///
1218 /// # Examples
1219 ///
1220 /// ```
1221 /// use orx_parallel::*;
1222 ///
1223 /// let a = vec![-3_i32, 0, 1, 5, -10];
1224 /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1225 /// ```
1226 fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1227 where
1228 Self::Item: Send,
1229 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1230 {
1231 let reduce = |x, y| match compare(&x, &y) {
1232 Ordering::Less | Ordering::Equal => x,
1233 Ordering::Greater => y,
1234 };
1235 self.reduce(reduce)
1236 }
1237
1238 /// Returns the element that gives the minimum value from the specified function.
1239 ///
1240 /// If the iterator is empty, None is returned.
1241 ///
1242 /// # Examples
1243 ///
1244 /// ```
1245 /// use orx_parallel::*;
1246 ///
1247 /// let a = vec![-3_i32, 0, 1, 5, -10];
1248 /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1249 /// ```
1250 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1251 where
1252 Self::Item: Send,
1253 Key: Ord,
1254 GetKey: Fn(&Self::Item) -> Key + Sync,
1255 {
1256 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1257 Ordering::Less | Ordering::Equal => x,
1258 Ordering::Greater => y,
1259 };
1260 self.reduce(reduce)
1261 }
1262
1263 /// Sums the elements of an iterator.
1264 ///
1265 /// Takes each element, adds them together, and returns the result.
1266 ///
1267 /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1268 ///
1269 /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1270 ///
1271 /// [`Sum<Out>`]: crate::Sum
1272 ///
1273 /// # Examples
1274 ///
1275 /// ```
1276 /// use orx_parallel::*;
1277 ///
1278 /// let a = vec![1, 2, 3];
1279 /// let sum: i32 = a.par().sum();
1280 ///
1281 /// assert_eq!(sum, 6);
1282 /// ```
1283 fn sum<Out>(self) -> Out
1284 where
1285 Self::Item: Sum<Out>,
1286 Out: Send,
1287 {
1288 self.map(Self::Item::map)
1289 .reduce(Self::Item::reduce)
1290 .unwrap_or(Self::Item::zero())
1291 }
1292
1293 // early exit
1294
/// Returns the first (or any) element of the iterator; returns None if it is empty.
///
/// * the first element is returned if the default iteration order `IterationOrder::Ordered` is used,
/// * any element is returned if `IterationOrder::Arbitrary` is set.
///
/// # Examples
///
/// The following example demonstrates the usage of `first` with the default `Ordered` iteration.
/// This guarantees that the first element with respect to position in the input sequence
/// is returned.
///
/// ```
/// use orx_parallel::*;
///
/// let a: Vec<usize> = vec![];
/// assert_eq!(a.par().copied().first(), None);
///
/// let a = vec![1, 2, 3];
/// assert_eq!(a.par().copied().first(), Some(1));
///
/// let a = 1..10_000;
/// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
/// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
///
/// // or equivalently,
/// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
/// ```
///
/// When the order is set to `Arbitrary`, `first` might return any of the elements,
/// whichever is visited first depending on the parallel execution.
///
/// ```
/// use orx_parallel::*;
///
/// let a = 1..10_000;
///
/// // might return either of 3421 or 2*3421
/// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
/// assert!([3421, 2 * 3421].contains(&any));
///
/// // or equivalently,
/// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
/// assert!([3421, 2 * 3421].contains(&any));
/// ```
fn first(self) -> Option<Self::Item>
where
    Self::Item: Send;
1342
1343 /// Searches for an element of an iterator that satisfies a `predicate`.
1344 ///
1345 /// Depending on the set iteration order of the parallel iterator, returns
1346 ///
1347 /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1348 /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1349 ///
1350 /// `find` takes a closure that returns true or false.
1351 /// It applies this closure to each element of the iterator,
1352 /// and returns `Some(x)` where `x` is the first element that returns true.
1353 /// If they all return false, it returns None.
1354 ///
1355 /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1356 ///
1357 /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1358 ///
1359 /// # Examples
1360 ///
1361 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1362 /// This guarantees that the first element with respect to position in the input sequence
1363 /// is returned.
1364 ///
1365 /// ```
1366 /// use orx_parallel::*;
1367 ///
1368 /// let a = 1..10_000;
1369 /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1370 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1371 /// ```
1372 ///
1373 /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1374 /// whichever is found first depending on the parallel execution.
1375 ///
1376 /// ```
1377 /// use orx_parallel::*;
1378 ///
1379 /// let a = 1..10_000;
1380 ///
1381 /// // might return either of 3421 or 2*3421
1382 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1383 /// assert!([3421, 2 * 3421].contains(&any));
1384 /// ```
1385 fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1386 where
1387 Self::Item: Send,
1388 Predicate: Fn(&Self::Item) -> bool + Sync,
1389 {
1390 self.filter(&predicate).first()
1391 }
1392}