orx_parallel/par_iter.rs
1use crate::using::{UsingClone, UsingFun};
2use crate::{
3 ParIterUsing, Params,
4 collect_into::ParCollectInto,
5 computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
6 parameters::{ChunkSize, IterationOrder, NumThreads},
7 runner::{DefaultRunner, ParallelRunner},
8 special_type_sets::Sum,
9};
10use orx_concurrent_iter::ConcurrentIter;
11use std::cmp::Ordering;
12
13/// Parallel iterator.
14pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
15where
16 R: ParallelRunner,
17{
18 /// Element type of the parallel iterator.
19 type Item: Send + Sync;
20
21 /// Returns a reference to the input concurrent iterator.
22 fn con_iter(&self) -> &impl ConcurrentIter;
23
24 /// Parameters of the parallel iterator.
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use orx_parallel::*;
30 /// use std::num::NonZero;
31 ///
32 /// let vec = vec![1, 2, 3, 4];
33 ///
34 /// assert_eq!(
35 /// vec.par().params(),
36 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
37 /// );
38 ///
39 /// assert_eq!(
40 /// vec.par().num_threads(0).chunk_size(0).params(),
41 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
42 /// );
43 ///
44 /// assert_eq!(
45 /// vec.par().num_threads(1).params(),
46 /// Params::new(
47 /// NumThreads::Max(NonZero::new(1).unwrap()),
48 /// ChunkSize::Auto,
49 /// IterationOrder::Ordered
50 /// )
51 /// );
52 ///
53 /// assert_eq!(
54 /// vec.par().num_threads(4).chunk_size(64).params(),
55 /// Params::new(
56 /// NumThreads::Max(NonZero::new(4).unwrap()),
57 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
58 /// IterationOrder::Ordered
59 /// )
60 /// );
61 ///
62 /// assert_eq!(
63 /// vec.par()
64 /// .num_threads(8)
65 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
66 /// .iteration_order(IterationOrder::Arbitrary)
67 /// .params(),
68 /// Params::new(
69 /// NumThreads::Max(NonZero::new(8).unwrap()),
70 /// ChunkSize::Min(NonZero::new(16).unwrap()),
71 /// IterationOrder::Arbitrary
72 /// )
73 /// );
74 /// ```
75 fn params(&self) -> Params;
76
77 // params transformations
78
79 /// Sets the number of threads to be used in the parallel execution.
80 /// Integers can be used as the argument with the following mapping:
81 ///
82 /// * `0` -> `NumThreads::Auto`
83 /// * `1` -> `NumThreads::sequential()`
84 /// * `n > 0` -> `NumThreads::Max(n)`
85 ///
86 /// See [`NumThreads`] for details.
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// use orx_parallel::*;
92 /// use std::num::NonZero;
93 ///
94 /// let vec = vec![1, 2, 3, 4];
95 ///
96 /// // all available threads can be used
97 ///
98 /// assert_eq!(
99 /// vec.par().params(),
100 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
101 /// );
102 ///
103 /// assert_eq!(
104 /// vec.par().num_threads(0).chunk_size(0).params(),
105 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
106 /// );
107 ///
108 /// // computation will be executed sequentially on the main thread, no parallelization
109 ///
110 /// assert_eq!(
111 /// vec.par().num_threads(1).params(),
112 /// Params::new(
113 /// NumThreads::Max(NonZero::new(1).unwrap()),
114 /// ChunkSize::Auto,
115 /// IterationOrder::Ordered
116 /// )
117 /// );
118 ///
119 /// // maximum 4 threads can be used
120 /// assert_eq!(
121 /// vec.par().num_threads(4).chunk_size(64).params(),
122 /// Params::new(
123 /// NumThreads::Max(NonZero::new(4).unwrap()),
124 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
125 /// IterationOrder::Ordered
126 /// )
127 /// );
128 ///
129 /// // maximum 8 threads can be used
130 /// assert_eq!(
131 /// vec.par()
132 /// .num_threads(8)
133 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
134 /// .iteration_order(IterationOrder::Arbitrary)
135 /// .params(),
136 /// Params::new(
137 /// NumThreads::Max(NonZero::new(8).unwrap()),
138 /// ChunkSize::Min(NonZero::new(16).unwrap()),
139 /// IterationOrder::Arbitrary
140 /// )
141 /// );
142 /// ```
143 fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
144
145 /// Sets the number of elements to be pulled from the concurrent iterator during the
146 /// parallel execution. When integers are used as argument, the following mapping applies:
147 ///
148 /// * `0` -> `ChunkSize::Auto`
149 /// * `n > 0` -> `ChunkSize::Exact(n)`
150 ///
151 /// Please use the default enum constructor for creating `ChunkSize::Min` variant.
152 ///
153 /// See [`ChunkSize`] for details.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// use orx_parallel::*;
159 /// use std::num::NonZero;
160 ///
161 /// let vec = vec![1, 2, 3, 4];
162 ///
163 /// // chunk sizes will be dynamically decided by the parallel runner
164 ///
165 /// assert_eq!(
166 /// vec.par().params(),
167 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
168 /// );
169 ///
170 /// assert_eq!(
171 /// vec.par().num_threads(0).chunk_size(0).params(),
172 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
173 /// );
174 ///
175 /// assert_eq!(
176 /// vec.par().num_threads(1).params(),
177 /// Params::new(
178 /// NumThreads::Max(NonZero::new(1).unwrap()),
179 /// ChunkSize::Auto,
180 /// IterationOrder::Ordered
181 /// )
182 /// );
183 ///
184 /// // chunk size will always be 64, parallel runner cannot change
185 ///
186 /// assert_eq!(
187 /// vec.par().num_threads(4).chunk_size(64).params(),
188 /// Params::new(
189 /// NumThreads::Max(NonZero::new(4).unwrap()),
190 /// ChunkSize::Exact(NonZero::new(64).unwrap()),
191 /// IterationOrder::Ordered
192 /// )
193 /// );
194 ///
195 /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
196 ///
197 /// assert_eq!(
198 /// vec.par()
199 /// .num_threads(8)
200 /// .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
201 /// .iteration_order(IterationOrder::Arbitrary)
202 /// .params(),
203 /// Params::new(
204 /// NumThreads::Max(NonZero::new(8).unwrap()),
205 /// ChunkSize::Min(NonZero::new(16).unwrap()),
206 /// IterationOrder::Arbitrary
207 /// )
208 /// );
209 /// ```
210 fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
211
212 /// Sets the iteration order of the parallel computation.
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use orx_parallel::*;
218 ///
219 /// let vec = vec![1, 2, 3, 4];
220 ///
221 /// // results are collected in order consistent to the input order,
222 /// // or find returns the first element satisfying the predicate
223 ///
224 /// assert_eq!(
225 /// vec.par().params(),
226 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
227 /// );
228 ///
229 /// assert_eq!(
230 /// vec.par().iteration_order(IterationOrder::Ordered).params(),
231 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
232 /// );
233 ///
234 /// // results might be collected in arbitrary order
235 /// // or find returns the any of the elements satisfying the predicate
236 ///
237 /// assert_eq!(
238 /// vec.par().iteration_order(IterationOrder::Arbitrary).params(),
239 /// Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
240 /// );
241 /// ```
242 fn iteration_order(self, collect: IterationOrder) -> Self;
243
244 /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
245 ///
246 /// # Examples
247 ///
248 /// ```ignore
249 /// use orx_parallel::*;
250 ///
251 /// let inputs = vec![1, 2, 3, 4];
252 ///
253 /// // uses the default runner
254 /// let sum = inputs.par().sum();
255 ///
256 /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
257 /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
258 /// ```
259 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
260
261 // using transformations
262
263 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
264 /// used variable throughout the computation.
265 ///
266 /// Note that each used thread will obtain exactly one instance of the variable.
267 ///
268 /// The signature of the `using` closure is `(thread_idx: usize) -> U` which will create an instance of
269 /// `U` with respect to the `thread_idx`. The `thread_idx` is the order of the spawned thread; i.e.,
270 /// if the parallel computation uses 8 threads, the thread indices will be 0, 1, ..., 7.
271 ///
272 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
273 ///
274 /// # Examples
275 ///
276 /// ## Example 1: Channels
277 ///
278 /// The following example is taken from rayon's `for_each_with` documentation and converted to using transformation:
279 ///
280 /// ```ignore
281 /// use orx_parallel::*;
282 /// use std::sync::mpsc::channel;
283 ///
284 /// let (sender, receiver) = channel();
285 ///
286 /// (0..5)
287 /// .into_par()
288 /// .using(|_thread_idx| sender.clone())
289 /// .for_each(|s, x| s.send(x).unwrap());
290 ///
291 /// let mut res: Vec<_> = receiver.iter().collect();
292 ///
293 /// res.sort();
294 ///
295 /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
296 /// ```
297 ///
298 /// ## Example 2: Random Number Generator
299 ///
300 /// Random number generator is one of the common use cases that is important for a certain class of algorithms.
301 ///
302 /// The following example demonstrates how to safely generate random numbers through mutable references within
303 /// a parallel computation.
304 ///
305 /// Notice the differences between sequential and parallel computation.
306 /// * In sequential computation, a mutable reference to `rng` is captured, while in parallel computation, we
307 /// explicitly define that we will be `using` a random number generator.
308 /// * Parallel iterator does not mutable capture any variable from the scope; however, using transformation
309 /// converts the `ParIter` into `ParIterUsing` which allows mutable access within all iterator methods.
310 ///
311 /// ```
312 /// use orx_parallel::*;
313 /// use rand::{Rng, SeedableRng};
314 /// use rand_chacha::ChaCha20Rng;
315 ///
316 /// fn random_walk(rng: &mut impl Rng, position: i64, num_steps: usize) -> i64 {
317 /// (0..num_steps).fold(position, |p, _| random_step(rng, p))
318 /// }
319 ///
320 /// fn random_step(rng: &mut impl Rng, position: i64) -> i64 {
321 /// match rng.random_bool(0.5) {
322 /// true => position + 1, // to right
323 /// false => position - 1, // to left
324 /// }
325 /// }
326 ///
327 /// fn input_positions() -> Vec<i64> {
328 /// (-100..=100).collect()
329 /// }
330 ///
331 /// fn sequential() {
332 /// let positions = input_positions();
333 ///
334 /// let mut rng = ChaCha20Rng::seed_from_u64(42);
335 /// let final_positions: Vec<_> = positions
336 /// .iter()
337 /// .copied()
338 /// .map(|position| random_walk(&mut rng, position, 10))
339 /// .collect();
340 /// let sum_final_positions = final_positions.iter().sum::<i64>();
341 /// }
342 ///
343 /// fn parallel() {
344 /// let positions = input_positions();
345 ///
346 /// let final_positions: Vec<_> = positions
347 /// .par()
348 /// .copied()
349 /// .using(|t_idx| ChaCha20Rng::seed_from_u64(42 * t_idx as u64))
350 /// .map(|rng, position| random_walk(rng, position, 10))
351 /// .collect();
352 /// let sum_final_positions = final_positions.iter().sum::<i64>();
353 /// }
354 ///
355 /// sequential();
356 /// parallel();
357 /// ```
358 ///
359 /// ## Example 3: Metrics Collection
360 ///
361 /// The following example demonstrates how to collect metrics about a parallel computation with `using` transformation and
362 /// some `unsafe` help with interior mutability.
363 ///
364 /// ```
365 /// use orx_parallel::*;
366 /// use std::cell::UnsafeCell;
367 ///
368 /// const N: u64 = 1_000;
369 /// const MAX_NUM_THREADS: usize = 4;
370 ///
371 /// // just some work
372 /// fn fibonacci(n: u64) -> u64 {
373 /// let mut a = 0;
374 /// let mut b = 1;
375 /// for _ in 0..n {
376 /// let c = a + b;
377 /// a = b;
378 /// b = c;
379 /// }
380 /// a
381 /// }
382 ///
383 /// #[derive(Default, Debug)]
384 /// struct ThreadMetrics {
385 /// thread_idx: usize,
386 /// num_items_handled: usize,
387 /// handled_42: bool,
388 /// num_filtered_out: usize,
389 /// }
390 ///
391 /// struct ThreadMetricsWriter<'a> {
392 /// metrics_ref: &'a mut ThreadMetrics,
393 /// }
394 ///
395 /// struct ComputationMetrics {
396 /// thread_metrics: UnsafeCell<[ThreadMetrics; MAX_NUM_THREADS]>,
397 /// }
398 /// impl ComputationMetrics {
399 /// fn new() -> Self {
400 /// let mut thread_metrics: [ThreadMetrics; MAX_NUM_THREADS] = Default::default();
401 /// for i in 0..MAX_NUM_THREADS {
402 /// thread_metrics[i].thread_idx = i;
403 /// }
404 /// Self {
405 /// thread_metrics: UnsafeCell::new(thread_metrics),
406 /// }
407 /// }
408 /// }
409 ///
410 /// impl ComputationMetrics {
411 /// unsafe fn create_for_thread<'a>(&mut self, thread_idx: usize) -> ThreadMetricsWriter<'a> {
412 /// // SAFETY: here we create a mutable variable to the thread_idx-th metrics
413 /// // * If we call this method multiple times with the same index,
414 /// // we create multiple mutable references to the same ThreadMetrics,
415 /// // which would lead to a race condition.
416 /// // * We must make sure that `create_for_thread` is called only once per thread.
417 /// // * If we use `create_for_thread` within the `using` call to create mutable values
418 /// // used by the threads, we are certain that the parallel computation
419 /// // will only call this method once per thread; hence, it will not
420 /// // cause the race condition.
421 /// // * On the other hand, we must ensure that we do not call this method
422 /// // externally.
423 /// let array = unsafe { &mut *self.thread_metrics.get() };
424 /// ThreadMetricsWriter {
425 /// metrics_ref: &mut array[thread_idx],
426 /// }
427 /// }
428 /// }
429 ///
430 /// let mut metrics = ComputationMetrics::new();
431 ///
432 /// let input: Vec<u64> = (0..N).collect();
433 ///
434 /// let sum = input
435 /// .par()
436 /// // SAFETY: we do not call `create_for_thread` externally;
437 /// // it is safe if it is called only by the parallel computation.
438 /// .using(|t| unsafe { metrics.create_for_thread(t) })
439 /// .map(|m: &mut ThreadMetricsWriter<'_>, i| {
440 /// // collect some useful metrics
441 /// m.metrics_ref.num_items_handled += 1;
442 /// m.metrics_ref.handled_42 |= *i == 42;
443 ///
444 /// // actual work
445 /// fibonacci((*i % 20) + 1) % 100
446 /// })
447 /// .filter(|m, i| {
448 /// let is_even = i % 2 == 0;
449 ///
450 /// if !is_even {
451 /// m.metrics_ref.num_filtered_out += 1;
452 /// }
453 ///
454 /// is_even
455 /// })
456 /// .num_threads(MAX_NUM_THREADS)
457 /// .sum();
458 ///
459 /// let total_by_metrics: usize = metrics
460 /// .thread_metrics
461 /// .get_mut()
462 /// .iter()
463 /// .map(|x| x.num_items_handled)
464 /// .sum();
465 ///
466 /// assert_eq!(N as usize, total_by_metrics);
467 /// ```
468 ///
469 fn using<U, F>(
470 self,
471 using: F,
472 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
473 where
474 U: Send,
475 F: FnMut(usize) -> U;
476
477 /// Converts the [`ParIter`] into [`ParIterUsing`] which will have access to a mutable reference of the
478 /// used variable throughout the computation.
479 ///
480 /// Note that each used thread will obtain exactly one instance of the variable.
481 ///
482 /// Each used thread receives a clone of the provided `value`.
483 /// Note that, `using_clone(value)` can be considered as a shorthand for `using(|_thread_idx| value.clone())`.
484 ///
485 /// Please see [`using`] for examples.
486 ///
487 /// Details of the **using transformation** can be found here: [`using.md`](https://github.com/orxfun/orx-parallel/blob/main/docs/using.md).
488 ///
489 /// [`using`]: crate::ParIter::using
490 fn using_clone<U>(
491 self,
492 value: U,
493 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
494 where
495 U: Clone + Send;
496
497 // computation transformations
498
499 /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
500 ///
501 /// # Examples
502 ///
503 /// ```
504 /// use orx_parallel::*;
505 ///
506 /// let a = [1, 2, 3];
507 ///
508 /// let iter = a.into_par().map(|x| 2 * x);
509 ///
510 /// let b: Vec<_> = iter.collect();
511 /// assert_eq!(b, &[2, 4, 6]);
512 /// ```
513 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
514 where
515 Out: Send + Sync,
516 Map: Fn(Self::Item) -> Out + Send + Sync + Clone;
517
518 /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
519 ///
520 /// # Examples
521 ///
522 /// ```
523 /// use orx_parallel::*;
524 ///
525 /// let a = [1, 2, 3];
526 ///
527 /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
528 ///
529 /// let b: Vec<_> = iter.collect();
530 /// assert_eq!(b, &[1, 3]);
531 /// ```
532 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
533 where
534 Filter: Fn(&Self::Item) -> bool + Send + Sync + Clone;
535
536 /// Creates an iterator that works like map, but flattens nested structure.
537 ///
538 /// # Examples
539 ///
540 /// ```
541 /// use orx_parallel::*;
542 ///
543 /// let words = ["alpha", "beta", "gamma"];
544 ///
545 /// // chars() returns an iterator
546 /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
547 ///
548 /// let merged: String = all_chars.iter().collect();
549 /// assert_eq!(merged, "alphabetagamma");
550 /// ```
551 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
552 where
553 IOut: IntoIterator + Send + Sync,
554 IOut::IntoIter: Send + Sync,
555 IOut::Item: Send + Sync,
556 FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone;
557
558 /// Creates an iterator that both filters and maps.
559 ///
560 /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
561 ///
562 /// `filter_map` can be used to make chains of `filter` and `map` more concise.
563 /// The example below shows how a `map().filter().map()` can be shortened to a single call to `filter_map`.
564 ///
565 /// # Examples
566 ///
567 /// ```
568 /// use orx_parallel::*;
569 ///
570 /// let a = ["1", "two", "NaN", "four", "5"];
571 ///
572 /// let numbers: Vec<_> = a
573 /// .into_par()
574 /// .filter_map(|s| s.parse::<usize>().ok())
575 /// .collect();
576 ///
577 /// assert_eq!(numbers, [1, 5]);
578 /// ```
579 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
580 where
581 Out: Send + Sync,
582 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone;
583
584 /// Does something with each element of an iterator, passing the value on.
585 ///
586 /// When using iterators, you’ll often chain several of them together.
587 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
588 /// To do that, insert a call to `inspect()`.
589 ///
590 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
591 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
592 ///
593 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
594 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
595 /// collect some intermediate values during parallel execution for further inspection.
596 /// The following example demonstrates such a use case.
597 ///
598 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
599 ///
600 /// # Examples
601 ///
602 /// ```
603 /// use orx_parallel::*;
604 /// use orx_concurrent_bag::*;
605 ///
606 /// let a = vec![1, 4, 2, 3];
607 ///
608 /// // let's add some inspect() calls to investigate what's happening
609 /// // - log some events
610 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
611 /// let bag = ConcurrentBag::new();
612 ///
613 /// let sum = a
614 /// .par()
615 /// .copied()
616 /// .inspect(|x| println!("about to filter: {x}"))
617 /// .filter(|x| x % 2 == 0)
618 /// .inspect(|x| {
619 /// bag.push(*x);
620 /// println!("made it through filter: {x}");
621 /// })
622 /// .sum();
623 /// println!("{sum}");
624 ///
625 /// let mut values_made_through = bag.into_inner();
626 /// values_made_through.sort();
627 /// assert_eq!(values_made_through, [2, 4]);
628 /// ```
629 ///
630 /// This will print:
631 ///
632 /// ```console
633 /// about to filter: 1
634 /// about to filter: 4
635 /// made it through filter: 4
636 /// about to filter: 2
637 /// made it through filter: 2
638 /// about to filter: 3
639 /// 6
640 /// ```
641 fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
642 where
643 Operation: Fn(&Self::Item) + Sync + Send + Clone,
644 {
645 let map = move |x| {
646 operation(&x);
647 x
648 };
649 self.map(map)
650 }
651
652 // special item transformations
653
654 /// Creates an iterator which copies all of its elements.
655 ///
656 /// # Examples
657 ///
658 /// ```
659 /// use orx_parallel::*;
660 ///
661 /// let a = vec![1, 2, 3];
662 ///
663 /// let v_copied: Vec<_> = a.par().copied().collect();
664 ///
665 /// // copied is the same as .map(|&x| x)
666 /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
667 ///
668 /// assert_eq!(v_copied, vec![1, 2, 3]);
669 /// assert_eq!(v_map, vec![1, 2, 3]);
670 /// ```
671 fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
672 where
673 T: 'a + Copy + Send + Sync,
674 Self: ParIter<R, Item = &'a T>,
675 {
676 self.map(map_copy)
677 }
678
679 /// Creates an iterator which clones all of its elements.
680 ///
681 /// # Examples
682 ///
683 /// ```
684 /// use orx_parallel::*;
685 ///
686 /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
687 ///
688 /// let v_cloned: Vec<_> = a.par().cloned().collect();
689 ///
690 /// // cloned is the same as .map(|x| x.clone())
691 /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
692 ///
693 /// assert_eq!(
694 /// v_cloned,
695 /// vec![String::from("1"), String::from("2"), String::from("3")]
696 /// );
697 /// assert_eq!(
698 /// v_map,
699 /// vec![String::from("1"), String::from("2"), String::from("3")]
700 /// );
701 /// ```
702 fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
703 where
704 T: 'a + Clone + Send + Sync,
705 Self: ParIter<R, Item = &'a T>,
706 {
707 self.map(map_clone)
708 }
709
710 /// Creates an iterator that flattens nested structure.
711 ///
712 /// This is useful when you have an iterator of iterators or an iterator of things that can be
713 /// turned into iterators and you want to remove one level of indirection.
714 ///
715 /// # Examples
716 ///
717 /// Basic usage.
718 ///
719 /// ```
720 /// use orx_parallel::*;
721 ///
722 /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
723 /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
724 /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
725 /// ```
726 ///
727 /// Mapping and then flattening:
728 ///
729 /// ```
730 /// use orx_parallel::*;
731 ///
732 /// let words = vec!["alpha", "beta", "gamma"];
733 ///
734 /// // chars() returns an iterator
735 /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
736 /// let merged: String = all_characters.into_iter().collect();
737 /// assert_eq!(merged, "alphabetagamma");
738 /// ```
739 ///
740 /// But actually, you can write this in terms of `flat_map`,
741 /// which is preferable in this case since it conveys intent more clearly:
742 ///
743 /// ```
744 /// use orx_parallel::*;
745 ///
746 /// let words = vec!["alpha", "beta", "gamma"];
747 ///
748 /// // chars() returns an iterator
749 /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
750 /// let merged: String = all_characters.into_iter().collect();
751 /// assert_eq!(merged, "alphabetagamma");
752 /// ```
753 fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
754 where
755 Self::Item: IntoIterator,
756 <Self::Item as IntoIterator>::IntoIter: Send + Sync,
757 <Self::Item as IntoIterator>::Item: Send + Sync,
758 R: Send + Sync,
759 Self: Send + Sync,
760 {
761 let map = |e: Self::Item| e.into_iter();
762 self.flat_map(map)
763 }
764
765 // collect
766
767 /// Collects all the items from an iterator into a collection.
768 ///
769 /// This is useful when you already have a collection and want to add the iterator items to it.
770 ///
771 /// The collection is passed in as owned value, and returned back with the additional elements.
772 ///
773 /// All collections implementing [`ParCollectInto`] can be used to collect into.
774 ///
775 /// [`ParCollectInto`]: crate::ParCollectInto
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use orx_parallel::*;
781 ///
782 /// let a = vec![1, 2, 3];
783 ///
784 /// let vec: Vec<i32> = vec![0, 1];
785 /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
786 /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
787 ///
788 /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
789 /// ```
790 fn collect_into<C>(self, output: C) -> C
791 where
792 C: ParCollectInto<Self::Item>;
793
794 /// Transforms an iterator into a collection.
795 ///
796 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
797 /// the type of the result collection; or turbofish annotation can be used.
798 ///
799 /// All collections implementing [`ParCollectInto`] can be used to collect into.
800 ///
801 /// [`ParCollectInto`]: crate::ParCollectInto
802 ///
803 /// # Examples
804 ///
805 /// ```
806 /// use orx_parallel::*;
807 ///
808 /// let a = vec![1, 2, 3];
809 ///
810 /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
811 ///
812 /// assert_eq!(vec![2, 4, 6], doubled);
813 /// ```
814 fn collect<C>(self) -> C
815 where
816 C: ParCollectInto<Self::Item>,
817 {
818 let output = C::empty(self.con_iter().try_get_len());
819 self.collect_into(output)
820 }
821
822 // reduce
823
824 /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
825 ///
826 /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
827 ///
828 /// The `reduce` function is a closure with two arguments: an ‘accumulator’, and an element.
829 ///
830 /// # Example
831 ///
832 /// ```
833 /// use orx_parallel::*;
834 ///
835 /// let inputs = 1..10;
836 /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
837 /// assert_eq!(reduced, 45);
838 /// ```
839 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
840 where
841 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync;
842
843 /// Tests if every element of the iterator matches a predicate.
844 ///
845 /// `all` takes a `predicate` that returns true or false.
846 /// It applies this closure to each element of the iterator,
847 /// and if they all return true, then so does `all`.
848 /// If any of them returns false, it returns false.
849 ///
850 /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
851 /// given that no matter what else happens, the result will also be false.
852 ///
853 /// An empty iterator returns true.
854 ///
855 /// # Examples
856 ///
857 /// ```
858 /// use orx_parallel::*;
859 ///
860 /// let mut a = vec![1, 2, 3];
861 /// assert!(a.par().all(|x| **x > 0));
862 /// assert!(!a.par().all(|x| **x > 2));
863 ///
864 /// a.clear();
865 /// assert!(a.par().all(|x| **x > 2)); // empty iterator
866 /// ```
867 fn all<Predicate>(self, predicate: Predicate) -> bool
868 where
869 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
870 {
871 let violates = |x: &Self::Item| !predicate(x);
872 self.find(violates).is_none()
873 }
874
875 /// Tests if any element of the iterator matches a predicate.
876 ///
877 /// `any` takes a `predicate` that returns true or false.
878 /// It applies this closure to each element of the iterator,
879 /// and if any of the elements returns true, then so does `any`.
880 /// If all of them return false, it returns false.
881 ///
882 /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
883 /// given that no matter what else happens, the result will also be true.
884 ///
885 /// An empty iterator returns false.
886 ///
887 /// # Examples
888 ///
889 /// ```
890 /// use orx_parallel::*;
891 ///
892 /// let mut a = vec![1, 2, 3];
893 /// assert!(a.par().any(|x| **x > 0));
894 /// assert!(!a.par().any(|x| **x > 5));
895 ///
896 /// a.clear();
897 /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
898 /// ```
899 fn any<Predicate>(self, predicate: Predicate) -> bool
900 where
901 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
902 {
903 self.find(predicate).is_some()
904 }
905
906 /// Consumes the iterator, counting the number of iterations and returning it.
907 ///
908 /// # Examples
909 ///
910 /// ```
911 /// use orx_parallel::*;
912 ///
913 /// let a = vec![1, 2, 3];
914 /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
915 /// ```
916 fn count(self) -> usize {
917 self.map(map_count).reduce(reduce_sum).unwrap_or(0)
918 }
919
920 /// Calls a closure on each element of an iterator.
921 ///
922 /// # Examples
923 ///
924 /// Basic usage:
925 ///
926 /// ```
927 /// use orx_parallel::*;
928 /// use std::sync::mpsc::channel;
929 ///
930 /// let (tx, rx) = channel();
931 /// (0..5)
932 /// .par()
933 /// .map(|x| x * 2 + 1)
934 /// .for_each(move |x| tx.send(x).unwrap());
935 ///
936 /// let mut v: Vec<_> = rx.iter().collect();
937 /// v.sort(); // order can be mixed, since messages will be sent in parallel
938 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
939 /// ```
940 ///
941 /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
942 /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
943 /// In the following example, we log every element that satisfies a predicate in parallel.
944 ///
945 /// ```
946 /// use orx_parallel::*;
947 ///
948 /// (0..5)
949 /// .par()
950 /// .flat_map(|x| x * 100..x * 110)
951 /// .filter(|&x| x % 3 == 0)
952 /// .for_each(|x| println!("{x}"));
953 /// ```
954 fn for_each<Operation>(self, operation: Operation)
955 where
956 Operation: Fn(Self::Item) + Sync + Send,
957 {
958 let map = |x| operation(x);
959 let _ = self.map(map).reduce(reduce_unit);
960 }
961
962 /// Returns the maximum element of an iterator.
963 ///
964 /// If the iterator is empty, None is returned.
965 ///
966 /// # Examples
967 ///
968 /// ```
969 /// use orx_parallel::*;
970 ///
971 /// let a = vec![1, 2, 3];
972 /// let b: Vec<u32> = Vec::new();
973 ///
974 /// assert_eq!(a.par().max(), Some(&3));
975 /// assert_eq!(b.par().max(), None);
976 /// ```
977 fn max(self) -> Option<Self::Item>
978 where
979 Self::Item: Ord,
980 {
981 self.reduce(Ord::max)
982 }
983
984 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
985 ///
986 /// If the iterator is empty, None is returned.
987 ///
988 /// # Examples
989 ///
990 /// ```
991 /// use orx_parallel::*;
992 ///
993 /// let a = vec![-3_i32, 0, 1, 5, -10];
994 /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
995 /// ```
996 fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
997 where
998 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
999 {
1000 let reduce = |x, y| match compare(&x, &y) {
1001 Ordering::Greater | Ordering::Equal => x,
1002 Ordering::Less => y,
1003 };
1004 self.reduce(reduce)
1005 }
1006
1007 /// Returns the element that gives the maximum value from the specified function.
1008 ///
1009 /// If the iterator is empty, None is returned.
1010 ///
1011 /// # Examples
1012 ///
1013 /// ```
1014 /// use orx_parallel::*;
1015 ///
1016 /// let a = vec![-3_i32, 0, 1, 5, -10];
1017 /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
1018 /// ```
1019 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
1020 where
1021 Key: Ord,
1022 GetKey: Fn(&Self::Item) -> Key + Sync,
1023 {
1024 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
1025 Ordering::Greater | Ordering::Equal => x,
1026 Ordering::Less => y,
1027 };
1028 self.reduce(reduce)
1029 }
1030
1031 /// Returns the minimum element of an iterator.
1032 ///
1033 /// If the iterator is empty, None is returned.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// use orx_parallel::*;
1039 ///
1040 /// let a = vec![1, 2, 3];
1041 /// let b: Vec<u32> = Vec::new();
1042 ///
1043 /// assert_eq!(a.par().min(), Some(&1));
1044 /// assert_eq!(b.par().min(), None);
1045 /// ```
1046 fn min(self) -> Option<Self::Item>
1047 where
1048 Self::Item: Ord,
1049 {
1050 self.reduce(Ord::min)
1051 }
1052
1053 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
1054 ///
1055 /// If the iterator is empty, None is returned.
1056 ///
1057 /// # Examples
1058 ///
1059 /// ```
1060 /// use orx_parallel::*;
1061 ///
1062 /// let a = vec![-3_i32, 0, 1, 5, -10];
1063 /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
1064 /// ```
1065 fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
1066 where
1067 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
1068 {
1069 let reduce = |x, y| match compare(&x, &y) {
1070 Ordering::Less | Ordering::Equal => x,
1071 Ordering::Greater => y,
1072 };
1073 self.reduce(reduce)
1074 }
1075
1076 /// Returns the element that gives the minimum value from the specified function.
1077 ///
1078 /// If the iterator is empty, None is returned.
1079 ///
1080 /// # Examples
1081 ///
1082 /// ```
1083 /// use orx_parallel::*;
1084 ///
1085 /// let a = vec![-3_i32, 0, 1, 5, -10];
1086 /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
1087 /// ```
1088 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
1089 where
1090 Key: Ord,
1091 GetKey: Fn(&Self::Item) -> Key + Sync,
1092 {
1093 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
1094 Ordering::Less | Ordering::Equal => x,
1095 Ordering::Greater => y,
1096 };
1097 self.reduce(reduce)
1098 }
1099
1100 /// Sums the elements of an iterator.
1101 ///
1102 /// Takes each element, adds them together, and returns the result.
1103 ///
1104 /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
1105 ///
1106 /// `sum` can be used to sum any type implementing [`Sum<Out>`].
1107 ///
1108 /// [`Sum<Out>`]: crate::Sum
1109 ///
1110 /// # Examples
1111 ///
1112 /// ```
1113 /// use orx_parallel::*;
1114 ///
1115 /// let a = vec![1, 2, 3];
1116 /// let sum: i32 = a.par().sum();
1117 ///
1118 /// assert_eq!(sum, 6);
1119 /// ```
1120 fn sum<Out>(self) -> Out
1121 where
1122 Self::Item: Sum<Out>,
1123 Out: Send + Sync,
1124 {
1125 self.map(Self::Item::map)
1126 .reduce(Self::Item::reduce)
1127 .unwrap_or(Self::Item::zero())
1128 }
1129
1130 // early exit
1131
1132 /// Returns the first (or any) element of the iterator; returns None if it is empty.
1133 ///
1134 /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
1135 /// * any element is returned if `IterationOrder::Arbitrary` is set.
1136 ///
1137 /// # Examples
1138 ///
1139 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1140 /// This guarantees that the first element with respect to position in the input sequence
1141 /// is returned.
1142 ///
1143 /// ```
1144 /// use orx_parallel::*;
1145 ///
1146 /// let a: Vec<usize> = vec![];
1147 /// assert_eq!(a.par().copied().first(), None);
1148 ///
1149 /// let a = vec![1, 2, 3];
1150 /// assert_eq!(a.par().copied().first(), Some(1));
1151 ///
1152 /// let a = 1..10_000;
1153 /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
1154 /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
1155 ///
1156 /// // or equivalently,
1157 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1158 /// ```
1159 ///
1160 /// When the order is set to `Arbitrary`, `first` might return any of the elements,
1161 /// whichever is visited first depending on the parallel execution.
1162 ///
1163 /// ```
1164 /// use orx_parallel::*;
1165 ///
1166 /// let a = 1..10_000;
1167 ///
1168 /// // might return either of 3421 or 2*3421
1169 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
1170 /// assert!([3421, 2 * 3421].contains(&any));
1171 ///
1172 /// // or equivalently,
1173 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1174 /// assert!([3421, 2 * 3421].contains(&any));
1175 fn first(self) -> Option<Self::Item>;
1176
1177 /// Searches for an element of an iterator that satisfies a `predicate`.
1178 ///
1179 /// Depending on the set iteration order of the parallel iterator, returns
1180 ///
1181 /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
1182 /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
1183 ///
1184 /// `find` takes a closure that returns true or false.
1185 /// It applies this closure to each element of the iterator,
1186 /// and returns `Some(x)` where `x` is the first element that returns true.
1187 /// If they all return false, it returns None.
1188 ///
1189 /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
1190 ///
1191 /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
1192 ///
1193 /// # Examples
1194 ///
1195 /// The following example demonstrates the usage of first with default `Ordered` iteration.
1196 /// This guarantees that the first element with respect to position in the input sequence
1197 /// is returned.
1198 ///
1199 /// ```
1200 /// use orx_parallel::*;
1201 ///
1202 /// let a = 1..10_000;
1203 /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
1204 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
1205 /// ```
1206 ///
1207 /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
1208 /// whichever is found first depending on the parallel execution.
1209 ///
1210 /// ```
1211 /// use orx_parallel::*;
1212 ///
1213 /// let a = 1..10_000;
1214 ///
1215 /// // might return either of 3421 or 2*3421
1216 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
1217 /// assert!([3421, 2 * 3421].contains(&any));
1218 /// ```
1219 fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
1220 where
1221 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
1222 {
1223 self.filter(predicate).first()
1224 }
1225}