midi_toolkit/sequence/common/
threaded_buffer.rs

1use crossbeam_channel::{bounded, unbounded, IntoIter, Sender};
2use crate::gen_iter::GenIter;
3use std::{
4    collections::VecDeque,
5    sync::{Arc, RwLock},
6    thread,
7};
8
9use crate::sequence::common::to_vec;
10
11pub struct ThreadBufferIter<T> {
12    backward_tx: Sender<VecDeque<T>>,
13    bufs_iter: IntoIter<VecDeque<T>>,
14    current_buf: Option<VecDeque<T>>,
15}
16
17impl<T> Iterator for ThreadBufferIter<T> {
18    type Item = T;
19
20    fn next(&mut self) -> Option<T> {
21        loop {
22            if let Some(ref mut buf) = self.current_buf {
23                match buf.pop_front() {
24                    Some(item) => return Some(item),
25                    None => {
26                        self.backward_tx.send(self.current_buf.take().unwrap()).ok();
27                        self.current_buf = self.bufs_iter.next();
28                    }
29                }
30            } else {
31                return None;
32            }
33        }
34    }
35}
36
37pub fn threaded_buffer<T: 'static + Send, I: 'static + Iterator<Item = T> + Send>(
38    iter: I,
39    buffer_size: usize,
40) -> ThreadBufferIter<T> {
41    let (forward_tx, forward_rx) = unbounded::<VecDeque<T>>();
42    let (backward_tx, backward_rx) = unbounded::<VecDeque<T>>();
43    thread::spawn(move || {
44        let mut iter = iter;
45        let mut ended = false;
46        for mut vector in backward_rx.into_iter() {
47            if !ended {
48                for _ in 0..buffer_size {
49                    match iter.next() {
50                        Some(item) => vector.push_back(item),
51                        None => {
52                            ended = true;
53                            break;
54                        }
55                    }
56                }
57            }
58
59            if !vector.is_empty() {
60                forward_tx.send(vector).ok();
61            } else {
62                break;
63            }
64        }
65    });
66
67    for _ in 0..3 {
68        backward_tx.send(VecDeque::with_capacity(buffer_size)).ok();
69    }
70
71    let mut bufs_iter = forward_rx.into_iter();
72    let current_buf = bufs_iter.next();
73
74    ThreadBufferIter {
75        bufs_iter,
76        current_buf,
77        backward_tx,
78    }
79}
80
81pub fn channels_into_threadpool<
82    T: 'static + Send,
83    E: 'static + Send,
84    I: 'static + Iterator<Item = Result<T, E>> + Send + Sync,
85>(
86    iters: Vec<I>,
87    buffer_size: usize,
88) -> Vec<impl Iterator<Item = Result<T, E>>> {
89    let buffer_count = 3;
90
91    struct ReadCommand<T, E> {
92        vector: VecDeque<Result<T, E>>,
93        response_sender: Sender<VecDeque<Result<T, E>>>,
94        iter_id: usize,
95    }
96
97    let (request_queue_sender, request_queue_receiver) = unbounded();
98
99    let mut output_iters = Vec::new();
100
101    for iter_id in 0..iters.len() {
102        let (tx, rx) = bounded::<VecDeque<Result<T, E>>>(buffer_count);
103
104        let sender = request_queue_sender.clone();
105
106        for _ in 0..buffer_count {
107            sender
108                .send(ReadCommand {
109                    vector: VecDeque::with_capacity(buffer_size),
110                    response_sender: tx.clone(),
111                    iter_id,
112                })
113                .ok();
114        }
115
116        output_iters.push(GenIter(
117            #[coroutine]
118            move || {
119                for mut received in rx.into_iter() {
120                    if received.is_empty() {
121                        break;
122                    }
123
124                    while let Some(item) = received.pop_front() {
125                        yield item;
126                    }
127
128                    sender
129                        .send(ReadCommand {
130                            vector: received,
131                            response_sender: tx.clone(),
132                            iter_id,
133                        })
134                        .ok();
135                }
136            },
137        ));
138    }
139
140    thread::spawn(move || {
141        struct IterEnded<I> {
142            iter: I,
143            ended: bool,
144        }
145        let iters = to_vec(iters.into_iter().map(|i| {
146            Arc::new(RwLock::new(IterEnded {
147                iter: i,
148                ended: false,
149            }))
150        }));
151        for req in request_queue_receiver.into_iter() {
152            let iter = iters[req.iter_id].clone();
153            rayon::spawn_fifo(move || {
154                let mut iter = iter.write().unwrap();
155                let mut vector = req.vector;
156                if !iter.ended {
157                    for _ in 0..buffer_size {
158                        match iter.iter.next() {
159                            Some(Ok(item)) => vector.push_back(Ok(item)),
160                            Some(Err(error)) => {
161                                vector.push_back(Err(error));
162                                iter.ended = true;
163                                break;
164                            }
165                            None => {
166                                iter.ended = true;
167                                break;
168                            }
169                        }
170                    }
171                }
172
173                req.response_sender.send(vector).ok();
174            });
175        }
176    });
177
178    output_iters
179}