1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, Source, Zero};
8use crate::Sample;
9
10#[cfg(feature = "crossbeam-channel")]
11use crossbeam_channel::{unbounded as channel, Receiver, Sender};
12#[cfg(not(feature = "crossbeam-channel"))]
13use std::sync::mpsc::{channel, Receiver, Sender};
14
15pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
27where
28 S: Sample + Send + 'static,
29{
30 let input = Arc::new(SourcesQueueInput {
31 next_sounds: Mutex::new(Vec::new()),
32 keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
33 });
34
35 let output = SourcesQueueOutput {
36 current: Box::new(Empty::<S>::new()) as Box<_>,
37 signal_after_end: None,
38 input: input.clone(),
39 };
40
41 (input, output)
42}
43
44pub struct SourcesQueueInput<S> {
48 next_sounds: Mutex<Vec<(Box<dyn Source<Item = S> + Send>, Option<Sender<()>>)>>,
49
50 keep_alive_if_empty: AtomicBool,
52}
53
54impl<S> SourcesQueueInput<S>
55where
56 S: Sample + Send + 'static,
57{
58 #[inline]
60 pub fn append<T>(&self, source: T)
61 where
62 T: Source<Item = S> + Send + 'static,
63 {
64 self.next_sounds
65 .lock()
66 .unwrap()
67 .push((Box::new(source) as Box<_>, None));
68 }
69
70 #[inline]
76 pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
77 where
78 T: Source<Item = S> + Send + 'static,
79 {
80 let (tx, rx) = channel();
81 self.next_sounds
82 .lock()
83 .unwrap()
84 .push((Box::new(source) as Box<_>, Some(tx)));
85 rx
86 }
87
88 pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
92 self.keep_alive_if_empty
93 .store(keep_alive_if_empty, Ordering::Release);
94 }
95
96 pub fn clear(&self) -> usize {
98 let mut sounds = self.next_sounds.lock().unwrap();
99 let len = sounds.len();
100 sounds.clear();
101 len
102 }
103}
104pub struct SourcesQueueOutput<S> {
106 current: Box<dyn Source<Item = S> + Send>,
108
109 signal_after_end: Option<Sender<()>>,
111
112 input: Arc<SourcesQueueInput<S>>,
114}
115
116const THRESHOLD: usize = 512;
117impl<S> Source for SourcesQueueOutput<S>
118where
119 S: Sample + Send + 'static,
120{
121 #[inline]
122 fn current_frame_len(&self) -> Option<usize> {
123 if let Some(val) = self.current.current_frame_len() {
136 if val != 0 {
137 return Some(val);
138 } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
139 && self.input.next_sounds.lock().unwrap().is_empty()
140 {
141 return Some(THRESHOLD);
143 }
144 }
145
146 let (lower_bound, _) = self.current.size_hint();
148 if lower_bound > 0 {
151 return Some(lower_bound);
152 }
153
154 Some(THRESHOLD)
156 }
157
158 #[inline]
159 fn channels(&self) -> u16 {
160 self.current.channels()
161 }
162
163 #[inline]
164 fn sample_rate(&self) -> u32 {
165 self.current.sample_rate()
166 }
167
168 #[inline]
169 fn total_duration(&self) -> Option<Duration> {
170 None
171 }
172}
173
174impl<S> Iterator for SourcesQueueOutput<S>
175where
176 S: Sample + Send + 'static,
177{
178 type Item = S;
179
180 #[inline]
181 fn next(&mut self) -> Option<S> {
182 loop {
183 if let Some(sample) = self.current.next() {
185 return Some(sample);
186 }
187
188 if self.go_next().is_err() {
191 return None;
192 }
193 }
194 }
195
196 #[inline]
197 fn size_hint(&self) -> (usize, Option<usize>) {
198 (self.current.size_hint().0, None)
199 }
200}
201
202impl<S> SourcesQueueOutput<S>
203where
204 S: Sample + Send + 'static,
205{
206 fn go_next(&mut self) -> Result<(), ()> {
211 if let Some(signal_after_end) = self.signal_after_end.take() {
212 let _ = signal_after_end.send(());
213 }
214
215 let (next, signal_after_end) = {
216 let mut next = self.input.next_sounds.lock().unwrap();
217
218 if next.len() == 0 {
219 let silence = Box::new(Zero::<S>::new_samples(1, 44100, THRESHOLD)) as Box<_>;
220 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
221 (silence, None)
223 } else {
224 return Err(());
225 }
226 } else {
227 next.remove(0)
228 }
229 };
230
231 self.current = next;
232 self.signal_after_end = signal_after_end;
233 Ok(())
234 }
235}
236
#[cfg(test)]
mod tests {
    use crate::buffer::SamplesBuffer;
    use crate::queue;
    use crate::source::Source;

    // Two buffers with different formats played back to back.
    //
    // NOTE(review): ignored. `channels()` and `sample_rate()` read from the
    // source the output currently holds, and that source is only replaced by
    // the `next()` call made after it runs dry, so the format checks placed
    // right after the first buffer's last sample appear to still observe the
    // first buffer.
    #[test]
    #[ignore]
    fn basic() {
        let first = [10i16, -10, 10, -10];
        let second = [5i16, 5, 5, 5];

        let (tx, mut rx) = queue::queue(false);
        tx.append(SamplesBuffer::new(1, 48000, first.to_vec()));
        tx.append(SamplesBuffer::new(2, 96000, second.to_vec()));

        assert_eq!(rx.channels(), 1);
        assert_eq!(rx.sample_rate(), 48000);
        for &expected in &first {
            assert_eq!(rx.next(), Some(expected));
        }

        assert_eq!(rx.channels(), 2);
        assert_eq!(rx.sample_rate(), 96000);
        for &expected in &second {
            assert_eq!(rx.next(), Some(expected));
        }

        assert_eq!(rx.next(), None);
    }

    // An empty queue without keep-alive ends on the first pull.
    #[test]
    fn immediate_end() {
        let (_, mut rx) = queue::queue::<i16>(false);
        assert_eq!(rx.next(), None);
    }

    // With keep-alive on, the output produces silence indefinitely after the
    // queued samples are consumed.
    #[test]
    fn keep_alive() {
        let samples = [10i16, -10, 10, -10];

        let (tx, mut rx) = queue::queue(true);
        tx.append(SamplesBuffer::new(1, 48000, samples.to_vec()));

        for &expected in &samples {
            assert_eq!(rx.next(), Some(expected));
        }

        for _ in 0..100_000 {
            assert_eq!(rx.next(), Some(0));
        }
    }

    // A source appended while silence is playing should be heard at once.
    //
    // NOTE(review): ignored. The filler silence is requested with 512 samples,
    // so after 500 zeros the output presumably still has filler left to play
    // before it looks at the queue again.
    #[test]
    #[ignore]
    fn no_delay_when_added() {
        let samples = [10i16, -10, 10, -10];

        let (tx, mut rx) = queue::queue(true);

        for _ in 0..500 {
            assert_eq!(rx.next(), Some(0));
        }

        tx.append(SamplesBuffer::new(1, 48000, samples.to_vec()));
        for &expected in &samples {
            assert_eq!(rx.next(), Some(expected));
        }
    }
}