midi_toolkit/sequence/common/
merge_threaded.rs1use crate::pipe;
2
3use super::{threaded_buffer, to_vec};
4
5pub trait MergableStreams {
6 type Item: Send + 'static;
7
8 fn merge_two(
9 iter1: impl Iterator<Item = Self::Item> + Send + 'static,
10 iter2: impl Iterator<Item = Self::Item> + Send + 'static,
11 ) -> impl Iterator<Item = Self::Item> + Send + 'static;
12
13 fn merge_array(
14 array: Vec<impl Iterator<Item = Self::Item> + Send + 'static>,
15 ) -> impl Iterator<Item = Self::Item> + Send + 'static;
16}
17
18pub fn grouped_multithreaded_merge<T: MergableStreams>(
19 mut array: Vec<impl Iterator<Item = T::Item> + Send + 'static>,
20) -> impl Iterator<Item = T::Item> {
21 {
22 let buffer_size = 1 << 20;
23 if array.is_empty() {
24 return threaded_buffer(std::iter::empty(), 1);
25 }
26 if array.len() == 1 {
27 return threaded_buffer(array.remove(0), buffer_size);
28 }
29
30 let depth = 2;
31
32 let count = 1 << depth;
33
34 let mut iterator_groups = Vec::new();
35
36 for _ in 0..count {
37 iterator_groups.push(Vec::new());
38 }
39
40 for (i, iter) in array.into_iter().enumerate() {
41 let i = i % count;
42 iterator_groups[i].push(iter);
43 }
44
45 let mut iterator_groups = pipe!(
46 iterator_groups.into_iter()
47 .map(|g| pipe!(
48 g
49 |>T::merge_array()
50 |>threaded_buffer(buffer_size)
51 ))
52 |>to_vec()
53 );
54
55 let mut new_groups = Vec::new();
56 while iterator_groups.len() > 1 {
57 while !iterator_groups.is_empty() {
58 if iterator_groups.len() >= 2 {
59 let merge = T::merge_two(iterator_groups.remove(0), iterator_groups.remove(0));
60 new_groups.push(threaded_buffer(merge, buffer_size));
61 } else {
62 new_groups.push(iterator_groups.remove(0));
63 }
64 }
65 iterator_groups = new_groups;
66 new_groups = Vec::new();
67 }
68
69 threaded_buffer(iterator_groups.remove(0), buffer_size)
70 }
71}