1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use crateConcurrentIter;
/// A regular [`Iterator`] which is created from and linked to and
/// pulls its elements from a [`ConcurrentIter`].
///
/// It can be created using the [`item_puller`] method of a concurrent iterator.
///
/// [`item_puller`]: crate::ConcurrentIter::item_puller
///
/// # Examples
///
/// The definition might sound a bit confusing.
///
/// The following example demonstrates how it works:
///
/// * We have a concurrent iterator `con_iter` over elements "0", "1", ..., "99".
/// * We spawn two threads, say A and B.
/// * Each thread creates an `ItemPuller`, named as `puller`, from the same `con_iter`.
/// * The following is one possible sequence of this parallel execution.
/// * Thread A pulls "0" and calls process("0").
/// * Thread B pulls "1" and calls process("1"); it completes processing before A.
/// * Thread B pulls "2" and calls process("2").
/// * Thread A pulls "3" and calls process("3").
/// * and so on until all 100 elements are processed by the two threads.
/// * Notice that there is only one data source `con_iter`;
/// both of the `puller`s are connected to the same concurrent iterator, and
/// each element is processed only once.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 2;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
/// for _ in 0..num_threads {
/// s.spawn(|| {
/// // puller implements Iterator
/// let puller = con_iter.item_puller();
/// for value in puller {
/// process(value);
/// }
/// });
/// }
/// });
/// ```
///
/// This approach brings the convenience of regular Iterators to the concurrent code.
/// The example above already demonstrates that we can now use a regular `for` loop as we are
/// writing a sequential code, while the program is parallelized.
///
/// Actually, we could've written an equivalent of the above program by directly using the
/// concurrent iterator's [`next`] method and a `while let` loop:
///
/// ```ignore
/// // ...
/// std::thread::scope(|s| {
/// for _ in 0..num_threads {
/// s.spawn(|| {
/// while let Some(value) = con_iter.next() {
/// process(value);
/// }
/// });
/// }
/// });
/// ```
///
/// [`next`]: crate::ConcurrentIter::next
///
/// However, the convenience of the pullers goes beyond the `for` loops.
/// All beautiful ergonomic Iterator methods become available in concurrent programs.
///
/// The following example demonstrate a very simple yet efficient implementation of the
/// parallelized version of the [`reduce`] operation.
///
/// Notice that the entire implementation is nothing but a chain of Iterator methods.
///
/// [`reduce`]: Iterator::reduce
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
/// num_threads: usize,
/// con_iter: impl ConcurrentIter<Item = T>,
/// reduce: F,
/// ) -> Option<T>
/// where
/// T: Send,
/// F: Fn(T, T) -> T + Sync,
/// {
/// std::thread::scope(|s| {
/// (0..num_threads)
/// .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
/// .filter_map(|x| x.join().unwrap()) // join threads
/// .reduce(&reduce) // reduce thread results to final result
/// })
/// }
///
/// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
/// assert_eq!(sum, None);
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```