// midi_toolkit/sequence/common/merge_threaded.rs

1use crate::pipe;
2
3use super::{threaded_buffer, to_vec};
4
/// A strategy for combining several iterators that all yield the same item
/// type into a single iterator.
///
/// The trait has no `self` methods: it is used purely as a type-level
/// parameter that selects *how* streams are merged. Every input and every
/// output must be `Send + 'static`, so the streams may be moved to other
/// threads.
pub trait MergableStreams {
    /// The element type produced by both the input and the merged streams.
    type Item: Send + 'static;

    /// Merges exactly two streams into one.
    fn merge_two(
        iter1: impl Iterator<Item = Self::Item> + Send + 'static,
        iter2: impl Iterator<Item = Self::Item> + Send + 'static,
    ) -> impl Iterator<Item = Self::Item> + Send + 'static;

    /// Merges all streams contained in `array` into one.
    fn merge_array(
        array: Vec<impl Iterator<Item = Self::Item> + Send + 'static>,
    ) -> impl Iterator<Item = Self::Item> + Send + 'static;
}
17
18pub fn grouped_multithreaded_merge<T: MergableStreams>(
19    mut array: Vec<impl Iterator<Item = T::Item> + Send + 'static>,
20) -> impl Iterator<Item = T::Item> {
21    {
22        let buffer_size = 1 << 20;
23        if array.is_empty() {
24            return threaded_buffer(std::iter::empty(), 1);
25        }
26        if array.len() == 1 {
27            return threaded_buffer(array.remove(0), buffer_size);
28        }
29
30        let depth = 2;
31
32        let count = 1 << depth;
33
34        let mut iterator_groups = Vec::new();
35
36        for _ in 0..count {
37            iterator_groups.push(Vec::new());
38        }
39
40        for (i, iter) in array.into_iter().enumerate() {
41            let i = i % count;
42            iterator_groups[i].push(iter);
43        }
44
45        let mut iterator_groups = pipe!(
46            iterator_groups.into_iter()
47            .map(|g| pipe!(
48                g
49                |>T::merge_array()
50                |>threaded_buffer(buffer_size)
51            ))
52            |>to_vec()
53        );
54
55        let mut new_groups = Vec::new();
56        while iterator_groups.len() > 1 {
57            while !iterator_groups.is_empty() {
58                if iterator_groups.len() >= 2 {
59                    let merge = T::merge_two(iterator_groups.remove(0), iterator_groups.remove(0));
60                    new_groups.push(threaded_buffer(merge, buffer_size));
61                } else {
62                    new_groups.push(iterator_groups.remove(0));
63                }
64            }
65            iterator_groups = new_groups;
66            new_groups = Vec::new();
67        }
68
69        threaded_buffer(iterator_groups.remove(0), buffer_size)
70    }
71}