use crate::pullers::{FlattenedChunkPuller, FlattenedEnumeratedChunkPuller};

/// A chunk puller which is created from, linked to, and pulls its elements
/// from a [`ConcurrentIter`].
///
/// It can be created using the [`chunk_puller`] method of a concurrent iterator
/// by providing a desired chunk size.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
///
/// Unlike the [`ItemPuller`], a chunk puller pulls many items at once:
///
/// * Its [`pull`] method pulls a chunk from the concurrent iterator, where:
///   * the pulled chunk implements [`ExactSizeIterator`],
///   * it has `chunk_size` elements as long as there are sufficient
///     items; fewer items are pulled only when the concurrent iterator
///     runs out of elements,
///   * it has at least 1 element, as `pull` returns `None` if there are no
///     items left.
///
/// Three points are important:
///
/// * Items in each pulled chunk are guaranteed to be sequential in the data
///   source.
/// * Pulling elements in chunks rather than one-by-one as by the `ItemPuller` is
///   an optimization technique which aims to reduce the overhead of updating the
///   atomic state of the concurrent iterator. This optimization is relevant for
///   cases where the work done on each pulled element is considerably small.
/// * Pulling multiple elements or a chunk does not mean the elements are copied
///   and stored elsewhere. It actually means reserving multiple elements at once
///   for the pulling thread.
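///
/// The last two points can be illustrated with a minimal, std-only sketch. This is
/// not this crate's implementation: `reserve_chunk` and `chunk_lengths` are
/// hypothetical helpers, and the single `AtomicUsize` counter is an assumed
/// simplification of the atomic state of a concurrent iterator. One `fetch_add`
/// reserves up to `chunk_size` consecutive positions of a slice, and the chunk is
/// merely a sub-slice borrowing the reserved elements.
///
/// ```rust
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// // Hypothetical helper (not part of this crate): reserves up to `chunk_size`
/// // consecutive elements of `data` with a single atomic update.
/// // Returns the index of the first reserved element together with the chunk,
/// // or `None` once all elements have been reserved.
/// fn reserve_chunk<'a, T>(
///     data: &'a [T],
///     counter: &AtomicUsize,
///     chunk_size: usize,
/// ) -> Option<(usize, &'a [T])> {
///     assert!(chunk_size > 0, "chunk size must be positive");
///     // one atomic update per chunk, rather than one per element
///     let begin = counter.fetch_add(chunk_size, Ordering::Relaxed);
///     if begin >= data.len() {
///         return None; // nothing left, so an empty chunk is never returned
///     }
///     let end = (begin + chunk_size).min(data.len());
///     // no element is copied: the chunk borrows the reserved range
///     Some((begin, &data[begin..end]))
/// }
///
/// // Hypothetical helper: pulls all chunks on the calling thread and
/// // returns the lengths of the pulled chunks in order.
/// fn chunk_lengths(len: usize, chunk_size: usize) -> Vec<usize> {
///     let data: Vec<usize> = (0..len).collect();
///     let counter = AtomicUsize::new(0);
///     let mut lengths = Vec::new();
///     while let Some((_begin, chunk)) = reserve_chunk(&data, &counter, chunk_size) {
///         lengths.push(chunk.len());
///     }
///     lengths
/// }
/// ```
///
/// Under these assumptions, `chunk_lengths(25, 10)` yields two full chunks followed
/// by a final shorter one, mirroring the chunk lengths described for [`pull`].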
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`pull`]: crate::ChunkPuller::pull
/// [`ConcurrentIter`]: crate::ConcurrentIter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, notice that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better than this without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// There is a bit of magic here, which is explained below.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We reach the concurrent iterator to pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the number of updates of the
///   atomic state.
/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we reach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one-by-one ...
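///
/// The steps above can be sketched with a minimal, std-only iterator. This is not the
/// implementation of the flattened chunk puller of this crate: `FlatPuller` and
/// `flatten_demo` are hypothetical names, and the shared `AtomicUsize` counter is an
/// assumed stand-in for the atomic state of the concurrent iterator. The sketch
/// only aims to show that the atomic state is touched once per chunk, while the
/// items of the current chunk are yielded one-by-one without any atomic update.
///
/// ```rust
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// // Hypothetical flattened puller over a slice (not part of this crate).
/// struct FlatPuller<'a, T> {
///     data: &'a [T],
///     counter: &'a AtomicUsize,         // assumed stand-in for the atomic state
///     chunk_size: usize,
///     current: std::slice::Iter<'a, T>, // chunk that is currently being consumed
///     num_pulls: usize,                 // number of chunks pulled by this puller
/// }
///
/// impl<'a, T> FlatPuller<'a, T> {
///     fn new(data: &'a [T], counter: &'a AtomicUsize, chunk_size: usize) -> Self {
///         assert!(chunk_size > 0, "chunk size must be positive");
///         // start with an empty chunk so that the first `next` call pulls
///         let current = data[0..0].iter();
///         Self { data, counter, chunk_size, current, num_pulls: 0 }
///     }
/// }
///
/// impl<'a, T> Iterator for FlatPuller<'a, T> {
///     type Item = &'a T;
///
///     fn next(&mut self) -> Option<&'a T> {
///         // the current chunk still has items: no atomic update is needed
///         if let Some(item) = self.current.next() {
///             return Some(item);
///         }
///         // the current chunk is consumed: reserve the next chunk at once
///         let begin = self.counter.fetch_add(self.chunk_size, Ordering::Relaxed);
///         let data: &'a [T] = self.data;
///         if begin >= data.len() {
///             return None; // no items left
///         }
///         let end = (begin + self.chunk_size).min(data.len());
///         self.num_pulls += 1;
///         self.current = data[begin..end].iter();
///         self.current.next() // the new chunk has at least one item
///     }
/// }
///
/// // Hypothetical helper: consumes all items on the calling thread and returns
/// // the yielded items together with the number of pulled chunks.
/// fn flatten_demo(len: usize, chunk_size: usize) -> (Vec<usize>, usize) {
///     let data: Vec<usize> = (0..len).collect();
///     let counter = AtomicUsize::new(0);
///     let mut puller = FlatPuller::new(&data, &counter, chunk_size);
///     let items: Vec<usize> = puller.by_ref().copied().collect();
///     (items, puller.num_pulls)
/// }
/// ```
///
/// Under these assumptions, `flatten_demo(25, 10)` yields all 25 items in order
/// while pulling only 3 chunks.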
///
/// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
/// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
/// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
/// keeping the implementation just as simple, still fitting in 4 lines, because the flattened chunk
/// puller implements `Iterator`.
///
/// In the following code, the sums are computed by 8 threads while each thread pulls elements in
/// chunks of 64.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     chunk: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send + Sync,
///     F: Fn(T, T) -> T + Send + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
///             .filter_map(|x| x.join().unwrap()) // join threads
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
/// assert_eq!(sum, None);
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`reduce`]: Iterator::reduce
/// [`ItemPuller`]: crate::ItemPuller
pub trait ChunkPuller {
    /// Type of the element that the concurrent iterator yields.
    type ChunkItem;

    /// Type of the pulled chunks which implements [`ExactSizeIterator`].
    type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default
    where
        Self: 'c;

    /// Target length of the pulled chunks.
    ///
    /// Note that the pulled chunk might have a length of:
    ///
    /// * `chunk_size` if there are at least `chunk_size` items in the concurrent
    ///   iterator when the `pull` method is called; or
    /// * `n`, where `1 <= n < chunk_size`, if the concurrent iterator still has
    ///   items but fewer than `chunk_size`.
    ///
    /// Notice that a chunk never contains `0` elements; when there are no items left,
    /// the `pull` method returns `None` instead, which signals the end of the iteration.
    /// This design choice allows the use of
    /// `while let Some(chunk) = chunk_puller.pull() { }` loops.
    fn chunk_size(&self) -> usize;

    /// Pulls the next chunk from the connected concurrent iterator.
    ///
    /// The pulled chunk has a known length, and hence, implements [`ExactSizeIterator`].
    /// It might have a length of:
    ///
    /// * `chunk_size` if there are at least `chunk_size` sequential items in the concurrent
    ///   iterator when the `pull` method is called; or
    /// * `n`, where `1 <= n < chunk_size`, if the concurrent iterator still has
    ///   sequential items but fewer than `chunk_size`.
    ///
    /// Notice that a chunk never contains `0` elements; when there are no items left,
    /// the `pull` method returns `None` instead, which signals the end of the iteration.
    /// This design choice allows the use of
    /// `while let Some(chunk) = chunk_puller.pull() { }` loops.
    fn pull(&mut self) -> Option<Self::Chunk<'_>>;

    /// Pulls the next chunk from the connected concurrent iterator together with the index
    /// of the first element of the chunk.
    ///
    /// Since all items in the pulled chunk are sequential, knowing the index of the first
    /// item of the chunk provides the complete information of all indices.
    ///
    /// # Examples
    ///
    /// Performing an enumerated iteration while using chunks is demonstrated below.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             let mut chunk_puller = con_iter.chunk_puller(4);
    ///             while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
    ///                 for (i, value) in chunk.enumerate() { // i is the index within chunk
    ///                     let idx = begin_idx + i; // idx is the index in the input collection
    ///                     assert_eq!(value, &idx.to_string());
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>;

    /// Converts the [`ChunkPuller`] into a [`FlattenedChunkPuller`] which is still connected to
    /// and pulls its elements from the same concurrent iterator; while allowing for:
    ///
    /// * avoiding nested loops:
    ///   * a `while let` loop to [`pull`] the chunk, and then
    ///   * a loop to iterate over the chunk;
    /// * bringing Iterator methods to concurrent programs since [`FlattenedChunkPuller`]
    ///   implements the regular [`Iterator`].
    ///
    /// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
    /// [`pull`]: crate::ChunkPuller::pull
    ///
    /// # Examples
    ///
    /// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
    /// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
    /// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
    /// keeping the implementation just as simple, still fitting in 4 lines, because the flattened chunk
    /// puller implements `Iterator`.
    ///
    /// In the following code, the sums are computed by 8 threads while each thread pulls elements in
    /// chunks of 64.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     chunk: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send + Sync,
    ///     F: Fn(T, T) -> T + Send + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
    ///             .filter_map(|x| x.join().unwrap()) // join threads
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, None);
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    ///
    /// [`reduce`]: Iterator::reduce
    /// [`ItemPuller`]: crate::ItemPuller
    fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Converts the [`ChunkPuller`] into a [`FlattenedEnumeratedChunkPuller`] which is still connected to
    /// and pulls its elements from the same concurrent iterator; while allowing for:
    ///
    /// * avoiding nested loops:
    ///   * a `while let` loop to [`pull`] the chunk, and then
    ///   * a loop to iterate over the chunk;
    /// * bringing Iterator methods to concurrent programs since [`FlattenedEnumeratedChunkPuller`]
    ///   implements the regular [`Iterator`].
    ///
    /// Similar to [`flattened`] except that the returned iterator additionally yields the indices
    /// of the elements in the concurrent iterator.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
    ///                 assert_eq!(value, &idx.to_string());
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// [`FlattenedEnumeratedChunkPuller`]: crate::FlattenedEnumeratedChunkPuller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`flattened`]: crate::ChunkPuller::flattened
    fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self>
    where
        Self: Sized,
    {
        self.into()
    }
}