orx_concurrent_iter/pullers/
item_puller.rs

1use crate::concurrent_iter::ConcurrentIter;
2
/// A regular [`Iterator`] which is created from, linked to, and
/// pulls its elements from a [`ConcurrentIter`].
5///
6/// It can be created using the [`item_puller`] method of a concurrent iterator.
7///
8/// [`item_puller`]: crate::ConcurrentIter::item_puller
9///
10/// # Examples
11///
12/// The definition might sound a bit confusing.
13///
14/// The following example demonstrates how it works:
15///
16/// * We have a concurrent iterator `con_iter` over elements "0", "1", ..., "99".
17/// * We spawn two threads, say A and B.
18/// * Each thread creates an `ItemPuller`, named as `puller`, from the same `con_iter`.
19/// * The following is one possible sequence of this parallel execution.
20///   * Thread A pulls "0" and calls process("0").
21///   * Thread B pulls "1" and calls process("1"); it completes processing before A.
22///   * Thread B pulls "2" and calls process("2").
23///   * Thread A pulls "3" and calls process("3").
24///   * and so on until all 100 elements are processed by the two threads.
25/// * Notice that there is only one data source `con_iter`;
26///   both of the `puller`s are connected to the same concurrent iterator, and
27///   each element is processed only once.
28///
29/// ```
30/// use orx_concurrent_iter::*;
31///
32/// let num_threads = 2;
33/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
34/// let con_iter = data.con_iter();
35///
36/// let process = |_x: &String| { /* assume actual work */ };
37///
38/// std::thread::scope(|s| {
39///     for _ in 0..num_threads {
40///         s.spawn(|| {
41///             // puller implements Iterator
42///             let puller = con_iter.item_puller();
43///             for value in puller {
44///                 process(value);
45///             }
46///         });
47///     }
48/// });
49/// ```
50///
51/// This approach brings the convenience of regular Iterators to the concurrent code.
/// The example above already demonstrates that we can now use a regular `for` loop as if we were
/// writing sequential code, while the program is parallelized.
54///
55/// Actually, we could've written an equivalent of the above program by directly using the
56/// concurrent iterator's [`next`] method and a `while let` loop:
57///
58/// ```ignore
59/// // ...
60/// std::thread::scope(|s| {
61///     for _ in 0..num_threads {
62///         s.spawn(|| {
63///             while let Some(value) = con_iter.next() {
64///                 process(value);
65///             }
66///         });
67///     }
68/// });
69/// ```
70///
71/// [`next`]: crate::ConcurrentIter::next
72///
73/// However, the convenience of the pullers goes beyond the `for` loops.
74/// All beautiful ergonomic Iterator methods become available in concurrent programs.
75///
/// The following example demonstrates a very simple yet efficient implementation of the
77/// parallelized version of the [`reduce`] operation.
78///
79/// Notice that the entire implementation is nothing but a chain of Iterator methods.
80///
81/// [`reduce`]: Iterator::reduce
82///
83/// ```
84/// use orx_concurrent_iter::*;
85///
86/// fn parallel_reduce<T, F>(
87///     num_threads: usize,
88///     con_iter: impl ConcurrentIter<Item = T>,
89///     reduce: F,
90/// ) -> Option<T>
91/// where
92///     T: Send,
93///     F: Fn(T, T) -> T + Sync,
94/// {
95///     std::thread::scope(|s| {
96///         (0..num_threads)
97///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
98///             .filter_map(|x| x.join().unwrap()) // join threads
99///             .reduce(&reduce) // reduce thread results to final result
100///     })
101/// }
102///
103/// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
104/// assert_eq!(sum, None);
105///
106/// let n = 10_000;
107/// let data: Vec<_> = (0..n).collect();
108/// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
109/// assert_eq!(sum, Some(n * (n - 1) / 2));
110/// ```
111pub struct ItemPuller<'a, I>
112where
113    I: ConcurrentIter,
114{
115    con_iter: &'a I,
116}
117
118impl<'i, I> From<&'i I> for ItemPuller<'i, I>
119where
120    I: ConcurrentIter,
121{
122    fn from(con_iter: &'i I) -> Self {
123        Self { con_iter }
124    }
125}
126
127impl<I> Iterator for ItemPuller<'_, I>
128where
129    I: ConcurrentIter,
130{
131    type Item = I::Item;
132
133    #[inline(always)]
134    fn next(&mut self) -> Option<Self::Item> {
135        self.con_iter.next()
136    }
137}