orx_concurrent_iter/pullers/chunk_puller.rs
1use crate::pullers::{FlattenedChunkPuller, FlattenedEnumeratedChunkPuller};
2
3/// A chunk puller which is created from and linked to and pulls its elements
4/// from a [`ConcurrentIter`].
5///
6/// It can be created using the [`chunk_puller`] method of a concurrent iterator
7/// by providing a desired chunk size.
8///
9/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
10///
11/// Unlike the [`ItemPuller`], a chunk puller pulls many items at once:
12///
13/// * Its [`pull`] method pulls a chunk from the concurrent iterator, where:
14/// * the pulled chunk implements [`ExactSizeIterator`],
15/// * it often has `chunk_size` elements as long as there are sufficient
16/// items; less items will be pulled only when the concurrent iterator
17/// runs out of elements,
18/// * it has at least 1 element, as `pull` returns None if there are no
19/// items left.
20///
21/// Three points are important:
22///
23/// * Items in each pulled chunk are guaranteed to be sequential in the data
24/// source.
25/// * Pulling elements in chunks rather than one-by-one as by the `ItemPuller` is
26/// an optimization technique which aims to reduce the overhead of updating the
27/// atomic state of the concurrent iterator. This optimization is relevant for
28/// cases where the work done on the pulled elements are considerably small.
29/// * Pulling multiple elements or a chunk does not mean the elements are copied
30/// and stored elsewhere. It actually means reserving multiple elements at once
31/// for the pulling thread.
32///
33/// [`ItemPuller`]: crate::ItemPuller
34/// [`pull`]: crate::ChunkPuller::pull
35/// [`ConcurrentIter`]: crate::ConcurrentIter
36///
37/// # Examples
38///
39/// ```
40/// use orx_concurrent_iter::*;
41///
42/// let num_threads = 4;
43/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
44/// let con_iter = data.con_iter();
45///
46/// let process = |_x: &String| {};
47///
48/// std::thread::scope(|s| {
49/// for _ in 0..num_threads {
50/// s.spawn(|| {
51/// // concurrently iterate over values in a `while let` loop
52/// // while pulling (up to) 10 elements every time
53/// let mut chunk_puller = con_iter.chunk_puller(10);
54/// while let Some(chunk) = chunk_puller.pull() {
55/// // chunk is an ExactSizeIterator
56/// for value in chunk {
57/// process(value);
58/// }
59/// }
60/// });
61/// }
62/// });
63/// ```
64///
65/// The above code conveniently allows for the iteration-by-chunks optimization.
66/// However, you might have noticed that now we have a nested `while let` and `for` loops.
67/// In terms of convenience, we can do better than this without losing any performance.
68///
69/// This can be achieved using the [`flattened`] method of the chunk puller (see also
70/// [`flattened_with_idx`]).
71///
72/// [`flattened`]: crate::ChunkPuller::flattened
73/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
74///
75/// ```
76/// use orx_concurrent_iter::*;
77///
78/// let num_threads = 4;
79/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
80/// let con_iter = data.con_iter();
81///
82/// let process = |_x: &String| {};
83///
84/// std::thread::scope(|s| {
85/// for _ in 0..num_threads {
86/// s.spawn(|| {
87/// // concurrently iterate over values in a `for` loop
88/// // while concurrently pulling (up to) 10 elements every time
89/// for value in con_iter.chunk_puller(10).flattened() {
90/// process(value);
91/// }
92/// });
93/// }
94/// });
95/// ```
96///
97/// A bit of magic here, that requires to be explained below.
98///
99/// Notice that this is a very convenient way to concurrently iterate over the elements
100/// using a simple `for` loop. However, it is important to note that, under the hood, this is
101/// equivalent to the program in the previous section where we used the `pull` method of the
102/// chunk puller.
103///
104/// The following happens under the hood:
105///
106/// * We reach the concurrent iterator to pull 10 items at once from the data source.
107/// This is the intended performance optimization to reduce the updates of the atomic state.
108/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
109/// * Once, we complete processing these 10 items, we approach the concurrent iterator again.
110/// Provided that there are elements left, we pull another chunk of 10 items.
111/// * Then, we iterate one-by-one ...
112///
113/// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
114/// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
115/// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
116/// keeping the implementation as simple and fitting 4-lines due to the fact that flattened chunk
117/// puller implements Iterator.
118///
119/// In the following code, the sums are computed by 8 threads while each thread pulls elements in
120/// chunks of 64.
121///
122/// ```
123/// use orx_concurrent_iter::*;
124///
125/// fn parallel_reduce<T, F>(
126/// num_threads: usize,
127/// chunk: usize,
128/// con_iter: impl ConcurrentIter<Item = T>,
129/// reduce: F,
130/// ) -> Option<T>
131/// where
132/// T: Send,
133/// F: Fn(T, T) -> T + Sync,
134/// {
135/// std::thread::scope(|s| {
136/// (0..num_threads)
137/// .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
138/// .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
139/// .reduce(&reduce) // reduce thread results to final result
140/// })
141/// }
142///
143/// let n = 10_000;
144/// let data: Vec<_> = (0..n).collect();
145/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
146/// assert_eq!(sum, Some(n * (n - 1) / 2));
147/// ```
148///
149/// [`reduce`]: Iterator::reduce
150/// [`ItemPuller`]: crate::ItemPuller
151pub trait ChunkPuller {
152 /// Type of the element that the concurrent iterator yields.
153 type ChunkItem;
154
155 /// Type of the pulled chunks which implements [`ExactSizeIterator`].
156 type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default
157 where
158 Self: 'c;
159
160 /// Target length of the pulled chunks.
161 ///
162 /// Note that the pulled chunk might have a length of:
163 ///
164 /// * `chunk_size` if there are at least `chunk_size` items in the concurrent
165 /// iterator when the `pull` method is called; or
166 /// * `n` items where `1 <= n < chunk_size` items if the concurrent iterator
167 /// still has items; however, fewer than `chunk_size`.
168 ///
169 /// Notice that the chunk cannot contain `0` elements; in which case the `pull`
170 /// method returns `None` which signals to complete the iteration. This design
171 /// choice allows to use the `while let Some(chunk) = chunk_puller.pull() { }`
172 /// loops.
173 fn chunk_size(&self) -> usize;
174
175 /// Pulls the next chunk from the connected concurrent iterator.
176 ///
177 /// The pulled chunk has a known length, and hence, implements [`ExactSizeIterator`].
178 /// It might have a length of:
179 ///
180 /// * `chunk_size` if there are at least `chunk_size` sequential items in the concurrent
181 /// iterator when the `pull` method is called; or
182 /// * `n` items where `1 <= n < chunk_size` sequential items if the concurrent iterator
183 /// still has items; however, fewer than `chunk_size`.
184 ///
185 /// Notice that the chunk cannot contain `0` elements; in which case the `pull`
186 /// method returns `None` which signals to complete the iteration. This design
187 /// choice allows to use the `while let Some(chunk) = chunk_puller.pull() { }`
188 /// loops.
189 fn pull(&mut self) -> Option<Self::Chunk<'_>>;
190
191 /// Pulls the next chunk from the connected concurrent iterator together with the index
192 /// of the first element of the chunk.
193 ///
194 /// Since all items in the pulled chunk are sequential, knowing the index of the first
195 /// item of the chunk provides the complete information of all indices.
196 ///
197 /// # Examples
198 ///
199 /// Performing an enumerated iteration while using chunks is demonstrated below.
200 ///
201 /// ```
202 /// use orx_concurrent_iter::*;
203 ///
204 /// let num_threads = 4;
205 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
206 /// let con_iter = data.con_iter();
207 ///
208 /// std::thread::scope(|s| {
209 /// for _ in 0..num_threads {
210 /// s.spawn(|| {
211 /// let mut chunk_puller = con_iter.chunk_puller(4);
212 /// while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
213 /// for (i, value) in chunk.enumerate() { // i is the index within chunk
214 /// let idx = begin_idx + i; // idx is the index in the input collection
215 /// assert_eq!(value, &idx.to_string());
216 /// }
217 /// }
218 /// });
219 /// }
220 /// });
221 /// ```
222 fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>;
223
224 /// Converts the [`ChunkPuller`] into a [`FlattenedChunkPuller`] which is still connected to
225 /// and pulls its elements from the same concurrent iterator; while allowing for:
226 ///
227 /// * avoiding nested loops:
228 /// * `while let` loop to [`pull`] the chunk, and then
229 /// * iterate over the chunk;
230 /// * bringing Iterator methods to the concurrent programs since [`FlattenedChunkPuller`]
231 /// implements the regular [`Iterator`].
232 ///
233 /// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
234 /// [`pull`]: crate::ChunkPuller::pull
235 ///
236 /// # Examples
237 ///
238 /// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
239 /// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
240 /// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
241 /// keeping the implementation as simple and fitting 4-lines due to the fact that flattened chunk
242 /// puller implements Iterator.
243 ///
244 /// In the following code, the sums are computed by 8 threads while each thread pulls elements in
245 /// chunks of 64.
246 ///
247 /// ```
248 /// use orx_concurrent_iter::*;
249 ///
250 /// fn parallel_reduce<T, F>(
251 /// num_threads: usize,
252 /// chunk: usize,
253 /// con_iter: impl ConcurrentIter<Item = T>,
254 /// reduce: F,
255 /// ) -> Option<T>
256 /// where
257 /// T: Send,
258 /// F: Fn(T, T) -> T + Sync,
259 /// {
260 /// std::thread::scope(|s| {
261 /// (0..num_threads)
262 /// .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
263 /// .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
264 /// .reduce(&reduce) // reduce thread results to final result
265 /// })
266 /// }
267 ///
268 /// let n = 10_000;
269 /// let data: Vec<_> = (0..n).collect();
270 /// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
271 /// assert_eq!(sum, Some(n * (n - 1) / 2));
272 /// ```
273 ///
274 /// [`reduce`]: Iterator::reduce
275 /// [`ItemPuller`]: crate::ItemPuller
276 fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self>
277 where
278 Self: Sized,
279 {
280 self.into()
281 }
282
283 /// Converts the [`ChunkPuller`] into a [`FlattenedEnumeratedChunkPuller`] which is still connected to
284 /// and pulls its elements from the same concurrent iterator; while allowing for:
285 ///
286 /// * avoiding nested loops:
287 /// * `while let` loop to [`pull`] the chunk, and then
288 /// * iterate over the chunk;
289 /// * bringing Iterator methods to the concurrent programs since [`FlattenedEnumeratedChunkPuller`]
290 /// implements the regular [`Iterator`].
291 ///
292 /// Similar to [`flattened`] except that returned iterator additionally returns the indices of the
293 /// elements in the concurrent iterator.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// use orx_concurrent_iter::*;
299 ///
300 /// let num_threads = 4;
301 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
302 /// let con_iter = data.con_iter();
303 ///
304 /// std::thread::scope(|s| {
305 /// for _ in 0..num_threads {
306 /// s.spawn(|| {
307 /// for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
308 /// assert_eq!(value, &idx.to_string());
309 /// }
310 /// });
311 /// }
312 /// });
313 /// ```
314 ///
315 /// [`FlattenedEnumeratedChunkPuller`]: crate::FlattenedEnumeratedChunkPuller
316 /// [`pull`]: crate::ChunkPuller::pull
317 /// [`flattened`]: crate::ChunkPuller::flattened
318 fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self>
319 where
320 Self: Sized,
321 {
322 self.into()
323 }
324}