orx_parallel/iter/recursive/into_par_rec_iter.rs

use crate::{DefaultRunner, Params, computational_variants::Par};
use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};

// unknown size

/// Trait to convert an iterator into a recursive parallel iterator together with the `extend` method.
/// Recursive iterators are most useful for defining parallel computations over non-linear data structures
/// such as trees or graphs.
///
/// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
///
/// It is recursive due to the extension. The recursive parallel iterator will yield
/// * all initial elements contained in this iterator,
/// * all elements dynamically added to the queue with the `extend` method while processing the elements.
///
/// You may read more about the [`ConcurrentRecursiveIter`].
///
/// [`ParIter`]: crate::ParIter
pub trait IntoParIterRec
where
    Self: IntoIterator,
    Self::Item: Send,
{
    /// Converts this iterator into a recursive parallel iterator together with the `extend` method.
    /// Recursive iterators are most useful for defining parallel computations over non-linear data structures
    /// such as trees or graphs.
    ///
    /// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
    ///
    /// It is recursive due to the extension. The recursive parallel iterator will yield
    /// * all initial elements contained in this iterator,
    /// * all elements dynamically added to the queue with the `extend` method while processing the elements.
    ///
    /// You may read more about the [`ConcurrentRecursiveIter`].
    ///
    /// The `extend` function defines the recursive expansion behavior. It takes two arguments:
    /// * `element: &Self::Item` is the item being processed.
    /// * `queue: &Queue<Self::Item>` is the queue of remaining elements/tasks which exposes two methods:
    ///   * `push(item)` allows us to add one item to the queue,
    ///   * `extend(items)` allows us to add all of the items to the queue. Here `items` must have a known
    ///     size (`ExactSizeIterator`).
    ///
    /// Adding children one-by-one with `push` or all together with `extend` are the two extreme options.
    /// Any intermediate approach is also possible. For instance, we can choose to `extend` in
    /// chunks of, say, 50 tasks. If the item happens to create 140 children, we can handle this with three
    /// `extend` calls.
    ///
    /// Either method might be beneficial depending on the use case.
    ///
    /// Pushing children one by one makes each new task available to other threads as quickly as possible. Further,
    /// when we don't know the exact number of children ahead of time, and we don't want to use heap allocation
    /// to store the children in a vec before adding them to the queue just to make it sized, we can add the
    /// elements one-by-one with the `queue.push(item)` method. On the other hand, this approach has more
    /// parallelization overhead.
    ///
    /// When we extend with all children at once using `queue.extend(items)`, we minimize the parallelization overhead
    /// of adding tasks to the queue. On the other hand, the children become available only once writing all
    /// children to the queue is complete, which might cause idleness when tasks are scarce. Still, the recommendation
    /// is to try to `extend` first whenever possible due to the following: (i) if we extend with a lot of children,
    /// tasks will not be scarce; (ii) and if we extend with only a few items, the delay of making the tasks
    /// available to other threads will be short.
    ///
    /// The decision is use-case specific, and it is best to benchmark for the specific input; a sketch
    /// of the chunked middle ground follows.
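    ///
    /// The following is a minimal, self-contained sketch of the chunked middle ground, assuming a
    /// hypothetical `Node` type with a `children` vector; the chunk size of 50 is an arbitrary choice.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// struct Node {
    ///     value: u64,
    ///     children: Vec<Node>,
    /// }
    ///
    /// // extend in chunks of 50: each slice chunk has a known length,
    /// // so it can be added with a single `extend` call
    /// fn extend_chunked<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    ///     for chunk in node.children.chunks(50) {
    ///         queue.extend(chunk);
    ///     }
    /// }
    ///
    /// let root = Node {
    ///     value: 1,
    ///     children: vec![Node { value: 2, children: vec![] }, Node { value: 3, children: vec![] }],
    /// };
    /// let sum = [&root].into_par_rec(extend_chunked).map(|n| n.value).sum();
    /// assert_eq!(sum, 6);
    /// ```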
    ///
    /// This crate makes use of the [`ConcurrentRecursiveIter`] for this computation and provides three ways
    /// to execute it in parallel.
    ///
    /// ## A. Recursive Iterator with Exact Length
    ///
    /// If we know, or if it is possible and sufficiently cheap to find out, the exact length of the iterator,
    /// it is recommended to work with an exact length recursive iterator. Note that the exact length of an
    /// iterator is the total number of elements that will be created. This gives the parallel executor
    /// the opportunity to optimize the chunk sizes.
    ///
    /// We can use `initial_elements.into_par_rec_exact(extend, count)` to create the iterator with exact length.
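    ///
    /// As a minimal sketch, consider a complete binary tree over the numbers `1..=15`, where the
    /// total number of elements is known up front:
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// // node n spawns children 2n and 2n+1 as long as they stay within 1..=15
    /// fn extend(n: &u64, queue: &Queue<u64>) {
    ///     if *n <= 7 {
    ///         queue.extend([2 * n, 2 * n + 1]);
    ///     }
    /// }
    ///
    /// // exactly 15 elements will be yielded, and we know this ahead of time
    /// let sum = [1u64].into_par_rec_exact(extend, 15).sum();
    /// assert_eq!(sum, 120); // 1 + 2 + ... + 15
    /// ```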
    ///
    /// ## B. Recursive Iterator with Unknown Length
    ///
    /// If we cannot know, or it is expensive to know, the exact length of the iterator ahead of time, we can
    /// still create a recursive parallel iterator. In these cases, however, it is recommended to provide
    /// the chunk size explicitly, depending on the number of threads that will be used and any estimate of the
    /// exact length.
    ///
    /// We can use `initial_elements.into_par_rec(extend)` to create the iterator without length information.
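    ///
    /// The sketch below sets the chunk size explicitly; here, a chunk size of 1 is an arbitrary choice
    /// that suits cases where tasks are few and expensive.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// fn extend(n: &u64, queue: &Queue<u64>) {
    ///     if *n <= 7 {
    ///         queue.extend([2 * n, 2 * n + 1]);
    ///     }
    /// }
    ///
    /// // no length information; we set the chunk size explicitly
    /// let sum = [1u64].into_par_rec(extend).chunk_size(1).sum();
    /// assert_eq!(sum, 120);
    /// ```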
    ///
    /// ## C. Linearization
    ///
    /// Even with exact length, a recursive parallel iterator is much more dynamic than a flat parallel
    /// iterator. This dynamic nature of shrinking and growing concurrently requires a greater parallelization
    /// overhead. An alternative approach is to eagerly discover all tasks first and then perform the parallel
    /// computation over the flattened input of tasks using the [`linearize`] transformation.
    ///
    /// We can use `initial_elements.into_par_rec(extend).linearize()` to create the flattened iterator.
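    ///
    /// The sketch below linearizes the recursive iterator before the parallel computation; whether this
    /// pays off depends on how expensive task discovery is relative to the per-task work.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// fn extend(n: &u64, queue: &Queue<u64>) {
    ///     if *n <= 7 {
    ///         queue.extend([2 * n, 2 * n + 1]);
    ///     }
    /// }
    ///
    /// // eagerly discover all 15 tasks, then run a flat parallel computation over them
    /// let sum = [1u64].into_par_rec(extend).linearize().sum();
    /// assert_eq!(sum, 120);
    /// ```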
    ///
    /// [`ParIter`]: crate::ParIter
    /// [`ConcurrentRecursiveIter`]: orx_concurrent_recursive_iter::ConcurrentRecursiveIter
    /// [`linearize`]: crate::computational_variants::Par::linearize
    ///
    /// ## Examples
    ///
    /// In the following example we perform some parallel computations over a tree.
    /// It demonstrates that a "recursive parallel iterator" is just a parallel iterator with
    /// access to all [`ParIter`] methods.
    /// Once we create the recursive parallel iterator with the `extend` definition, we can use it as
    /// a regular parallel iterator.
    ///
    /// Unfortunately, the example requires a long setup for completeness. Note that the relevant
    /// code blocks begin after the line `// parallel reduction`.
    ///
    /// ```
    /// use orx_parallel::*;
    /// use rand::{Rng, SeedableRng};
    /// use rand_chacha::ChaCha8Rng;
    /// use std::{collections::HashSet, ops::Range};
    ///
    /// pub struct Node<T> {
    ///     pub idx: usize,
    ///     pub data: T,
    ///     pub children: Vec<Node<T>>,
    /// }
    ///
    /// impl<T> Node<T> {
    ///     fn create_node(out_edges: &[Vec<usize>], idx: usize, data: fn(usize) -> T) -> Node<T> {
    ///         Node {
    ///             idx,
    ///             data: data(idx),
    ///             children: out_edges[idx]
    ///                 .iter()
    ///                 .map(|child_idx| Self::create_node(out_edges, *child_idx, data))
    ///                 .collect(),
    ///         }
    ///     }
    ///
    ///     pub fn new_tree(
    ///         num_nodes: usize,
    ///         degree: Range<usize>,
    ///         data: fn(usize) -> T,
    ///         rng: &mut impl Rng,
    ///     ) -> Node<T> {
    ///         assert!(num_nodes >= 2);
    ///
    ///         let mut leaves = vec![0];
    ///         let mut remaining: Vec<_> = (1..num_nodes).collect();
    ///         let mut edges = vec![];
    ///         let mut out_edges = vec![vec![]; num_nodes];
    ///
    ///         while !remaining.is_empty() {
    ///             let leaf_idx = rng.random_range(0..leaves.len());
    ///             let leaf = leaves.remove(leaf_idx);
    ///
    ///             let degree = rng.random_range(degree.clone());
    ///             match degree == 0 {
    ///                 true => leaves.push(leaf),
    ///                 false => {
    ///                     let children_indices: HashSet<_> = (0..degree)
    ///                         .map(|_| rng.random_range(0..remaining.len()))
    ///                         .collect();
    ///
    ///                     let mut sorted: Vec<_> = children_indices.iter().copied().collect();
    ///                     sorted.sort();
    ///
    ///                     edges.extend(children_indices.iter().map(|c| (leaf, remaining[*c])));
    ///                     out_edges[leaf] = children_indices.iter().map(|c| remaining[*c]).collect();
    ///                     leaves.extend(children_indices.iter().map(|c| remaining[*c]));
    ///
    ///                     for idx in sorted.into_iter().rev() {
    ///                         remaining.remove(idx);
    ///                     }
    ///                 }
    ///             }
    ///         }
    ///
    ///         Self::create_node(&out_edges, 0, data)
    ///     }
    /// }
    ///
    /// let num_nodes = 1_000;
    /// let out_degree = 0..100;
    /// let mut rng = ChaCha8Rng::seed_from_u64(42);
    /// let data = |idx: usize| idx.to_string();
    /// let root = Node::new_tree(num_nodes, out_degree, data, &mut rng);
    ///
    /// let compute = |node: &Node<String>| node.data.parse::<u64>().unwrap();
    ///
    /// // parallel reduction
    ///
    /// fn extend<'a, T: Sync>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    ///     queue.extend(&node.children);
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend).map(compute).sum();
    /// assert_eq!(sum, 499500);
    ///
    /// // or any parallel computation such as map->filter->collect
    ///
    /// let result: Vec<_> = [&root]
    ///     .into_par_rec(extend)
    ///     .map(compute)
    ///     .filter(|x| x.is_multiple_of(7))
    ///     .collect();
    /// assert_eq!(result.len(), 143);
    ///
    /// // or filter during extension
    /// fn extend_filtered<'a>(node: &&'a Node<String>, queue: &Queue<&'a Node<String>>) {
    ///     for child in &node.children {
    ///         if child.idx != 42 {
    ///             queue.push(child);
    ///         }
    ///     }
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend_filtered).map(compute).sum();
    /// assert!(sum < 499500); // node 42 and its entire subtree are skipped
    /// ```
    fn into_par_rec<E>(
        self,
        extend: E,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync;

    /// Converts this iterator into a recursive parallel iterator with known exact length, together with
    /// the `extend` method. Recursive iterators are most useful for defining parallel computations over
    /// non-linear data structures such as trees or graphs.
    ///
    /// The created parallel iterator is a regular parallel iterator; i.e., we have access to all [`ParIter`] features.
    ///
    /// It is recursive due to the extension. The recursive parallel iterator will yield
    /// * all initial elements contained in this iterator,
    /// * all elements dynamically added to the queue with the `extend` method while processing the elements.
    ///
    /// Here, `exact_len` must be the total number of elements that will be yielded, including all elements
    /// added recursively through `extend`.
    ///
    /// You may read more about the [`ConcurrentRecursiveIter`].
    ///
    /// The `extend` function defines the recursive expansion behavior. It takes two arguments:
    /// * `element: &Self::Item` is the item being processed.
    /// * `queue: &Queue<Self::Item>` is the queue of remaining elements/tasks which exposes two methods:
    ///   * `push(item)` allows us to add one item to the queue,
    ///   * `extend(items)` allows us to add all of the items to the queue. Here `items` must have a known
    ///     size (`ExactSizeIterator`).
    ///
    /// Adding children one-by-one with `push` or all together with `extend` are the two extreme options.
    /// Any intermediate approach is also possible. For instance, we can choose to `extend` in
    /// chunks of, say, 50 tasks. If the item happens to create 140 children, we can handle this with three
    /// `extend` calls.
    ///
    /// Either method might be beneficial depending on the use case.
    ///
    /// Pushing children one by one makes each new task available to other threads as quickly as possible. Further,
    /// when we don't know the exact number of children ahead of time, and we don't want to use heap allocation
    /// to store the children in a vec before adding them to the queue just to make it sized, we can add the
    /// elements one-by-one with the `queue.push(item)` method. On the other hand, this approach has more
    /// parallelization overhead.
    ///
    /// When we extend with all children at once using `queue.extend(items)`, we minimize the parallelization overhead
    /// of adding tasks to the queue. On the other hand, the children become available only once writing all
    /// children to the queue is complete, which might cause idleness when tasks are scarce. Still, the recommendation
    /// is to try to `extend` first whenever possible due to the following: (i) if we extend with a lot of children,
    /// tasks will not be scarce; (ii) and if we extend with only a few items, the delay of making the tasks
    /// available to other threads will be short.
    ///
    /// The decision is use-case specific, and it is best to benchmark for the specific input; a one-by-one
    /// `push` sketch follows.
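    ///
    /// The following is a minimal sketch of one-by-one pushing, assuming a hypothetical `Node` type;
    /// it avoids collecting children into a sized container before queueing them.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// struct Node {
    ///     value: u64,
    ///     children: Vec<Node>,
    /// }
    ///
    /// // push children one by one: each child becomes available to other threads immediately
    /// fn extend_one_by_one<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    ///     for child in &node.children {
    ///         queue.push(child);
    ///     }
    /// }
    ///
    /// let root = Node {
    ///     value: 1,
    ///     children: vec![Node { value: 2, children: vec![] }, Node { value: 3, children: vec![] }],
    /// };
    /// let sum = [&root].into_par_rec(extend_one_by_one).map(|n| n.value).sum();
    /// assert_eq!(sum, 6);
    /// ```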
    ///
    /// This crate makes use of the [`ConcurrentRecursiveIter`] for this computation and provides three ways
    /// to execute it in parallel.
    ///
    /// ## A. Recursive Iterator with Exact Length
    ///
    /// If we know, or if it is possible and sufficiently cheap to find out, the exact length of the iterator,
    /// it is recommended to work with an exact length recursive iterator. Note that the exact length of an
    /// iterator is the total number of elements that will be created. This gives the parallel executor
    /// the opportunity to optimize the chunk sizes.
    ///
    /// We can use `initial_elements.into_par_rec_exact(extend, count)` to create the iterator with exact length.
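    ///
    /// As a minimal sketch, consider recursively splitting a string of length 8 into halves down to
    /// single characters; the total number of pieces (15) is known up front:
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// // each piece longer than one character splits into two halves
    /// fn extend<'a>(s: &&'a str, queue: &Queue<&'a str>) {
    ///     let s = *s;
    ///     if s.len() > 1 {
    ///         let mid = s.len() / 2;
    ///         queue.extend([&s[..mid], &s[mid..]]);
    ///     }
    /// }
    ///
    /// // 1 + 2 + 4 + 8 = 15 pieces in total
    /// let total_len: usize = ["abcdefgh"].into_par_rec_exact(extend, 15).map(|s| s.len()).sum();
    /// assert_eq!(total_len, 4 * 8); // each of the 4 levels sums to 8 characters
    /// ```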
    ///
    /// ## B. Recursive Iterator with Unknown Length
    ///
    /// If we cannot know, or it is expensive to know, the exact length of the iterator ahead of time, we can
    /// still create a recursive parallel iterator. In these cases, however, it is recommended to provide
    /// the chunk size explicitly, depending on the number of threads that will be used and any estimate of the
    /// exact length.
    ///
    /// We can use `initial_elements.into_par_rec(extend)` to create the iterator without length information.
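    ///
    /// The sketch below configures the computation explicitly; the specific numbers are arbitrary and
    /// should be tuned for the input at hand.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// fn extend<'a>(s: &&'a str, queue: &Queue<&'a str>) {
    ///     let s = *s;
    ///     if s.len() > 1 {
    ///         let mid = s.len() / 2;
    ///         queue.extend([&s[..mid], &s[mid..]]);
    ///     }
    /// }
    ///
    /// // without length information, we pick the number of threads and chunk size explicitly
    /// let total_len: usize = ["abcdefgh"]
    ///     .into_par_rec(extend)
    ///     .num_threads(4)
    ///     .chunk_size(2)
    ///     .map(|s| s.len())
    ///     .sum();
    /// assert_eq!(total_len, 32);
    /// ```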
    ///
    /// ## C. Linearization
    ///
    /// Even with exact length, a recursive parallel iterator is much more dynamic than a flat parallel
    /// iterator. This dynamic nature of shrinking and growing concurrently requires a greater parallelization
    /// overhead. An alternative approach is to eagerly discover all tasks first and then perform the parallel
    /// computation over the flattened input of tasks using the [`linearize`] transformation.
    ///
    /// We can use `initial_elements.into_par_rec(extend).linearize()` to create the flattened iterator.
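    ///
    /// The sketch below first linearizes and then collects; task discovery happens eagerly, after which
    /// the computation proceeds as a flat parallel iterator.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// fn extend<'a>(s: &&'a str, queue: &Queue<&'a str>) {
    ///     let s = *s;
    ///     if s.len() > 1 {
    ///         let mid = s.len() / 2;
    ///         queue.extend([&s[..mid], &s[mid..]]);
    ///     }
    /// }
    ///
    /// let pieces: Vec<&str> = ["abcdefgh"].into_par_rec(extend).linearize().collect();
    /// assert_eq!(pieces.len(), 15);
    /// ```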
    ///
    /// [`ParIter`]: crate::ParIter
    /// [`ConcurrentRecursiveIter`]: orx_concurrent_recursive_iter::ConcurrentRecursiveIter
    /// [`linearize`]: crate::computational_variants::Par::linearize
    ///
    /// ## Examples
    ///
    /// In the following example we perform some parallel computations over a tree.
    /// It demonstrates that a "recursive parallel iterator" is just a parallel iterator with
    /// access to all [`ParIter`] methods.
    /// Once we create the recursive parallel iterator with the `extend` definition, we can use it as
    /// a regular parallel iterator.
    ///
    /// Unfortunately, the example requires a long setup for completeness. Note that the relevant
    /// code blocks begin after the line `// parallel reduction`.
    ///
    /// ```
    /// use orx_parallel::*;
    /// use rand::{Rng, SeedableRng};
    /// use rand_chacha::ChaCha8Rng;
    /// use std::{collections::HashSet, ops::Range};
    ///
    /// pub struct Node<T> {
    ///     pub idx: usize,
    ///     pub data: T,
    ///     pub children: Vec<Node<T>>,
    /// }
    ///
    /// impl<T> Node<T> {
    ///     fn create_node(out_edges: &[Vec<usize>], idx: usize, data: fn(usize) -> T) -> Node<T> {
    ///         Node {
    ///             idx,
    ///             data: data(idx),
    ///             children: out_edges[idx]
    ///                 .iter()
    ///                 .map(|child_idx| Self::create_node(out_edges, *child_idx, data))
    ///                 .collect(),
    ///         }
    ///     }
    ///
    ///     pub fn new_tree(
    ///         num_nodes: usize,
    ///         degree: Range<usize>,
    ///         data: fn(usize) -> T,
    ///         rng: &mut impl Rng,
    ///     ) -> Node<T> {
    ///         assert!(num_nodes >= 2);
    ///
    ///         let mut leaves = vec![0];
    ///         let mut remaining: Vec<_> = (1..num_nodes).collect();
    ///         let mut edges = vec![];
    ///         let mut out_edges = vec![vec![]; num_nodes];
    ///
    ///         while !remaining.is_empty() {
    ///             let leaf_idx = rng.random_range(0..leaves.len());
    ///             let leaf = leaves.remove(leaf_idx);
    ///
    ///             let degree = rng.random_range(degree.clone());
    ///             match degree == 0 {
    ///                 true => leaves.push(leaf),
    ///                 false => {
    ///                     let children_indices: HashSet<_> = (0..degree)
    ///                         .map(|_| rng.random_range(0..remaining.len()))
    ///                         .collect();
    ///
    ///                     let mut sorted: Vec<_> = children_indices.iter().copied().collect();
    ///                     sorted.sort();
    ///
    ///                     edges.extend(children_indices.iter().map(|c| (leaf, remaining[*c])));
    ///                     out_edges[leaf] = children_indices.iter().map(|c| remaining[*c]).collect();
    ///                     leaves.extend(children_indices.iter().map(|c| remaining[*c]));
    ///
    ///                     for idx in sorted.into_iter().rev() {
    ///                         remaining.remove(idx);
    ///                     }
    ///                 }
    ///             }
    ///         }
    ///
    ///         Self::create_node(&out_edges, 0, data)
    ///     }
    /// }
    ///
    /// let num_nodes = 1_000;
    /// let out_degree = 0..100;
    /// let mut rng = ChaCha8Rng::seed_from_u64(42);
    /// let data = |idx: usize| idx.to_string();
    /// let root = Node::new_tree(num_nodes, out_degree, data, &mut rng);
    ///
    /// let compute = |node: &Node<String>| node.data.parse::<u64>().unwrap();
    ///
    /// // parallel reduction
    ///
    /// fn extend<'a, T: Sync>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    ///     queue.extend(&node.children);
    /// }
    ///
    /// // the tree contains exactly `num_nodes` nodes, so the exact length is known
    /// let sum = [&root].into_par_rec_exact(extend, num_nodes).map(compute).sum();
    /// assert_eq!(sum, 499500);
    ///
    /// // or any parallel computation such as map->filter->collect
    ///
    /// let result: Vec<_> = [&root]
    ///     .into_par_rec_exact(extend, num_nodes)
    ///     .map(compute)
    ///     .filter(|x| x.is_multiple_of(7))
    ///     .collect();
    /// assert_eq!(result.len(), 143);
    ///
    /// // or filter during extension; the number of remaining nodes is no longer known
    /// // ahead of time, so we fall back to `into_par_rec`
    /// fn extend_filtered<'a>(node: &&'a Node<String>, queue: &Queue<&'a Node<String>>) {
    ///     for child in &node.children {
    ///         if child.idx != 42 {
    ///             queue.push(child);
    ///         }
    ///     }
    /// }
    ///
    /// let sum = [&root].into_par_rec(extend_filtered).map(compute).sum();
    /// assert!(sum < 499500); // node 42 and its entire subtree are skipped
    /// ```
    fn into_par_rec_exact<E>(
        self,
        extend: E,
        exact_len: usize,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync;
}

impl<X> IntoParIterRec for X
where
    X: IntoIterator,
    X::Item: Send,
{
    fn into_par_rec<E>(
        self,
        extend: E,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync,
    {
        let con_rec_iter = ConcurrentRecursiveIter::new(self, extend);
        Par::new(DefaultRunner::default(), Params::default(), con_rec_iter)
    }

    fn into_par_rec_exact<E>(
        self,
        extend: E,
        exact_len: usize,
    ) -> Par<ConcurrentRecursiveIter<Self::Item, E>, DefaultRunner>
    where
        E: Fn(&Self::Item, &Queue<Self::Item>) + Sync,
    {
        let con_rec_iter = ConcurrentRecursiveIter::new_exact(self, extend, exact_len);
        Par::new(DefaultRunner::default(), Params::default(), con_rec_iter)
    }
}
447}