midi_toolkit/sequence/common/
threaded_buffer.rs1use crossbeam_channel::{bounded, unbounded, IntoIter, Sender};
2use crate::gen_iter::GenIter;
3use std::{
4 collections::VecDeque,
5 sync::{Arc, RwLock},
6 thread,
7};
8
9use crate::sequence::common::to_vec;
10
11pub struct ThreadBufferIter<T> {
12 backward_tx: Sender<VecDeque<T>>,
13 bufs_iter: IntoIter<VecDeque<T>>,
14 current_buf: Option<VecDeque<T>>,
15}
16
17impl<T> Iterator for ThreadBufferIter<T> {
18 type Item = T;
19
20 fn next(&mut self) -> Option<T> {
21 loop {
22 if let Some(ref mut buf) = self.current_buf {
23 match buf.pop_front() {
24 Some(item) => return Some(item),
25 None => {
26 self.backward_tx.send(self.current_buf.take().unwrap()).ok();
27 self.current_buf = self.bufs_iter.next();
28 }
29 }
30 } else {
31 return None;
32 }
33 }
34 }
35}
36
37pub fn threaded_buffer<T: 'static + Send, I: 'static + Iterator<Item = T> + Send>(
38 iter: I,
39 buffer_size: usize,
40) -> ThreadBufferIter<T> {
41 let (forward_tx, forward_rx) = unbounded::<VecDeque<T>>();
42 let (backward_tx, backward_rx) = unbounded::<VecDeque<T>>();
43 thread::spawn(move || {
44 let mut iter = iter;
45 let mut ended = false;
46 for mut vector in backward_rx.into_iter() {
47 if !ended {
48 for _ in 0..buffer_size {
49 match iter.next() {
50 Some(item) => vector.push_back(item),
51 None => {
52 ended = true;
53 break;
54 }
55 }
56 }
57 }
58
59 if !vector.is_empty() {
60 forward_tx.send(vector).ok();
61 } else {
62 break;
63 }
64 }
65 });
66
67 for _ in 0..3 {
68 backward_tx.send(VecDeque::with_capacity(buffer_size)).ok();
69 }
70
71 let mut bufs_iter = forward_rx.into_iter();
72 let current_buf = bufs_iter.next();
73
74 ThreadBufferIter {
75 bufs_iter,
76 current_buf,
77 backward_tx,
78 }
79}
80
81pub fn channels_into_threadpool<
82 T: 'static + Send,
83 E: 'static + Send,
84 I: 'static + Iterator<Item = Result<T, E>> + Send + Sync,
85>(
86 iters: Vec<I>,
87 buffer_size: usize,
88) -> Vec<impl Iterator<Item = Result<T, E>>> {
89 let buffer_count = 3;
90
91 struct ReadCommand<T, E> {
92 vector: VecDeque<Result<T, E>>,
93 response_sender: Sender<VecDeque<Result<T, E>>>,
94 iter_id: usize,
95 }
96
97 let (request_queue_sender, request_queue_receiver) = unbounded();
98
99 let mut output_iters = Vec::new();
100
101 for iter_id in 0..iters.len() {
102 let (tx, rx) = bounded::<VecDeque<Result<T, E>>>(buffer_count);
103
104 let sender = request_queue_sender.clone();
105
106 for _ in 0..buffer_count {
107 sender
108 .send(ReadCommand {
109 vector: VecDeque::with_capacity(buffer_size),
110 response_sender: tx.clone(),
111 iter_id,
112 })
113 .ok();
114 }
115
116 output_iters.push(GenIter(
117 #[coroutine]
118 move || {
119 for mut received in rx.into_iter() {
120 if received.is_empty() {
121 break;
122 }
123
124 while let Some(item) = received.pop_front() {
125 yield item;
126 }
127
128 sender
129 .send(ReadCommand {
130 vector: received,
131 response_sender: tx.clone(),
132 iter_id,
133 })
134 .ok();
135 }
136 },
137 ));
138 }
139
140 thread::spawn(move || {
141 struct IterEnded<I> {
142 iter: I,
143 ended: bool,
144 }
145 let iters = to_vec(iters.into_iter().map(|i| {
146 Arc::new(RwLock::new(IterEnded {
147 iter: i,
148 ended: false,
149 }))
150 }));
151 for req in request_queue_receiver.into_iter() {
152 let iter = iters[req.iter_id].clone();
153 rayon::spawn_fifo(move || {
154 let mut iter = iter.write().unwrap();
155 let mut vector = req.vector;
156 if !iter.ended {
157 for _ in 0..buffer_size {
158 match iter.iter.next() {
159 Some(Ok(item)) => vector.push_back(Ok(item)),
160 Some(Err(error)) => {
161 vector.push_back(Err(error));
162 iter.ended = true;
163 break;
164 }
165 None => {
166 iter.ended = true;
167 break;
168 }
169 }
170 }
171 }
172
173 req.response_sender.send(vector).ok();
174 });
175 }
176 });
177
178 output_iters
179}