orx_concurrent_iter/pullers/chunk_puller.rs
1use crate::pullers::{FlattenedChunkPuller, FlattenedEnumeratedChunkPuller};
2
3/// A chunk puller which is created from and linked to and pulls its elements
4/// from a [`ConcurrentIter`].
5///
6/// It can be created using the [`chunk_puller`] method of a concurrent iterator
7/// by providing a desired chunk size.
8///
9/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
10///
11/// Unlike the [`ItemPuller`], a chunk puller pulls many items at once:
12///
13/// * Its [`pull`] method pulls a chunk from the concurrent iterator, where:
14/// * the pulled chunk implements [`ExactSizeIterator`],
15/// * it often has `chunk_size` elements as long as there are sufficient
16/// items; less items will be pulled only when the concurrent iterator
17/// runs out of elements,
18/// * it has at least 1 element, as `pull` returns None if there are no
19/// items left.
20///
21/// Three points are important:
22///
23/// * Items in each pulled chunk are guaranteed to be sequential in the data
24/// source.
25/// * Pulling elements in chunks rather than one-by-one as by the `ItemPuller` is
26/// an optimization technique which aims to reduce the overhead of updating the
27/// atomic state of the concurrent iterator. This optimization is relevant for
28/// cases where the work done on the pulled elements are considerably small.
29/// * Pulling multiple elements or a chunk does not mean the elements are copied
30/// and stored elsewhere. It actually means reserving multiple elements at once
31/// for the pulling thread.
32///
33/// [`ItemPuller`]: crate::ItemPuller
34/// [`pull`]: crate::ChunkPuller::pull
35/// [`ConcurrentIter`]: crate::ConcurrentIter
36///
37/// # Examples
38///
39/// ```
40/// use orx_concurrent_iter::*;
41///
42/// let num_threads = 4;
43/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
44/// let con_iter = data.con_iter();
45///
46/// let process = |_x: &String| {};
47///
48/// std::thread::scope(|s| {
49/// for _ in 0..num_threads {
50/// s.spawn(|| {
51/// // concurrently iterate over values in a `while let` loop
52/// // while pulling (up to) 10 elements every time
53/// let mut chunk_puller = con_iter.chunk_puller(10);
54/// while let Some(chunk) = chunk_puller.pull() {
55/// // chunk is an ExactSizeIterator
56/// for value in chunk {
57/// process(value);
58/// }
59/// }
60/// });
61/// }
62/// });
63/// ```
64///
65/// The above code conveniently allows for the iteration-by-chunks optimization.
66/// However, you might have noticed that now we have a nested `while let` and `for` loops.
67/// In terms of convenience, we can do better than this without losing any performance.
68///
69/// This can be achieved using the [`flattened`] method of the chunk puller (see also
70/// [`flattened_with_idx`]).
71///
72/// [`flattened`]: crate::ChunkPuller::flattened
73/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
74///
75/// ```
76/// use orx_concurrent_iter::*;
77///
78/// let num_threads = 4;
79/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
80/// let con_iter = data.con_iter();
81///
82/// let process = |_x: &String| {};
83///
84/// std::thread::scope(|s| {
85/// for _ in 0..num_threads {
86/// s.spawn(|| {
87/// // concurrently iterate over values in a `for` loop
88/// // while concurrently pulling (up to) 10 elements every time
89/// for value in con_iter.chunk_puller(10).flattened() {
90/// process(value);
91/// }
92/// });
93/// }
94/// });
95/// ```
96///
97/// A bit of magic here, that requires to be explained below.
98///
99/// Notice that this is a very convenient way to concurrently iterate over the elements
100/// using a simple `for` loop. However, it is important to note that, under the hood, this is
101/// equivalent to the program in the previous section where we used the `pull` method of the
102/// chunk puller.
103///
104/// The following happens under the hood:
105///
106/// * We reach the concurrent iterator to pull 10 items at once from the data source.
107/// This is the intended performance optimization to reduce the updates of the atomic state.
108/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
109/// * Once, we complete processing these 10 items, we approach the concurrent iterator again.
110/// Provided that there are elements left, we pull another chunk of 10 items.
111/// * Then, we iterate one-by-one ...
112///
113/// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
114/// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
115/// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
116/// keeping the implementation as simple and fitting 4-lines due to the fact that flattened chunk
117/// puller implements Iterator.
118///
119/// In the following code, the sums are computed by 8 threads while each thread pulls elements in
120/// chunks of 64.
121///
122/// ```
123/// use orx_concurrent_iter::*;
124///
125/// fn parallel_reduce<T, F>(
126/// num_threads: usize,
127/// chunk: usize,
128/// con_iter: impl ConcurrentIter<Item = T>,
129/// reduce: F,
130/// ) -> Option<T>
131/// where
132/// T: Send + Sync,
133/// F: Fn(T, T) -> T + Send + Sync,
134/// {
135/// std::thread::scope(|s| {
136/// (0..num_threads)
137/// .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
138/// .filter_map(|x| x.join().unwrap()) // join threads
139/// .reduce(&reduce) // reduce thread results to final result
140/// })
141/// }
142///
143/// let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
144/// assert_eq!(sum, None);
145///
146/// let n = 10_000;
147/// let data: Vec<_> = (0..n).collect();
148/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
149/// assert_eq!(sum, Some(n * (n - 1) / 2));
150/// ```
151///
152/// [`reduce`]: Iterator::reduce
153/// [`ItemPuller`]: crate::ItemPuller
154pub trait ChunkPuller {
155 /// Type of the element that the concurrent iterator yields.
156 type ChunkItem;
157
158 /// Type of the pulled chunks which implements [`ExactSizeIterator`].
159 type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default
160 where
161 Self: 'c;
162
163 /// Target length of the pulled chunks.
164 ///
165 /// Note that the pulled chunk might have a length of:
166 ///
167 /// * `chunk_size` if there are at least `chunk_size` items in the concurrent
168 /// iterator when the `pull` method is called; or
169 /// * `n` items where `1 <= n < chunk_size` items if the concurrent iterator
170 /// still has items; however, fewer than `chunk_size`.
171 ///
172 /// Notice that the chunk cannot contain `0` elements; in which case the `pull`
173 /// method returns `None` which signals to complete the iteration. This design
174 /// choice allows to use the `while let Some(chunk) = chunk_puller.pull() { }`
175 /// loops.
176 fn chunk_size(&self) -> usize;
177
178 /// Pulls the next chunk from the connected concurrent iterator.
179 ///
180 /// The pulled chunk has a known length, and hence, implements [`ExactSizeIterator`].
181 /// It might have a length of:
182 ///
183 /// * `chunk_size` if there are at least `chunk_size` sequential items in the concurrent
184 /// iterator when the `pull` method is called; or
185 /// * `n` items where `1 <= n < chunk_size` sequential items if the concurrent iterator
186 /// still has items; however, fewer than `chunk_size`.
187 ///
188 /// Notice that the chunk cannot contain `0` elements; in which case the `pull`
189 /// method returns `None` which signals to complete the iteration. This design
190 /// choice allows to use the `while let Some(chunk) = chunk_puller.pull() { }`
191 /// loops.
192 fn pull(&mut self) -> Option<Self::Chunk<'_>>;
193
194 /// Pulls the next chunk from the connected concurrent iterator together with the index
195 /// of the first element of the chunk.
196 ///
197 /// Since all items in the pulled chunk are sequential, knowing the index of the first
198 /// item of the chunk provides the complete information of all indices.
199 ///
200 /// # Examples
201 ///
202 /// Performing an enumerated iteration while using chunks is demonstrated below.
203 ///
204 /// ```
205 /// use orx_concurrent_iter::*;
206 ///
207 /// let num_threads = 4;
208 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
209 /// let con_iter = data.con_iter();
210 ///
211 /// std::thread::scope(|s| {
212 /// for _ in 0..num_threads {
213 /// s.spawn(|| {
214 /// let mut chunk_puller = con_iter.chunk_puller(4);
215 /// while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
216 /// for (i, value) in chunk.enumerate() { // i is the index within chunk
217 /// let idx = begin_idx + i; // idx is the index in the input collection
218 /// assert_eq!(value, &idx.to_string());
219 /// }
220 /// }
221 /// });
222 /// }
223 /// });
224 /// ```
225 fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>;
226
227 /// Converts the [`ChunkPuller`] into a [`FlattenedChunkPuller`] which is still connected to
228 /// and pulls its elements from the same concurrent iterator; while allowing for:
229 ///
230 /// * avoiding nested loops:
231 /// * `while let` loop to [`pull`] the chunk, and then
232 /// * iterate over the chunk;
233 /// * bringing Iterator methods to the concurrent programs since [`FlattenedChunkPuller`]
234 /// implements the regular [`Iterator`].
235 ///
236 /// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
237 /// [`pull`]: crate::ChunkPuller::pull
238 ///
239 /// # Examples
240 ///
241 /// See the [`ItemPuller`] documentation for the notes on how the pullers bring the convenience of
242 /// Iterator methods to concurrent programs, which is demonstrated by a 4-line implementation of the
243 /// parallelized [`reduce`]. We can add the iteration-by-chunks optimization on top of this while
244 /// keeping the implementation as simple and fitting 4-lines due to the fact that flattened chunk
245 /// puller implements Iterator.
246 ///
247 /// In the following code, the sums are computed by 8 threads while each thread pulls elements in
248 /// chunks of 64.
249 ///
250 /// ```
251 /// use orx_concurrent_iter::*;
252 ///
253 /// fn parallel_reduce<T, F>(
254 /// num_threads: usize,
255 /// chunk: usize,
256 /// con_iter: impl ConcurrentIter<Item = T>,
257 /// reduce: F,
258 /// ) -> Option<T>
259 /// where
260 /// T: Send + Sync,
261 /// F: Fn(T, T) -> T + Send + Sync,
262 /// {
263 /// std::thread::scope(|s| {
264 /// (0..num_threads)
265 /// .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
266 /// .filter_map(|x| x.join().unwrap()) // join threads
267 /// .reduce(&reduce) // reduce thread results to final result
268 /// })
269 /// }
270 ///
271 /// let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
272 /// assert_eq!(sum, None);
273 ///
274 /// let n = 10_000;
275 /// let data: Vec<_> = (0..n).collect();
276 /// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
277 /// assert_eq!(sum, Some(n * (n - 1) / 2));
278 /// ```
279 ///
280 /// [`reduce`]: Iterator::reduce
281 /// [`ItemPuller`]: crate::ItemPuller
282 fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self>
283 where
284 Self: Sized,
285 {
286 self.into()
287 }
288
289 /// Converts the [`ChunkPuller`] into a [`FlattenedEnumeratedChunkPuller`] which is still connected to
290 /// and pulls its elements from the same concurrent iterator; while allowing for:
291 ///
292 /// * avoiding nested loops:
293 /// * `while let` loop to [`pull`] the chunk, and then
294 /// * iterate over the chunk;
295 /// * bringing Iterator methods to the concurrent programs since [`FlattenedEnumeratedChunkPuller`]
296 /// implements the regular [`Iterator`].
297 ///
298 /// Similar to [`flattened`] except that returned iterator additionally returns the indices of the
299 /// elements in the concurrent iterator.
300 ///
301 /// # Examples
302 ///
303 /// ```
304 /// use orx_concurrent_iter::*;
305 ///
306 /// let num_threads = 4;
307 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
308 /// let con_iter = data.con_iter();
309 ///
310 /// std::thread::scope(|s| {
311 /// for _ in 0..num_threads {
312 /// s.spawn(|| {
313 /// for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
314 /// assert_eq!(value, &idx.to_string());
315 /// }
316 /// });
317 /// }
318 /// });
319 /// ```
320 ///
321 /// [`FlattenedEnumeratedChunkPuller`]: crate::FlattenedEnumeratedChunkPuller
322 /// [`pull`]: crate::ChunkPuller::pull
323 /// [`flattened`]: crate::ChunkPuller::flattened
324 fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self>
325 where
326 Self: Sized,
327 {
328 self.into()
329 }
330}