use ChunkPuller;
/// Flattened version of a [`ChunkPuller`] which conveniently implements [`Iterator`].
///
/// Similar to the regular chunk puller, a flattened chunk puller is created from a
/// [`ConcurrentIter`], remains linked to it, and pulls its elements from it.
///
/// It can be created by calling the [`flattened`] method on a chunk puller that is
/// created by the [`chunk_puller`] method of a concurrent iterator.
///
/// [`ChunkPuller`]: crate::ChunkPuller
/// [`ConcurrentIter`]: crate::ConcurrentIter
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
/// [`flattened`]: crate::ChunkPuller::flattened
///
/// # Examples
///
/// See the [`ItemPuller`] documentation for notes on how pullers bring the convenience of
/// [`Iterator`] methods to concurrent programs, demonstrated there by a 4-line implementation of a
/// parallelized [`reduce`]. Because the flattened chunk puller also implements [`Iterator`], we can
/// add the iteration-by-chunks optimization on top of this while keeping the implementation just
/// as simple.
///
/// In the following code, the sums are computed by 8 threads while each thread pulls elements in
/// chunks of 64.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     chunk: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(T, T) -> T + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
///             .collect::<Vec<_>>() // iterators are lazy: spawn all threads before joining any of them
///             .into_iter()
///             .filter_map(|x| x.join().unwrap()) // join threads
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
/// assert_eq!(sum, None);
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
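///
/// To make the flattening idea concrete, the following is a minimal, std-only sketch of what a
/// flattened puller does conceptually: it buffers one chunk, yields its items one by one, and goes
/// back to the shared source only when the buffer is exhausted. `SharedSource`, `FlatPuller` and
/// `sum_with_threads` are hypothetical names introduced for illustration, and the mutex-guarded
/// iterator is an assumed stand-in for a concurrent iterator; none of this describes how this
/// crate is actually implemented.
///
/// ```rust
/// use std::sync::Mutex;
///
/// // Hypothetical stand-in for a concurrent iterator: a regular iterator behind a mutex.
/// struct SharedSource<I: Iterator>(Mutex<I>);
///
/// impl<I: Iterator> SharedSource<I> {
///     // Takes up to `chunk` items with a single lock acquisition.
///     fn pull_chunk(&self, chunk: usize) -> Vec<I::Item> {
///         let mut iter = self.0.lock().unwrap();
///         iter.by_ref().take(chunk).collect()
///     }
/// }
///
/// // Hypothetical flattened puller: buffers one chunk and yields its items one at a time.
/// // The sketch assumes `chunk > 0`; with a zero chunk size it would yield nothing.
/// struct FlatPuller<'a, I: Iterator> {
///     source: &'a SharedSource<I>,
///     chunk: usize,
///     current: std::vec::IntoIter<I::Item>,
/// }
///
/// impl<'a, I: Iterator> FlatPuller<'a, I> {
///     fn new(source: &'a SharedSource<I>, chunk: usize) -> Self {
///         Self { source, chunk, current: Vec::new().into_iter() }
///     }
/// }
///
/// impl<I: Iterator> Iterator for FlatPuller<'_, I> {
///     type Item = I::Item;
///
///     fn next(&mut self) -> Option<Self::Item> {
///         if let Some(x) = self.current.next() {
///             return Some(x);
///         }
///         // The buffered chunk is exhausted: pull the next chunk from the shared source.
///         self.current = self.source.pull_chunk(self.chunk).into_iter();
///         self.current.next()
///     }
/// }
///
/// // Sums `0..n` with `num_threads` threads, each pulling chunks of size `chunk`.
/// fn sum_with_threads(n: u64, num_threads: usize, chunk: usize) -> u64 {
///     let source = SharedSource(Mutex::new(0..n));
///     std::thread::scope(|s| {
///         // Collect the handles first so that all threads are spawned before any is joined.
///         let handles: Vec<_> = (0..num_threads)
///             .map(|_| s.spawn(|| FlatPuller::new(&source, chunk).sum::<u64>()))
///             .collect();
///         handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
///     })
/// }
/// ```
///
/// Under these assumptions, `sum_with_threads(10_000, 8, 64)` returns `49_995_000`, the same as a
/// sequential sum, since each element is pulled by exactly one thread. Larger chunks mean fewer
/// trips to the shared source, at the cost of a less even split of work among threads.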
///
/// [`reduce`]: Iterator::reduce
/// [`ItemPuller`]: crate::ItemPuller