orx_concurrent_iter/pullers/chunk_puller.rs

use crate::pullers::{FlattenedChunkPuller, FlattenedEnumeratedChunkPuller};

/// A chunk puller which is created from, linked to, and pulls its elements
/// from a [`ConcurrentIter`].
///
/// It can be created using the [`chunk_puller`] method of a concurrent iterator
/// by providing a desired chunk size.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
///
/// Unlike the [`ItemPuller`], a chunk puller pulls many items at once:
///
/// * Its [`pull`] method pulls a chunk from the concurrent iterator, where:
///   * the pulled chunk implements [`ExactSizeIterator`],
///   * it often has `chunk_size` elements as long as there are sufficient
///     items; fewer items are pulled only when the concurrent iterator
///     runs out of elements,
///   * it has at least 1 element, as `pull` returns `None` if there are no
///     items left.
///
/// Three points are important:
///
/// * Items in each pulled chunk are guaranteed to be sequential in the data
///   source.
/// * Pulling elements in chunks rather than one by one as with the `ItemPuller` is
///   an optimization technique which aims to reduce the overhead of updating the
///   atomic state of the concurrent iterator. This optimization is relevant for
///   cases where the work done on each pulled element is considerably small.
/// * Pulling multiple elements or a chunk does not mean that the elements are copied
///   and stored elsewhere. It simply means reserving multiple elements at once
///   for the pulling thread.
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`pull`]: crate::ChunkPuller::pull
/// [`ConcurrentIter`]: crate::ConcurrentIter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better than this without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
/// There is a bit of magic here that deserves an explanation.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We reach the concurrent iterator and pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the updates of the atomic state.
/// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we reach out to the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one by one ...
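///
/// The single-threaded sketch below is only an illustration of this equivalence: iterating
/// over the flattened chunk puller visits exactly the elements that the explicit `pull` loop
/// visits.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
///
/// // explicit nested loops pulling chunks of 10
/// let con_iter = data.con_iter();
/// let mut chunk_puller = con_iter.chunk_puller(10);
/// let mut pulled = Vec::new();
/// while let Some(chunk) = chunk_puller.pull() {
///     pulled.extend(chunk.cloned());
/// }
///
/// // flattened chunk puller used as a regular iterator
/// let con_iter = data.con_iter();
/// let flattened: Vec<_> = con_iter.chunk_puller(10).flattened().cloned().collect();
///
/// assert_eq!(pulled, flattened);
/// ```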
///
/// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
/// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
/// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
/// keeping the implementation almost as simple, due to the fact that the flattened chunk
/// puller implements Iterator.
///
/// In the following code, the sum is computed by 8 threads while each thread pulls elements in
/// chunks of 64.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     chunk: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(T, T) -> T + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
///             .collect::<Vec<_>>() // spawn all threads before joining any of them
///             .into_iter()
///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`reduce`]: Iterator::reduce
/// [`ItemPuller`]: crate::ItemPuller
pub trait ChunkPuller {
    /// Type of the element that the concurrent iterator yields.
    type ChunkItem;

    /// Type of the pulled chunks which implements [`ExactSizeIterator`].
    type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default
    where
        Self: 'c;

    /// Target length of the pulled chunks.
    ///
    /// Note that a pulled chunk might have a length of:
    ///
    /// * `chunk_size` if there are at least `chunk_size` items in the concurrent
    ///   iterator when the `pull` method is called; or
    /// * `n` where `1 <= n < chunk_size` if the concurrent iterator still has
    ///   items, but fewer than `chunk_size` of them.
    ///
    /// Notice that a pulled chunk cannot contain `0` elements; in that case the
    /// `pull` method returns `None`, which signals the end of iteration. This design
    /// choice allows the use of `while let Some(chunk) = chunk_puller.pull() { }`
    /// loops.
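    ///
    /// # Examples
    ///
    /// As a minimal sketch, the target chunk size is expected to be equal to the chunk size
    /// that the puller was created with:
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data: Vec<_> = (0..100).collect();
    /// let con_iter = data.con_iter();
    ///
    /// // create a puller with a desired chunk size of 10
    /// let chunk_puller = con_iter.chunk_puller(10);
    /// assert_eq!(chunk_puller.chunk_size(), 10);
    /// ```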
    fn chunk_size(&self) -> usize;

    /// Pulls the next chunk from the connected concurrent iterator.
    ///
    /// The pulled chunk has a known length, and hence, implements [`ExactSizeIterator`].
    /// It might have a length of:
    ///
    /// * `chunk_size` if there are at least `chunk_size` sequential items in the concurrent
    ///   iterator when the `pull` method is called; or
    /// * `n` where `1 <= n < chunk_size` sequential items if the concurrent iterator still
    ///   has items, but fewer than `chunk_size` of them.
    ///
    /// Notice that a pulled chunk cannot contain `0` elements; in that case the
    /// `pull` method returns `None`, which signals the end of iteration. This design
    /// choice allows the use of `while let Some(chunk) = chunk_puller.pull() { }`
    /// loops.
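    ///
    /// # Examples
    ///
    /// A minimal single-threaded sketch of the length guarantees described above: every pulled
    /// chunk has between 1 and `chunk_size` elements, and `pull` returns `None` once all items
    /// of the concurrent iterator are consumed.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data: Vec<_> = (0..10).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let mut chunk_puller = con_iter.chunk_puller(4);
    ///
    /// let mut num_items = 0;
    /// while let Some(chunk) = chunk_puller.pull() {
    ///     // each chunk is a non-empty ExactSizeIterator with at most 4 items
    ///     assert!(1 <= chunk.len() && chunk.len() <= 4);
    ///     num_items += chunk.len();
    /// }
    ///
    /// // all 10 items are eventually pulled
    /// assert_eq!(num_items, 10);
    /// ```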
    fn pull(&mut self) -> Option<Self::Chunk<'_>>;

    /// Pulls the next chunk from the connected concurrent iterator together with the index
    /// of the first element of the chunk.
    ///
    /// Since all items in the pulled chunk are sequential, knowing the index of the first
    /// item of the chunk provides complete information about the indices of all items.
    ///
    /// # Examples
    ///
    /// Performing an enumerated iteration while using chunks is demonstrated below.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             let mut chunk_puller = con_iter.chunk_puller(4);
    ///             while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
    ///                 for (i, value) in chunk.enumerate() { // i is the index within the chunk
    ///                     let idx = begin_idx + i; // idx is the index in the input collection
    ///                     assert_eq!(value, &idx.to_string());
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>;

    /// Converts the [`ChunkPuller`] into a [`FlattenedChunkPuller`] which is still connected
    /// to, and pulls its elements from, the same concurrent iterator, while allowing for:
    ///
    /// * avoiding nested loops:
    ///   * an outer `while let` loop to [`pull`] the chunk, and then
    ///   * an inner loop to iterate over the chunk;
    /// * bringing Iterator methods to concurrent programs, since [`FlattenedChunkPuller`]
    ///   implements the regular [`Iterator`].
    ///
    /// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
    /// [`pull`]: crate::ChunkPuller::pull
    ///
    /// # Examples
    ///
    /// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
    /// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
    /// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
    /// keeping the implementation almost as simple, due to the fact that the flattened chunk
    /// puller implements Iterator.
    ///
    /// In the following code, the sum is computed by 8 threads while each thread pulls elements in
    /// chunks of 64.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     chunk: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send,
    ///     F: Fn(T, T) -> T + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
    ///             .collect::<Vec<_>>() // spawn all threads before joining any of them
    ///             .into_iter()
    ///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    ///
    /// [`reduce`]: Iterator::reduce
    /// [`ItemPuller`]: crate::ItemPuller
    fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Converts the [`ChunkPuller`] into a [`FlattenedEnumeratedChunkPuller`] which is still
    /// connected to, and pulls its elements from, the same concurrent iterator, while allowing for:
    ///
    /// * avoiding nested loops:
    ///   * an outer `while let` loop to [`pull`] the chunk, and then
    ///   * an inner loop to iterate over the chunk;
    /// * bringing Iterator methods to concurrent programs, since [`FlattenedEnumeratedChunkPuller`]
    ///   implements the regular [`Iterator`].
    ///
    /// Similar to [`flattened`], except that the returned iterator additionally yields the indices
    /// of the elements in the concurrent iterator.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
    ///                 assert_eq!(value, &idx.to_string());
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// [`FlattenedEnumeratedChunkPuller`]: crate::FlattenedEnumeratedChunkPuller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`flattened`]: crate::ChunkPuller::flattened
    fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self>
    where
        Self: Sized,
    {
        self.into()
    }
}