orx_concurrent_iter/pullers/item_puller.rs
1use crate::concurrent_iter::ConcurrentIter;
2
3/// A regular [`Iterator`] which is created from and linked to and
4/// pulls its elements from a [`ConcurrentIter`].
5///
6/// It can be created using the [`item_puller`] method of a concurrent iterator.
7///
8/// [`item_puller`]: crate::ConcurrentIter::item_puller
9///
10/// # Examples
11///
12/// The definition might sound a bit confusing.
13///
14/// The following example demonstrates how it works:
15///
16/// * We have a concurrent iterator `con_iter` over elements "0", "1", ..., "99".
17/// * We spawn two threads, say A and B.
18/// * Each thread creates an `ItemPuller`, named as `puller`, from the same `con_iter`.
19/// * The following is one possible sequence of this parallel execution.
20/// * Thread A pulls "0" and calls process("0").
21/// * Thread B pulls "1" and calls process("1"); it completes processing before A.
22/// * Thread B pulls "2" and calls process("2").
23/// * Thread A pulls "3" and calls process("3").
24/// * and so on until all 100 elements are processed by the two threads.
25/// * Notice that there is only one data source `con_iter`;
26/// both of the `puller`s are connected to the same concurrent iterator, and
27/// each element is processed only once.
28///
29/// ```
30/// use orx_concurrent_iter::*;
31///
32/// let num_threads = 2;
33/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
34/// let con_iter = data.con_iter();
35///
36/// let process = |_x: &String| { /* assume actual work */ };
37///
38/// std::thread::scope(|s| {
39/// for _ in 0..num_threads {
40/// s.spawn(|| {
41/// // puller implements Iterator
42/// let puller = con_iter.item_puller();
43/// for value in puller {
44/// process(value);
45/// }
46/// });
47/// }
48/// });
49/// ```
50///
51/// This approach brings the convenience of regular Iterators to the concurrent code.
52/// The example above already demonstrates that we can now use a regular `for` loop as we are
53/// writing a sequential code, while the program is parallelized.
54///
55/// Actually, we could've written an equivalent of the above program by directly using the
56/// concurrent iterator's [`next`] method and a `while let` loop:
57///
58/// ```ignore
59/// // ...
60/// std::thread::scope(|s| {
61/// for _ in 0..num_threads {
62/// s.spawn(|| {
63/// while let Some(value) = con_iter.next() {
64/// process(value);
65/// }
66/// });
67/// }
68/// });
69/// ```
70///
71/// [`next`]: crate::ConcurrentIter::next
72///
73/// However, the convenience of the pullers goes beyond the `for` loops.
74/// All beautiful ergonomic Iterator methods become available in concurrent programs.
75///
76/// The following example demonstrate a very simple yet efficient implementation of the
77/// parallelized version of the [`reduce`] operation.
78///
79/// Notice that the entire implementation is nothing but a chain of Iterator methods.
80///
81/// [`reduce`]: Iterator::reduce
82///
83/// ```
84/// use orx_concurrent_iter::*;
85///
86/// fn parallel_reduce<T, F>(
87/// num_threads: usize,
88/// con_iter: impl ConcurrentIter<Item = T>,
89/// reduce: F,
90/// ) -> Option<T>
91/// where
92/// T: Send,
93/// F: Fn(T, T) -> T + Sync,
94/// {
95/// std::thread::scope(|s| {
96/// (0..num_threads)
97/// .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
98/// .filter_map(|x| x.join().unwrap()) // join threads
99/// .reduce(&reduce) // reduce thread results to final result
100/// })
101/// }
102///
103/// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
104/// assert_eq!(sum, None);
105///
106/// let n = 10_000;
107/// let data: Vec<_> = (0..n).collect();
108/// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
109/// assert_eq!(sum, Some(n * (n - 1) / 2));
110/// ```
111pub struct ItemPuller<'a, I>
112where
113 I: ConcurrentIter,
114{
115 con_iter: &'a I,
116}
117
118impl<'i, I> From<&'i I> for ItemPuller<'i, I>
119where
120 I: ConcurrentIter,
121{
122 fn from(con_iter: &'i I) -> Self {
123 Self { con_iter }
124 }
125}
126
127impl<I> Iterator for ItemPuller<'_, I>
128where
129 I: ConcurrentIter,
130{
131 type Item = I::Item;
132
133 #[inline(always)]
134 fn next(&mut self) -> Option<Self::Item> {
135 self.con_iter.next()
136 }
137}