orx_parallel/iter/recursive/into_par_rec_iter.rs
use crate::{DefaultRunner, Params, computational_variants::Par};
use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};

// unknown size

/// Trait to convert an iterator into a recursive parallel iterator together with the `extend` method.
/// Recursive iterators are most useful for defining parallel computations over non-linear data structures
/// such as trees or graphs.
///
/// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
///
/// It is recursive due to the extension: the recursive parallel iterator will yield
/// * all initial elements contained in this iterator, and
/// * all elements dynamically added to the queue with the `extend` method while processing the elements.
///
/// You may read more in the documentation of [`ConcurrentRecursiveIter`].
///
/// [`ParIter`]: crate::ParIter
pub trait IntoParIterRec
where
    Self: IntoIterator,
    Self::Item: Send,
{
    /// Converts this iterator into a recursive parallel iterator together with the `extend` method.
    /// Recursive iterators are most useful for defining parallel computations over non-linear data structures
    /// such as trees or graphs.
    ///
    /// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
    ///
    /// It is recursive due to the extension: the recursive parallel iterator will yield
    /// * all initial elements contained in this iterator, and
    /// * all elements dynamically added to the queue with the `extend` method while processing the elements.
    ///
    /// You may read more in the documentation of [`ConcurrentRecursiveIter`].
    ///
    /// The `extend` function defines the recursive expansion behavior. It takes two arguments:
    /// * `element: &Self::Item` is the item being processed.
    /// * `queue: &Queue<Self::Item>` is the queue of remaining elements/tasks, which exposes two methods:
    ///   * `push(item)` adds one item to the queue,
    ///   * `extend(items)` adds all of the items to the queue; here, `items` must have a known
    ///     size (`ExactSizeIterator`).
    ///
    /// Adding children one by one with `push` or all together with `extend` are the two extreme options.
    /// Any intermediate approach is also possible. For instance, we can choose to `extend` in
    /// chunks of, say, 50 tasks. If an item happens to create 140 children, we can handle this with
    /// three `extend` calls, as sketched below.
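    ///
    /// A minimal sketch of such chunked extension, assuming a hypothetical `Node` type with a
    /// `children` vector: each `extend` call adds at most 50 children, so earlier chunks become
    /// available to other threads before all children are written.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// struct Node {
    ///     value: u64,
    ///     children: Vec<Node>,
    /// }
    ///
    /// fn extend_chunked<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    ///     // slices have known lengths; each chunk is one sized `extend` call
    ///     for chunk in node.children.chunks(50) {
    ///         queue.extend(chunk);
    ///     }
    /// }
    ///
    /// let root = Node {
    ///     value: 1,
    ///     children: vec![Node { value: 2, children: vec![] }],
    /// };
    ///
    /// let sum = [&root].into_par_rec(extend_chunked).map(|n| n.value).sum();
    /// assert_eq!(sum, 3);
    /// ```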
    ///
    /// Either method might be beneficial depending on the use case.
    ///
    /// Pushing children one by one makes each new task available to other threads as quickly as possible.
    /// Further, when we don't know the exact number of children ahead of time and we don't want to use a
    /// heap allocation to store the children in a vec just to make them sized before adding them to the
    /// queue, we can add the elements one by one with the `queue.push(item)` method. On the other hand,
    /// this approach incurs more parallelization overhead.
    ///
    /// When we extend with all children at once using `queue.extend(items)`, we minimize the parallelization
    /// overhead of adding tasks to the queue. On the other hand, the children become available only once all
    /// of them have been written to the queue, which might cause idleness when tasks are scarce. Still, the
    /// recommendation is to try `extend` first whenever possible due to the following: (i) if we extend with
    /// a lot of children, tasks will not be scarce; (ii) if we extend with only a few items, the delay in
    /// making the tasks available to other threads will be short.
    ///
    /// The decision is use-case specific and is best made by benchmarking with the specific input.
    ///
    /// This crate makes use of the [`ConcurrentRecursiveIter`] for this computation and provides three
    /// ways to execute it in parallel.
    ///
    /// ## A. Recursive Iterator with Exact Length
    ///
    /// If we know the exact length of the iterator, or if it is possible and sufficiently cheap to find
    /// out, it is recommended to work with an exact-length recursive iterator. Note that the exact length
    /// of a recursive iterator is the total number of elements that will ever be created. This gives the
    /// parallel executor the opportunity to optimize chunk sizes.
    ///
    /// We can use `initial_elements.into_par_rec_exact(extend, count)` to create the iterator with exact length.
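    ///
    /// As a small sketch, consider a contrived recursion where each number `n > 1` spawns `n - 1`:
    /// starting from 5, exactly 5 elements (5, 4, 3, 2, 1) will be created in total, so we can pass
    /// this count upfront.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// // the total number of elements is known to be 5
    /// let sum = [5u64].into_par_rec_exact(extend, 5).sum();
    /// assert_eq!(sum, 15);
    /// ```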
    ///
    /// ## B. Recursive Iterator with Unknown Length
    ///
    /// If we cannot know the exact length of the iterator ahead of time, or it is expensive to compute,
    /// we can still create a recursive parallel iterator. In these cases, however, it is recommended to
    /// set the chunk size explicitly, depending on the number of threads that will be used and any
    /// estimate of the exact length.
    ///
    /// We can use `initial_elements.into_par_rec(extend)` to create the iterator without length information.
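    ///
    /// Reusing the contrived recursion where each number `n > 1` spawns `n - 1`, a minimal sketch with
    /// an explicitly set chunk size follows; the value 2 is an arbitrary choice for illustration,
    /// assuming the regular [`ParIter`] `chunk_size` configuration method.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// // no length information; fix the chunk size explicitly
    /// let sum = [5u64].into_par_rec(extend).chunk_size(2).sum();
    /// assert_eq!(sum, 15);
    /// ```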
    ///
    /// ## C. Linearization
    ///
    /// Even with exact length, a recursive parallel iterator is much more dynamic than a flat parallel
    /// iterator. This dynamic nature of shrinking and growing concurrently requires a greater
    /// parallelization overhead. An alternative approach is to eagerly discover all tasks first and then
    /// perform the parallel computation over the flattened input of tasks using the [`linearize`]
    /// transformation.
    ///
    /// We can use `initial_elements.into_par_rec(extend).linearize()` to create the flattened iterator.
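    ///
    /// With the same contrived recursion, a minimal sketch of linearization: all tasks are discovered
    /// eagerly first, and the remaining computation runs as a flat parallel iterator.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// let sum = [5u64].into_par_rec(extend).linearize().sum();
    /// assert_eq!(sum, 15);
    /// ```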
    ///
    /// [`ParIter`]: crate::ParIter
    /// [`ConcurrentRecursiveIter`]: orx_concurrent_recursive_iter::ConcurrentRecursiveIter
    /// [`linearize`]: crate::computational_variants::Par::linearize
    ///
    /// ## Examples
    ///
    /// In the following example we perform some parallel computations over a tree.
    /// It demonstrates that a "recursive parallel iterator" is just a parallel iterator with
    /// access to all [`ParIter`] methods.
    /// Once we create the recursive parallel iterator with the `extend` definition, we can use it as
    /// a regular parallel iterator.
    ///
    /// Unfortunately, the example requires a long setup for completeness. Note that the relevant
    /// code blocks begin after the line `// parallel reduction`.
    ///
    /// ```
    /// use orx_parallel::*;
    /// use rand::{Rng, SeedableRng};
    /// use rand_chacha::ChaCha8Rng;
    /// use std::{collections::HashSet, ops::Range};
    ///
    /// pub struct Node<T> {
    ///     pub idx: usize,
    ///     pub data: T,
    ///     pub children: Vec<Node<T>>,
    /// }
    ///
    /// impl<T> Node<T> {
    ///     fn create_node(out_edges: &[Vec<usize>], idx: usize, data: fn(usize) -> T) -> Node<T> {
    ///         Node {
    ///             idx,
    ///             data: data(idx),
    ///             children: out_edges[idx]
    ///                 .iter()
    ///                 .map(|child_idx| Self::create_node(out_edges, *child_idx, data))
    ///                 .collect(),
    ///         }
    ///     }
    ///
    ///     pub fn new_tree(
    ///         num_nodes: usize,
    ///         degree: Range<usize>,
    ///         data: fn(usize) -> T,
    ///         rng: &mut impl Rng,
    ///     ) -> Node<T> {
    ///         assert!(num_nodes >= 2);
    ///
    ///         let mut leaves = vec![0];
    ///         let mut remaining: Vec<_> = (1..num_nodes).collect();
    ///         let mut edges = vec![];
    ///         let mut out_edges = vec![vec![]; num_nodes];
    ///
    ///         while !remaining.is_empty() {
    ///             let leaf_idx = rng.random_range(0..leaves.len());
    ///             let leaf = leaves.remove(leaf_idx);
    ///
    ///             let degree = rng.random_range(degree.clone());
    ///             match degree == 0 {
    ///                 true => leaves.push(leaf),
    ///                 false => {
    ///                     let children_indices: HashSet<_> = (0..degree)
    ///                         .map(|_| rng.random_range(0..remaining.len()))
    ///                         .collect();
    ///
    ///                     let mut sorted: Vec<_> = children_indices.iter().copied().collect();
    ///                     sorted.sort();
    ///
    ///                     edges.extend(children_indices.iter().map(|c| (leaf, remaining[*c])));
    ///                     out_edges[leaf] = children_indices.iter().map(|c| remaining[*c]).collect();
    ///                     leaves.extend(children_indices.iter().map(|c| remaining[*c]));
    ///
    ///                     for idx in sorted.into_iter().rev() {
    ///                         remaining.remove(idx);
    ///                     }
    ///                 }
    ///             }
    ///         }
    ///
    ///         Self::create_node(&out_edges, 0, data)
    ///     }
    /// }
    ///
    /// let num_nodes = 1_000;
    /// let out_degree = 0..100;
    /// let mut rng = ChaCha8Rng::seed_from_u64(42);
    /// let data = |idx: usize| idx.to_string();
    /// let root = Node::new_tree(num_nodes, out_degree, data, &mut rng);
    ///
    /// let compute = |node: &Node<String>| node.data.parse::<u64>().unwrap();
    ///
    /// // parallel reduction
    ///
    /// fn extend<'a, T: Sync>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    ///     queue.extend(&node.children);
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend).map(compute).sum();
    /// assert_eq!(sum, 499500);
    ///
    /// // or any parallel computation such as map->filter->collect
    ///
    /// let result: Vec<_> = [&root]
    ///     .into_par_rec(extend)
    ///     .map(compute)
    ///     .filter(|x| x.is_multiple_of(7))
    ///     .collect();
    /// assert_eq!(result.len(), 143);
    ///
    /// // or filter during extension
    /// fn extend_filtered<'a>(node: &&'a Node<String>, queue: &Queue<&'a Node<String>>) {
    ///     for child in &node.children {
    ///         if child.idx != 42 {
    ///             queue.push(child);
    ///         }
    ///     }
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend_filtered).map(compute).sum();
    /// ```
    fn into_par_rec<E>(
        self,
        extend: E,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync;

    /// Converts this iterator into a recursive parallel iterator together with the `extend` method.
    /// Recursive iterators are most useful for defining parallel computations over non-linear data structures
    /// such as trees or graphs.
    ///
    /// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
    ///
    /// It is recursive due to the extension: the recursive parallel iterator will yield
    /// * all initial elements contained in this iterator, and
    /// * all elements dynamically added to the queue with the `extend` method while processing the elements.
    ///
    /// You may read more in the documentation of [`ConcurrentRecursiveIter`].
    ///
    /// The `extend` function defines the recursive expansion behavior. It takes two arguments:
    /// * `element: &Self::Item` is the item being processed.
    /// * `queue: &Queue<Self::Item>` is the queue of remaining elements/tasks, which exposes two methods:
    ///   * `push(item)` adds one item to the queue,
    ///   * `extend(items)` adds all of the items to the queue; here, `items` must have a known
    ///     size (`ExactSizeIterator`).
    ///
    /// Adding children one by one with `push` or all together with `extend` are the two extreme options.
    /// Any intermediate approach is also possible. For instance, we can choose to `extend` in
    /// chunks of, say, 50 tasks. If an item happens to create 140 children, we can handle this with
    /// three `extend` calls, as sketched below.
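    ///
    /// A minimal sketch of such chunked extension, assuming a hypothetical `Node` type with a
    /// `children` vector: each `extend` call adds at most 50 children, so earlier chunks become
    /// available to other threads before all children are written.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// struct Node {
    ///     value: u64,
    ///     children: Vec<Node>,
    /// }
    ///
    /// fn extend_chunked<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    ///     // slices have known lengths; each chunk is one sized `extend` call
    ///     for chunk in node.children.chunks(50) {
    ///         queue.extend(chunk);
    ///     }
    /// }
    ///
    /// let root = Node {
    ///     value: 1,
    ///     children: vec![Node { value: 2, children: vec![] }],
    /// };
    ///
    /// let sum = [&root].into_par_rec(extend_chunked).map(|n| n.value).sum();
    /// assert_eq!(sum, 3);
    /// ```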
    ///
    /// Either method might be beneficial depending on the use case.
    ///
    /// Pushing children one by one makes each new task available to other threads as quickly as possible.
    /// Further, when we don't know the exact number of children ahead of time and we don't want to use a
    /// heap allocation to store the children in a vec just to make them sized before adding them to the
    /// queue, we can add the elements one by one with the `queue.push(item)` method. On the other hand,
    /// this approach incurs more parallelization overhead.
    ///
    /// When we extend with all children at once using `queue.extend(items)`, we minimize the parallelization
    /// overhead of adding tasks to the queue. On the other hand, the children become available only once all
    /// of them have been written to the queue, which might cause idleness when tasks are scarce. Still, the
    /// recommendation is to try `extend` first whenever possible due to the following: (i) if we extend with
    /// a lot of children, tasks will not be scarce; (ii) if we extend with only a few items, the delay in
    /// making the tasks available to other threads will be short.
    ///
    /// The decision is use-case specific and is best made by benchmarking with the specific input.
    ///
    /// This crate makes use of the [`ConcurrentRecursiveIter`] for this computation and provides three
    /// ways to execute it in parallel.
    ///
    /// ## A. Recursive Iterator with Exact Length
    ///
    /// If we know the exact length of the iterator, or if it is possible and sufficiently cheap to find
    /// out, it is recommended to work with an exact-length recursive iterator. Note that the exact length
    /// of a recursive iterator is the total number of elements that will ever be created. This gives the
    /// parallel executor the opportunity to optimize chunk sizes.
    ///
    /// We can use `initial_elements.into_par_rec_exact(extend, count)` to create the iterator with exact length.
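    ///
    /// As a small sketch, consider a contrived recursion where each number `n > 1` spawns `n - 1`:
    /// starting from 5, exactly 5 elements (5, 4, 3, 2, 1) will be created in total, so we can pass
    /// this count upfront.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// // the total number of elements is known to be 5
    /// let sum = [5u64].into_par_rec_exact(extend, 5).sum();
    /// assert_eq!(sum, 15);
    /// ```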
    ///
    /// ## B. Recursive Iterator with Unknown Length
    ///
    /// If we cannot know the exact length of the iterator ahead of time, or it is expensive to compute,
    /// we can still create a recursive parallel iterator. In these cases, however, it is recommended to
    /// set the chunk size explicitly, depending on the number of threads that will be used and any
    /// estimate of the exact length.
    ///
    /// We can use `initial_elements.into_par_rec(extend)` to create the iterator without length information.
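    ///
    /// Reusing the contrived recursion where each number `n > 1` spawns `n - 1`, a minimal sketch with
    /// an explicitly set chunk size follows; the value 2 is an arbitrary choice for illustration,
    /// assuming the regular [`ParIter`] `chunk_size` configuration method.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// // no length information; fix the chunk size explicitly
    /// let sum = [5u64].into_par_rec(extend).chunk_size(2).sum();
    /// assert_eq!(sum, 15);
    /// ```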
    ///
    /// ## C. Linearization
    ///
    /// Even with exact length, a recursive parallel iterator is much more dynamic than a flat parallel
    /// iterator. This dynamic nature of shrinking and growing concurrently requires a greater
    /// parallelization overhead. An alternative approach is to eagerly discover all tasks first and then
    /// perform the parallel computation over the flattened input of tasks using the [`linearize`]
    /// transformation.
    ///
    /// We can use `initial_elements.into_par_rec(extend).linearize()` to create the flattened iterator.
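    ///
    /// With the same contrived recursion, a minimal sketch of linearization: all tasks are discovered
    /// eagerly first, and the remaining computation runs as a flat parallel iterator.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let extend = |n: &u64, queue: &Queue<u64>| {
    ///     if *n > 1 {
    ///         queue.push(n - 1);
    ///     }
    /// };
    ///
    /// let sum = [5u64].into_par_rec(extend).linearize().sum();
    /// assert_eq!(sum, 15);
    /// ```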
    ///
    /// [`ParIter`]: crate::ParIter
    /// [`ConcurrentRecursiveIter`]: orx_concurrent_recursive_iter::ConcurrentRecursiveIter
    /// [`linearize`]: crate::computational_variants::Par::linearize
    ///
    /// ## Examples
    ///
    /// In the following example we perform some parallel computations over a tree.
    /// It demonstrates that a "recursive parallel iterator" is just a parallel iterator with
    /// access to all [`ParIter`] methods.
    /// Once we create the recursive parallel iterator with the `extend` definition, we can use it as
    /// a regular parallel iterator.
    ///
    /// Unfortunately, the example requires a long setup for completeness. Note that the relevant
    /// code blocks begin after the line `// parallel reduction`.
    ///
    /// ```
    /// use orx_parallel::*;
    /// use rand::{Rng, SeedableRng};
    /// use rand_chacha::ChaCha8Rng;
    /// use std::{collections::HashSet, ops::Range};
    ///
    /// pub struct Node<T> {
    ///     pub idx: usize,
    ///     pub data: T,
    ///     pub children: Vec<Node<T>>,
    /// }
    ///
    /// impl<T> Node<T> {
    ///     fn create_node(out_edges: &[Vec<usize>], idx: usize, data: fn(usize) -> T) -> Node<T> {
    ///         Node {
    ///             idx,
    ///             data: data(idx),
    ///             children: out_edges[idx]
    ///                 .iter()
    ///                 .map(|child_idx| Self::create_node(out_edges, *child_idx, data))
    ///                 .collect(),
    ///         }
    ///     }
    ///
    ///     pub fn new_tree(
    ///         num_nodes: usize,
    ///         degree: Range<usize>,
    ///         data: fn(usize) -> T,
    ///         rng: &mut impl Rng,
    ///     ) -> Node<T> {
    ///         assert!(num_nodes >= 2);
    ///
    ///         let mut leaves = vec![0];
    ///         let mut remaining: Vec<_> = (1..num_nodes).collect();
    ///         let mut edges = vec![];
    ///         let mut out_edges = vec![vec![]; num_nodes];
    ///
    ///         while !remaining.is_empty() {
    ///             let leaf_idx = rng.random_range(0..leaves.len());
    ///             let leaf = leaves.remove(leaf_idx);
    ///
    ///             let degree = rng.random_range(degree.clone());
    ///             match degree == 0 {
    ///                 true => leaves.push(leaf),
    ///                 false => {
    ///                     let children_indices: HashSet<_> = (0..degree)
    ///                         .map(|_| rng.random_range(0..remaining.len()))
    ///                         .collect();
    ///
    ///                     let mut sorted: Vec<_> = children_indices.iter().copied().collect();
    ///                     sorted.sort();
    ///
    ///                     edges.extend(children_indices.iter().map(|c| (leaf, remaining[*c])));
    ///                     out_edges[leaf] = children_indices.iter().map(|c| remaining[*c]).collect();
    ///                     leaves.extend(children_indices.iter().map(|c| remaining[*c]));
    ///
    ///                     for idx in sorted.into_iter().rev() {
    ///                         remaining.remove(idx);
    ///                     }
    ///                 }
    ///             }
    ///         }
    ///
    ///         Self::create_node(&out_edges, 0, data)
    ///     }
    /// }
    ///
    /// let num_nodes = 1_000;
    /// let out_degree = 0..100;
    /// let mut rng = ChaCha8Rng::seed_from_u64(42);
    /// let data = |idx: usize| idx.to_string();
    /// let root = Node::new_tree(num_nodes, out_degree, data, &mut rng);
    ///
    /// let compute = |node: &Node<String>| node.data.parse::<u64>().unwrap();
    ///
    /// // parallel reduction
    ///
    /// fn extend<'a, T: Sync>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    ///     queue.extend(&node.children);
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend).map(compute).sum();
    /// assert_eq!(sum, 499500);
    ///
    /// // or any parallel computation such as map->filter->collect
    ///
    /// let result: Vec<_> = [&root]
    ///     .into_par_rec(extend)
    ///     .map(compute)
    ///     .filter(|x| x.is_multiple_of(7))
    ///     .collect();
    /// assert_eq!(result.len(), 143);
    ///
    /// // or filter during extension
    /// fn extend_filtered<'a>(node: &&'a Node<String>, queue: &Queue<&'a Node<String>>) {
    ///     for child in &node.children {
    ///         if child.idx != 42 {
    ///             queue.push(child);
    ///         }
    ///     }
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend_filtered).map(compute).sum();
    /// ```
    fn into_par_rec_exact<E>(
        self,
        extend: E,
        exact_len: usize,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync;
}

impl<X> IntoParIterRec for X
where
    X: IntoIterator,
    X::Item: Send,
{
    fn into_par_rec<E>(
        self,
        extend: E,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync,
    {
        let con_rec_iter = ConcurrentRecursiveIter::new(self, extend);
        Par::new(DefaultRunner::default(), Params::default(), con_rec_iter)
    }

    fn into_par_rec_exact<E>(
        self,
        extend: E,
        exact_len: usize,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync,
    {
        let con_rec_iter = ConcurrentRecursiveIter::new_exact(self, extend, exact_len);
        Par::new(DefaultRunner::default(), Params::default(), con_rec_iter)
    }
}