orx_parallel/par_iter.rs
1use crate::{
2 Params,
3 collect_into::ParCollectInto,
4 computations::{map_clone, map_copy, map_count, reduce_sum, reduce_unit},
5 parameters::{ChunkSize, IterationOrder, NumThreads},
6 runner::{DefaultRunner, ParallelRunner},
7 special_type_sets::Sum,
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::cmp::Ordering;
11
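/// Parallel iterator.
///
/// A `ParIter` is defined over an input concurrent iterator ([`con_iter`](Self::con_iter)),
/// carries its execution parameters ([`params`](Self::params)), and is generic over the
/// [`ParallelRunner`] `R` used to execute it, which defaults to [`DefaultRunner`].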
13pub trait ParIter<R = DefaultRunner>: Sized + Send + Sync
14where
15 R: ParallelRunner,
16{
    /// Element type of the parallel iterator.
    ///
    /// Items must be `Send + Sync`, as required by the bound on this associated type.
    type Item: Send + Sync;
19
    /// Returns a reference to the input concurrent iterator.
    ///
    /// Within this trait, the default [`collect`](Self::collect) implementation calls
    /// `try_get_len` on the returned iterator and passes the result to `C::empty`.
    fn con_iter(&self) -> &impl ConcurrentIter;
22
    /// Parameters of the parallel iterator.
    ///
    /// As the examples below demonstrate, the parameters consist of the [`NumThreads`],
    /// [`ChunkSize`] and [`IterationOrder`] settings, which can be modified with
    /// [`num_threads`], [`chunk_size`] and [`iteration_order`], respectively.
    ///
    /// [`num_threads`]: Self::num_threads
    /// [`chunk_size`]: Self::chunk_size
    /// [`iteration_order`]: Self::iteration_order
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    /// use std::num::NonZero;
    ///
    /// let vec = vec![1, 2, 3, 4];
    ///
    /// assert_eq!(
    ///     vec.par().params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(0).chunk_size(0).params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(1).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(1).unwrap()),
    ///         ChunkSize::Auto,
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(4).chunk_size(64).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(4).unwrap()),
    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// assert_eq!(
    ///     vec.par()
    ///         .num_threads(8)
    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
    ///         .iteration_order(IterationOrder::Arbitrary)
    ///         .params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(8).unwrap()),
    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
    ///         IterationOrder::Arbitrary
    ///     )
    /// );
    /// ```
    fn params(&self) -> &Params;
75
76 // params transformations
77
    /// Sets the number of threads to be used in the parallel execution.
    ///
    /// The argument is anything convertible into [`NumThreads`].
    /// Integers can be used as the argument with the following mapping:
    ///
    /// * `0` -> `NumThreads::Auto`
    /// * `1` -> `NumThreads::sequential()`
    /// * `n > 0` -> `NumThreads::Max(n)`
    ///
    /// As the example below shows, `num_threads(1)` yields parameters that compare equal to
    /// `NumThreads::Max(1)`.
    ///
    /// See [`NumThreads`] for details.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    /// use std::num::NonZero;
    ///
    /// let vec = vec![1, 2, 3, 4];
    ///
    /// // all available threads can be used
    ///
    /// assert_eq!(
    ///     vec.par().params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(0).chunk_size(0).params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// // computation will be executed sequentially on the main thread, no parallelization
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(1).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(1).unwrap()),
    ///         ChunkSize::Auto,
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// // maximum 4 threads can be used
    /// assert_eq!(
    ///     vec.par().num_threads(4).chunk_size(64).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(4).unwrap()),
    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// // maximum 8 threads can be used
    /// assert_eq!(
    ///     vec.par()
    ///         .num_threads(8)
    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
    ///         .iteration_order(IterationOrder::Arbitrary)
    ///         .params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(8).unwrap()),
    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
    ///         IterationOrder::Arbitrary
    ///     )
    /// );
    /// ```
    fn num_threads(self, num_threads: impl Into<NumThreads>) -> Self;
143
    /// Sets the number of elements to be pulled from the concurrent iterator during the
    /// parallel execution.
    ///
    /// The argument is anything convertible into [`ChunkSize`].
    /// When integers are used as the argument, the following mapping applies:
    ///
    /// * `0` -> `ChunkSize::Auto`
    /// * `n > 0` -> `ChunkSize::Exact(n)`
    ///
    /// Integers never map to the `ChunkSize::Min` variant; to request it, construct the
    /// variant explicitly, e.g. `ChunkSize::Min(NonZero::new(16).unwrap())`.
    ///
    /// See [`ChunkSize`] for details.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    /// use std::num::NonZero;
    ///
    /// let vec = vec![1, 2, 3, 4];
    ///
    /// // chunk sizes will be dynamically decided by the parallel runner
    ///
    /// assert_eq!(
    ///     vec.par().params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(0).chunk_size(0).params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(1).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(1).unwrap()),
    ///         ChunkSize::Auto,
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// // chunk size will always be 64, parallel runner cannot change
    ///
    /// assert_eq!(
    ///     vec.par().num_threads(4).chunk_size(64).params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(4).unwrap()),
    ///         ChunkSize::Exact(NonZero::new(64).unwrap()),
    ///         IterationOrder::Ordered
    ///     )
    /// );
    ///
    /// // minimum chunk size will be 16, but can be dynamically increased by the parallel runner
    ///
    /// assert_eq!(
    ///     vec.par()
    ///         .num_threads(8)
    ///         .chunk_size(ChunkSize::Min(NonZero::new(16).unwrap()))
    ///         .iteration_order(IterationOrder::Arbitrary)
    ///         .params(),
    ///     &Params::new(
    ///         NumThreads::Max(NonZero::new(8).unwrap()),
    ///         ChunkSize::Min(NonZero::new(16).unwrap()),
    ///         IterationOrder::Arbitrary
    ///     )
    /// );
    /// ```
    fn chunk_size(self, chunk_size: impl Into<ChunkSize>) -> Self;
210
    /// Sets the iteration order of the parallel computation.
    ///
    /// The `collect` argument is the [`IterationOrder`] to be stored in the parameters.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let vec = vec![1, 2, 3, 4];
    ///
    /// // results are collected in an order consistent with the input order,
    /// // or find returns the first element satisfying the predicate
    ///
    /// assert_eq!(
    ///     vec.par().params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// assert_eq!(
    ///     vec.par().iteration_order(IterationOrder::Ordered).params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Ordered)
    /// );
    ///
    /// // results might be collected in arbitrary order,
    /// // or find returns any of the elements satisfying the predicate
    ///
    /// assert_eq!(
    ///     vec.par().iteration_order(IterationOrder::Arbitrary).params(),
    ///     &Params::new(NumThreads::Auto, ChunkSize::Auto, IterationOrder::Arbitrary)
    /// );
    /// ```
    fn iteration_order(self, collect: IterationOrder) -> Self;
242
    /// Rather than the [`DefaultRunner`], uses the parallel runner `Q` which implements [`ParallelRunner`].
    ///
    /// Returns a parallel iterator over the same [`Item`](Self::Item) type, parameterized
    /// by the runner `Q`.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use orx_parallel::*;
    ///
    /// let inputs = vec![1, 2, 3, 4];
    ///
    /// // uses the default runner
    /// let sum = inputs.par().sum();
    ///
    /// // uses the custom parallel runner MyParallelRunner: ParallelRunner
    /// let sum = inputs.par().with_runner::<MyParallelRunner>().sum();
    /// ```
    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item>;
259
260 // computation transformations
261
    /// Takes a closure `map` and creates a parallel iterator which calls that closure on each element.
    ///
    /// The closure must be `Send + Sync + Clone`, and the produced items `Out` must be
    /// `Send + Sync`, as required by the bounds below.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = [1, 2, 3];
    ///
    /// let iter = a.into_par().map(|x| 2 * x);
    ///
    /// let b: Vec<_> = iter.collect();
    /// assert_eq!(b, &[2, 4, 6]);
    /// ```
    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
    where
        Out: Send + Sync,
        Map: Fn(Self::Item) -> Out + Send + Sync + Clone;
280
    /// Creates an iterator which uses a closure `filter` to determine if an element should be yielded.
    ///
    /// The closure receives a reference to each element and returns a `bool`;
    /// the item type of the resulting iterator is unchanged.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = [1, 2, 3];
    ///
    /// let iter = a.into_par().filter(|x| *x % 2 == 1).copied();
    ///
    /// let b: Vec<_> = iter.collect();
    /// assert_eq!(b, &[1, 3]);
    /// ```
    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
    where
        Filter: Fn(&Self::Item) -> bool + Send + Sync + Clone;
298
    /// Creates an iterator that works like map, but flattens nested structure.
    ///
    /// The closure `flat_map` turns each element into a value `IOut` implementing
    /// `IntoIterator`; the resulting parallel iterator yields the items of type `IOut::Item`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let words = ["alpha", "beta", "gamma"];
    ///
    /// // chars() returns an iterator
    /// let all_chars: Vec<_> = words.into_par().flat_map(|s| s.chars()).collect();
    ///
    /// let merged: String = all_chars.iter().collect();
    /// assert_eq!(merged, "alphabetagamma");
    /// ```
    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
    where
        IOut: IntoIterator + Send + Sync,
        IOut::IntoIter: Send + Sync,
        IOut::Item: Send + Sync,
        FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone;
320
    /// Creates an iterator that both filters and maps.
    ///
    /// The returned iterator yields only the values for which the supplied closure `filter_map` returns `Some(value)`.
    ///
    /// `filter_map` can be used to make chains of `filter` and `map` more concise.
    /// The example below parses strings and keeps only the successfully parsed numbers
    /// with a single call to `filter_map`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = ["1", "two", "NaN", "four", "5"];
    ///
    /// let numbers: Vec<_> = a
    ///     .into_par()
    ///     .filter_map(|s| s.parse::<usize>().ok())
    ///     .collect();
    ///
    /// assert_eq!(numbers, [1, 5]);
    /// ```
    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
    where
        Out: Send + Sync,
        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone;
346
347 /// Does something with each element of an iterator, passing the value on.
348 ///
349 /// When using iterators, you’ll often chain several of them together.
350 /// While working on such code, you might want to check out what’s happening at various parts in the pipeline.
351 /// To do that, insert a call to `inspect()`.
352 ///
353 /// It’s more common for `inspect()` to be used as a debugging tool than to exist in your final code,
354 /// but applications may find it useful in certain situations when errors need to be logged before being discarded.
355 ///
356 /// It is often convenient to use thread-safe collections such as [`ConcurrentBag`] and
357 /// [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec) to
358 /// collect some intermediate values during parallel execution for further inspection.
359 /// The following example demonstrates such a use case.
360 ///
361 /// [`ConcurrentBag`]: orx_concurrent_bag::ConcurrentBag
362 ///
363 /// # Examples
364 ///
365 /// ```
366 /// use orx_parallel::*;
367 /// use orx_concurrent_bag::*;
368 ///
369 /// let a = vec![1, 4, 2, 3];
370 ///
371 /// // let's add some inspect() calls to investigate what's happening
372 /// // - log some events
373 /// // - use a concurrent bag to collect and investigate numbers contributing to the sum
374 /// let bag = ConcurrentBag::new();
375 ///
376 /// let sum = a
377 /// .par()
378 /// .copied()
379 /// .inspect(|x| println!("about to filter: {x}"))
380 /// .filter(|x| x % 2 == 0)
381 /// .inspect(|x| {
382 /// bag.push(*x);
383 /// println!("made it through filter: {x}");
384 /// })
385 /// .sum();
386 /// println!("{sum}");
387 ///
388 /// let mut values_made_through = bag.into_inner();
389 /// values_made_through.sort();
390 /// assert_eq!(values_made_through, [2, 4]);
391 /// ```
392 ///
393 /// This will print:
394 ///
395 /// ```console
396 /// about to filter: 1
397 /// about to filter: 4
398 /// made it through filter: 4
399 /// about to filter: 2
400 /// made it through filter: 2
401 /// about to filter: 3
402 /// 6
403 /// ```
404 fn inspect<Operation>(self, operation: Operation) -> impl ParIter<R, Item = Self::Item>
405 where
406 Operation: Fn(&Self::Item) + Sync + Send + Clone,
407 {
408 let map = move |x| {
409 operation(&x);
410 x
411 };
412 self.map(map)
413 }
414
415 // special item transformations
416
417 /// Creates an iterator which copies all of its elements.
418 ///
419 /// # Examples
420 ///
421 /// ```
422 /// use orx_parallel::*;
423 ///
424 /// let a = vec![1, 2, 3];
425 ///
426 /// let v_copied: Vec<_> = a.par().copied().collect();
427 ///
428 /// // copied is the same as .map(|&x| x)
429 /// let v_map: Vec<_> = a.par().map(|&x| x).collect();
430 ///
431 /// assert_eq!(v_copied, vec![1, 2, 3]);
432 /// assert_eq!(v_map, vec![1, 2, 3]);
433 /// ```
434 fn copied<'a, T>(self) -> impl ParIter<R, Item = T>
435 where
436 T: 'a + Copy + Send + Sync,
437 Self: ParIter<R, Item = &'a T>,
438 {
439 self.map(map_copy)
440 }
441
442 /// Creates an iterator which clones all of its elements.
443 ///
444 /// # Examples
445 ///
446 /// ```
447 /// use orx_parallel::*;
448 ///
449 /// let a: Vec<_> = [1, 2, 3].map(|x| x.to_string()).into_iter().collect();
450 ///
451 /// let v_cloned: Vec<_> = a.par().cloned().collect();
452 ///
453 /// // cloned is the same as .map(|x| x.clone())
454 /// let v_map: Vec<_> = a.par().map(|x| x.clone()).collect();
455 ///
456 /// assert_eq!(
457 /// v_cloned,
458 /// vec![String::from("1"), String::from("2"), String::from("3")]
459 /// );
460 /// assert_eq!(
461 /// v_map,
462 /// vec![String::from("1"), String::from("2"), String::from("3")]
463 /// );
464 /// ```
465 fn cloned<'a, T>(self) -> impl ParIter<R, Item = T>
466 where
467 T: 'a + Clone + Send + Sync,
468 Self: ParIter<R, Item = &'a T>,
469 {
470 self.map(map_clone)
471 }
472
473 /// Creates an iterator that flattens nested structure.
474 ///
475 /// This is useful when you have an iterator of iterators or an iterator of things that can be
476 /// turned into iterators and you want to remove one level of indirection.
477 ///
478 /// # Examples
479 ///
480 /// Basic usage.
481 ///
482 /// ```
483 /// use orx_parallel::*;
484 ///
485 /// let data = vec![vec![1, 2, 3, 4], vec![5, 6]];
486 /// let flattened = data.into_par().flatten().collect::<Vec<u8>>();
487 /// assert_eq!(flattened, &[1, 2, 3, 4, 5, 6]);
488 /// ```
489 ///
490 /// Mapping and then flattening:
491 ///
492 /// ```
493 /// use orx_parallel::*;
494 ///
495 /// let words = vec!["alpha", "beta", "gamma"];
496 ///
497 /// // chars() returns an iterator
498 /// let all_characters: Vec<_> = words.par().map(|s| s.chars()).flatten().collect();
499 /// let merged: String = all_characters.into_iter().collect();
500 /// assert_eq!(merged, "alphabetagamma");
501 /// ```
502 ///
503 /// But actually, you can write this in terms of `flat_map`,
504 /// which is preferable in this case since it conveys intent more clearly:
505 ///
506 /// ```
507 /// use orx_parallel::*;
508 ///
509 /// let words = vec!["alpha", "beta", "gamma"];
510 ///
511 /// // chars() returns an iterator
512 /// let all_characters: Vec<_> = words.par().flat_map(|s| s.chars()).collect();
513 /// let merged: String = all_characters.into_iter().collect();
514 /// assert_eq!(merged, "alphabetagamma");
515 /// ```
516 fn flatten(self) -> impl ParIter<R, Item = <Self::Item as IntoIterator>::Item>
517 where
518 Self::Item: IntoIterator,
519 <Self::Item as IntoIterator>::IntoIter: Send + Sync,
520 <Self::Item as IntoIterator>::Item: Send + Sync,
521 R: Send + Sync,
522 Self: Send + Sync,
523 {
524 let map = |e: Self::Item| e.into_iter();
525 self.flat_map(map)
526 }
527
528 // collect
529
    /// Collects all the items from an iterator into a collection.
    ///
    /// This is useful when you already have a collection and want to add the iterator items to it.
    ///
    /// The collection is passed in as an owned value, and returned back with the additional elements.
    ///
    /// All collections implementing [`ParCollectInto`] can be used to collect into.
    ///
    /// [`ParCollectInto`]: crate::ParCollectInto
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = vec![1, 2, 3];
    ///
    /// let vec: Vec<i32> = vec![0, 1];
    /// let vec = a.par().map(|&x| x * 2).collect_into(vec);
    /// let vec = a.par().map(|&x| x * 10).collect_into(vec);
    ///
    /// assert_eq!(vec, vec![0, 1, 2, 4, 6, 10, 20, 30]);
    /// ```
    fn collect_into<C>(self, output: C) -> C
    where
        C: ParCollectInto<Self::Item>;
556
557 /// Transforms an iterator into a collection.
558 ///
559 /// Similar to [`Iterator::collect`], the type annotation on the left-hand-side determines
560 /// the type of the result collection; or turbofish annotation can be used.
561 ///
562 /// All collections implementing [`ParCollectInto`] can be used to collect into.
563 ///
564 /// [`ParCollectInto`]: crate::ParCollectInto
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// use orx_parallel::*;
570 ///
571 /// let a = vec![1, 2, 3];
572 ///
573 /// let doubled: Vec<i32> = a.par().map(|&x| x * 2).collect();
574 ///
575 /// assert_eq!(vec![2, 4, 6], doubled);
576 /// ```
577 fn collect<C>(self) -> C
578 where
579 C: ParCollectInto<Self::Item>,
580 {
581 let output = C::empty(self.con_iter().try_get_len());
582 self.collect_into(output)
583 }
584
585 // reduce
586
    /// Reduces the elements to a single one, by repeatedly applying a reducing operation.
    ///
    /// If the iterator is empty, returns `None`; otherwise, returns the result of the reduction.
    ///
    /// The `reduce` function is a closure with two arguments, an 'accumulator' and an element,
    /// both of type [`Item`](Self::Item); it returns the combined value of the same type.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let inputs = 1..10;
    /// let reduced: usize = inputs.par().reduce(|acc, e| acc + e).unwrap_or(0);
    /// assert_eq!(reduced, 45);
    /// ```
    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
    where
        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync;
605
606 /// Tests if every element of the iterator matches a predicate.
607 ///
608 /// `all` takes a `predicate` that returns true or false.
609 /// It applies this closure to each element of the iterator,
610 /// and if they all return true, then so does `all`.
611 /// If any of them returns false, it returns false.
612 ///
613 /// `all` is short-circuiting; in other words, it will stop processing as soon as it finds a false,
614 /// given that no matter what else happens, the result will also be false.
615 ///
616 /// An empty iterator returns true.
617 ///
618 /// # Examples
619 ///
620 /// ```
621 /// use orx_parallel::*;
622 ///
623 /// let mut a = vec![1, 2, 3];
624 /// assert!(a.par().all(|x| **x > 0));
625 /// assert!(!a.par().all(|x| **x > 2));
626 ///
627 /// a.clear();
628 /// assert!(a.par().all(|x| **x > 2)); // empty iterator
629 /// ```
630 fn all<Predicate>(self, predicate: Predicate) -> bool
631 where
632 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
633 {
634 let violates = |x: &Self::Item| !predicate(x);
635 self.find(violates).is_none()
636 }
637
638 /// Tests if any element of the iterator matches a predicate.
639 ///
640 /// `any` takes a `predicate` that returns true or false.
641 /// It applies this closure to each element of the iterator,
642 /// and if any of the elements returns true, then so does `any`.
643 /// If all of them return false, it returns false.
644 ///
645 /// `any` is short-circuiting; in other words, it will stop processing as soon as it finds a true,
646 /// given that no matter what else happens, the result will also be true.
647 ///
648 /// An empty iterator returns false.
649 ///
650 /// # Examples
651 ///
652 /// ```
653 /// use orx_parallel::*;
654 ///
655 /// let mut a = vec![1, 2, 3];
656 /// assert!(a.par().any(|x| **x > 0));
657 /// assert!(!a.par().any(|x| **x > 5));
658 ///
659 /// a.clear();
660 /// assert!(!a.par().any(|x| **x > 0)); // empty iterator
661 /// ```
662 fn any<Predicate>(self, predicate: Predicate) -> bool
663 where
664 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
665 {
666 self.find(predicate).is_some()
667 }
668
669 /// Consumes the iterator, counting the number of iterations and returning it.
670 ///
671 /// # Examples
672 ///
673 /// ```
674 /// use orx_parallel::*;
675 ///
676 /// let a = vec![1, 2, 3];
677 /// assert_eq!(a.par().filter(|x| **x >= 2).count(), 2);
678 /// ```
679 fn count(self) -> usize {
680 self.map(map_count).reduce(reduce_sum).unwrap_or(0)
681 }
682
683 /// Calls a closure on each element of an iterator.
684 ///
685 /// # Examples
686 ///
687 /// Basic usage:
688 ///
689 /// ```
690 /// use orx_parallel::*;
691 /// use std::sync::mpsc::channel;
692 ///
693 /// let (tx, rx) = channel();
694 /// (0..5)
695 /// .par()
696 /// .map(|x| x * 2 + 1)
697 /// .for_each(move |x| tx.send(x).unwrap());
698 ///
699 /// let mut v: Vec<_> = rx.iter().collect();
700 /// v.sort(); // order can be mixed, since messages will be sent in parallel
701 /// assert_eq!(v, vec![1, 3, 5, 7, 9]);
702 /// ```
703 ///
704 /// Note that since parallel iterators cannot be used within the `for` loop as regular iterators,
705 /// `for_each` provides a way to perform arbitrary for loops on parallel iterators.
706 /// In the following example, we log every element that satisfies a predicate in parallel.
707 ///
708 /// ```
709 /// use orx_parallel::*;
710 ///
711 /// (0..5)
712 /// .par()
713 /// .flat_map(|x| x * 100..x * 110)
714 /// .filter(|&x| x % 3 == 0)
715 /// .for_each(|x| println!("{x}"));
716 /// ```
717 fn for_each<Operation>(self, operation: Operation)
718 where
719 Operation: Fn(Self::Item) + Sync + Send,
720 {
721 let map = |x| operation(x);
722 let _ = self.map(map).reduce(reduce_unit);
723 }
724
725 /// Returns the maximum element of an iterator.
726 ///
727 /// If the iterator is empty, None is returned.
728 ///
729 /// # Examples
730 ///
731 /// ```
732 /// use orx_parallel::*;
733 ///
734 /// let a = vec![1, 2, 3];
735 /// let b: Vec<u32> = Vec::new();
736 ///
737 /// assert_eq!(a.par().max(), Some(&3));
738 /// assert_eq!(b.par().max(), None);
739 /// ```
740 fn max(self) -> Option<Self::Item>
741 where
742 Self::Item: Ord,
743 {
744 self.reduce(Ord::max)
745 }
746
747 /// Returns the element that gives the maximum value with respect to the specified `compare` function.
748 ///
749 /// If the iterator is empty, None is returned.
750 ///
751 /// # Examples
752 ///
753 /// ```
754 /// use orx_parallel::*;
755 ///
756 /// let a = vec![-3_i32, 0, 1, 5, -10];
757 /// assert_eq!(*a.par().max_by(|x, y| x.cmp(y)).unwrap(), 5);
758 /// ```
759 fn max_by<Compare>(self, compare: Compare) -> Option<Self::Item>
760 where
761 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
762 {
763 let reduce = |x, y| match compare(&x, &y) {
764 Ordering::Greater | Ordering::Equal => x,
765 Ordering::Less => y,
766 };
767 self.reduce(reduce)
768 }
769
770 /// Returns the element that gives the maximum value from the specified function.
771 ///
772 /// If the iterator is empty, None is returned.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use orx_parallel::*;
778 ///
779 /// let a = vec![-3_i32, 0, 1, 5, -10];
780 /// assert_eq!(*a.par().max_by_key(|x| x.abs()).unwrap(), -10);
781 /// ```
782 fn max_by_key<Key, GetKey>(self, key: GetKey) -> Option<Self::Item>
783 where
784 Key: Ord,
785 GetKey: Fn(&Self::Item) -> Key + Sync,
786 {
787 let reduce = |x, y| match key(&x).cmp(&key(&y)) {
788 Ordering::Greater | Ordering::Equal => x,
789 Ordering::Less => y,
790 };
791 self.reduce(reduce)
792 }
793
794 /// Returns the minimum element of an iterator.
795 ///
796 /// If the iterator is empty, None is returned.
797 ///
798 /// # Examples
799 ///
800 /// ```
801 /// use orx_parallel::*;
802 ///
803 /// let a = vec![1, 2, 3];
804 /// let b: Vec<u32> = Vec::new();
805 ///
806 /// assert_eq!(a.par().min(), Some(&1));
807 /// assert_eq!(b.par().min(), None);
808 /// ```
809 fn min(self) -> Option<Self::Item>
810 where
811 Self::Item: Ord,
812 {
813 self.reduce(Ord::min)
814 }
815
816 /// Returns the element that gives the minimum value with respect to the specified `compare` function.
817 ///
818 /// If the iterator is empty, None is returned.
819 ///
820 /// # Examples
821 ///
822 /// ```
823 /// use orx_parallel::*;
824 ///
825 /// let a = vec![-3_i32, 0, 1, 5, -10];
826 /// assert_eq!(*a.par().min_by(|x, y| x.cmp(y)).unwrap(), -10);
827 /// ```
828 fn min_by<Compare>(self, compare: Compare) -> Option<Self::Item>
829 where
830 Compare: Fn(&Self::Item, &Self::Item) -> Ordering + Sync,
831 {
832 let reduce = |x, y| match compare(&x, &y) {
833 Ordering::Less | Ordering::Equal => x,
834 Ordering::Greater => y,
835 };
836 self.reduce(reduce)
837 }
838
839 /// Returns the element that gives the minimum value from the specified function.
840 ///
841 /// If the iterator is empty, None is returned.
842 ///
843 /// # Examples
844 ///
845 /// ```
846 /// use orx_parallel::*;
847 ///
848 /// let a = vec![-3_i32, 0, 1, 5, -10];
849 /// assert_eq!(*a.par().min_by_key(|x| x.abs()).unwrap(), 0);
850 /// ```
851 fn min_by_key<Key, GetKey>(self, get_key: GetKey) -> Option<Self::Item>
852 where
853 Key: Ord,
854 GetKey: Fn(&Self::Item) -> Key + Sync,
855 {
856 let reduce = |x, y| match get_key(&x).cmp(&get_key(&y)) {
857 Ordering::Less | Ordering::Equal => x,
858 Ordering::Greater => y,
859 };
860 self.reduce(reduce)
861 }
862
863 /// Sums the elements of an iterator.
864 ///
865 /// Takes each element, adds them together, and returns the result.
866 ///
867 /// An empty iterator returns the additive identity (“zero”) of the type, which is 0 for integers and -0.0 for floats.
868 ///
869 /// `sum` can be used to sum any type implementing [`Sum<Out>`].
870 ///
871 /// [`Sum<Out>`]: crate::Sum
872 ///
873 /// # Examples
874 ///
875 /// ```
876 /// use orx_parallel::*;
877 ///
878 /// let a = vec![1, 2, 3];
879 /// let sum: i32 = a.par().sum();
880 ///
881 /// assert_eq!(sum, 6);
882 /// ```
883 fn sum<Out>(self) -> Out
884 where
885 Self::Item: Sum<Out>,
886 Out: Send + Sync,
887 {
888 self.map(Self::Item::map)
889 .reduce(Self::Item::reduce)
890 .unwrap_or(Self::Item::zero())
891 }
892
893 // early exit
894
    /// Returns the first (or any) element of the iterator; returns None if it is empty.
    ///
    /// * first element is returned if default iteration order `IterationOrder::Ordered` is used,
    /// * any element is returned if `IterationOrder::Arbitrary` is set.
    ///
    /// # Examples
    ///
    /// The following example demonstrates the usage of `first` with default `Ordered` iteration.
    /// This guarantees that the first element with respect to position in the input sequence
    /// is returned.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a: Vec<usize> = vec![];
    /// assert_eq!(a.par().copied().first(), None);
    ///
    /// let a = vec![1, 2, 3];
    /// assert_eq!(a.par().copied().first(), Some(1));
    ///
    /// let a = 1..10_000;
    /// assert_eq!(a.par().filter(|x| x % 3421 == 0).first(), Some(3421));
    /// assert_eq!(a.par().filter(|x| x % 12345 == 0).first(), None);
    ///
    /// // or equivalently,
    /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
    /// ```
    ///
    /// When the order is set to `Arbitrary`, `first` might return any of the elements,
    /// whichever is visited first depending on the parallel execution.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let a = 1..10_000;
    ///
    /// // might return either of 3421 or 2*3421
    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).filter(|x| x % 3421 == 0).first().unwrap();
    /// assert!([3421, 2 * 3421].contains(&any));
    ///
    /// // or equivalently,
    /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
    /// assert!([3421, 2 * 3421].contains(&any));
    /// ```
    fn first(self) -> Option<Self::Item>;
939
940 /// Searches for an element of an iterator that satisfies a `predicate`.
941 ///
942 /// Depending on the set iteration order of the parallel iterator, returns
943 ///
944 /// * first element satisfying the `predicate` if default iteration order `IterationOrder::Ordered` is used,
945 /// * any element satisfying the `predicate` if `IterationOrder::Arbitrary` is set.
946 ///
947 /// `find` takes a closure that returns true or false.
948 /// It applies this closure to each element of the iterator,
949 /// and returns `Some(x)` where `x` is the first element that returns true.
950 /// If they all return false, it returns None.
951 ///
952 /// `find` is short-circuiting; in other words, it will stop processing as soon as the closure returns true.
953 ///
954 /// `par_iter.find(predicate)` can also be considered as a shorthand for `par_iter.filter(predicate).first()`.
955 ///
956 /// # Examples
957 ///
958 /// The following example demonstrates the usage of first with default `Ordered` iteration.
959 /// This guarantees that the first element with respect to position in the input sequence
960 /// is returned.
961 ///
962 /// ```
963 /// use orx_parallel::*;
964 ///
965 /// let a = 1..10_000;
966 /// assert_eq!(a.par().find(|x| x % 12345 == 0), None);
967 /// assert_eq!(a.par().find(|x| x % 3421 == 0), Some(3421));
968 /// ```
969 ///
970 /// When the order is set to `Arbitrary`, `find` might return any of the elements satisfying the predicate,
971 /// whichever is found first depending on the parallel execution.
972 ///
973 /// ```
974 /// use orx_parallel::*;
975 ///
976 /// let a = 1..10_000;
977 ///
978 /// // might return either of 3421 or 2*3421
979 /// let any = a.par().iteration_order(IterationOrder::Arbitrary).find(|x| x % 3421 == 0).unwrap();
980 /// assert!([3421, 2 * 3421].contains(&any));
981 /// ```
982 fn find<Predicate>(self, predicate: Predicate) -> Option<Self::Item>
983 where
984 Predicate: Fn(&Self::Item) -> bool + Send + Sync + Clone,
985 {
986 self.filter(predicate).first()
987 }
988}