midi_toolkit/sequence/common/
merge_threaded.rs1use super::threaded_buffer;
2
3pub trait MergableStreams {
4 type Item: Send + 'static;
5
6 fn merge_two(
7 iter1: impl Iterator<Item = Self::Item> + Send + 'static,
8 iter2: impl Iterator<Item = Self::Item> + Send + 'static,
9 ) -> impl Iterator<Item = Self::Item> + Send + 'static;
10
11 fn merge_array(
12 array: Vec<impl Iterator<Item = Self::Item> + Send + 'static>,
13 ) -> impl Iterator<Item = Self::Item> + Send + 'static;
14}
15
16pub fn grouped_multithreaded_merge<T: MergableStreams>(
17 mut array: Vec<impl Iterator<Item = T::Item> + Send + 'static>,
18) -> impl Iterator<Item = T::Item> {
19 {
20 let buffer_size = 1 << 20;
21 if array.is_empty() {
22 return threaded_buffer(std::iter::empty(), 1);
23 }
24 if array.len() == 1 {
25 return threaded_buffer(array.remove(0), buffer_size);
26 }
27
28 let depth = 2;
29
30 let count = 1 << depth;
31
32 let mut iterator_groups = Vec::new();
33
34 for _ in 0..count {
35 iterator_groups.push(Vec::new());
36 }
37
38 for (i, iter) in array.into_iter().enumerate() {
39 let i = i % count;
40 iterator_groups[i].push(iter);
41 }
42
43 let mut iterator_groups = iterator_groups
44 .into_iter()
45 .map(|g| threaded_buffer(T::merge_array(g), buffer_size))
46 .collect::<Vec<_>>();
47
48 let mut new_groups = Vec::new();
49 while iterator_groups.len() > 1 {
50 while !iterator_groups.is_empty() {
51 if iterator_groups.len() >= 2 {
52 let merge = T::merge_two(iterator_groups.remove(0), iterator_groups.remove(0));
53 new_groups.push(threaded_buffer(merge, buffer_size));
54 } else {
55 new_groups.push(iterator_groups.remove(0));
56 }
57 }
58 iterator_groups = new_groups;
59 new_groups = Vec::new();
60 }
61
62 threaded_buffer(iterator_groups.remove(0), buffer_size)
63 }
64}