// midi_toolkit/sequence/common/merge_threaded.rs

use super::threaded_buffer;

/// Describes how streams of a particular item type are merged together.
///
/// The trait only carries associated functions (no `self`), so an implementor
/// acts as a type-level strategy: callers select it through a generic parameter
/// and call `T::merge_two` / `T::merge_array`.
///
/// How items from the inputs are interleaved is decided by the implementor.
pub trait MergableStreams {
    /// The element type produced by every input and by the merged output.
    ///
    /// It must be `Send + 'static` so that it can be handed across threads.
    type Item: Send + 'static;

    /// Merges exactly two streams into a single stream.
    fn merge_two(
        iter1: impl Iterator<Item = Self::Item> + Send + 'static,
        iter2: impl Iterator<Item = Self::Item> + Send + 'static,
    ) -> impl Iterator<Item = Self::Item> + Send + 'static;

    /// Merges an arbitrary number of streams of the same concrete type into a
    /// single stream.
    fn merge_array(
        array: Vec<impl Iterator<Item = Self::Item> + Send + 'static>,
    ) -> impl Iterator<Item = Self::Item> + Send + 'static;
}

16pub fn grouped_multithreaded_merge<T: MergableStreams>(
17    mut array: Vec<impl Iterator<Item = T::Item> + Send + 'static>,
18) -> impl Iterator<Item = T::Item> {
19    {
20        let buffer_size = 1 << 20;
21        if array.is_empty() {
22            return threaded_buffer(std::iter::empty(), 1);
23        }
24        if array.len() == 1 {
25            return threaded_buffer(array.remove(0), buffer_size);
26        }
27
28        let depth = 2;
29
30        let count = 1 << depth;
31
32        let mut iterator_groups = Vec::new();
33
34        for _ in 0..count {
35            iterator_groups.push(Vec::new());
36        }
37
38        for (i, iter) in array.into_iter().enumerate() {
39            let i = i % count;
40            iterator_groups[i].push(iter);
41        }
42
43        let mut iterator_groups = iterator_groups
44            .into_iter()
45            .map(|g| threaded_buffer(T::merge_array(g), buffer_size))
46            .collect::<Vec<_>>();
47
48        let mut new_groups = Vec::new();
49        while iterator_groups.len() > 1 {
50            while !iterator_groups.is_empty() {
51                if iterator_groups.len() >= 2 {
52                    let merge = T::merge_two(iterator_groups.remove(0), iterator_groups.remove(0));
53                    new_groups.push(threaded_buffer(merge, buffer_size));
54                } else {
55                    new_groups.push(iterator_groups.remove(0));
56                }
57            }
58            iterator_groups = new_groups;
59            new_groups = Vec::new();
60        }
61
62        threaded_buffer(iterator_groups.remove(0), buffer_size)
63    }
64}