geng_rodio/
queue.rs

1//! Queue that plays sounds one after the other.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, Source, Zero};
8use crate::Sample;
9
10#[cfg(feature = "crossbeam-channel")]
11use crossbeam_channel::{unbounded as channel, Receiver, Sender};
12#[cfg(not(feature = "crossbeam-channel"))]
13use std::sync::mpsc::{channel, Receiver, Sender};
14
15/// Builds a new queue. It consists of an input and an output.
16///
17/// The input can be used to add sounds to the end of the queue, while the output implements
18/// `Source` and plays the sounds.
19///
20/// The parameter indicates how the queue should behave if the queue becomes empty:
21///
22/// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
23///   a new sound.
24/// - If you pass `false`, then the queue will report that it has finished playing.
25///
26pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
27where
28    S: Sample + Send + 'static,
29{
30    let input = Arc::new(SourcesQueueInput {
31        next_sounds: Mutex::new(Vec::new()),
32        keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
33    });
34
35    let output = SourcesQueueOutput {
36        current: Box::new(Empty::<S>::new()) as Box<_>,
37        signal_after_end: None,
38        input: input.clone(),
39    };
40
41    (input, output)
42}
43
44// TODO: consider reimplementing this with `from_factory`
45
46/// The input of the queue.
47pub struct SourcesQueueInput<S> {
48    next_sounds: Mutex<Vec<(Box<dyn Source<Item = S> + Send>, Option<Sender<()>>)>>,
49
50    // See constructor.
51    keep_alive_if_empty: AtomicBool,
52}
53
54impl<S> SourcesQueueInput<S>
55where
56    S: Sample + Send + 'static,
57{
58    /// Adds a new source to the end of the queue.
59    #[inline]
60    pub fn append<T>(&self, source: T)
61    where
62        T: Source<Item = S> + Send + 'static,
63    {
64        self.next_sounds
65            .lock()
66            .unwrap()
67            .push((Box::new(source) as Box<_>, None));
68    }
69
70    /// Adds a new source to the end of the queue.
71    ///
72    /// The `Receiver` will be signalled when the sound has finished playing.
73    ///
74    /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
75    #[inline]
76    pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
77    where
78        T: Source<Item = S> + Send + 'static,
79    {
80        let (tx, rx) = channel();
81        self.next_sounds
82            .lock()
83            .unwrap()
84            .push((Box::new(source) as Box<_>, Some(tx)));
85        rx
86    }
87
88    /// Sets whether the queue stays alive if there's no more sound to play.
89    ///
90    /// See also the constructor.
91    pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
92        self.keep_alive_if_empty
93            .store(keep_alive_if_empty, Ordering::Release);
94    }
95
96    /// Removes all the sounds from the queue. Returns the number of sounds cleared.
97    pub fn clear(&self) -> usize {
98        let mut sounds = self.next_sounds.lock().unwrap();
99        let len = sounds.len();
100        sounds.clear();
101        len
102    }
103}
104/// The output of the queue. Implements `Source`.
105pub struct SourcesQueueOutput<S> {
106    // The current iterator that produces samples.
107    current: Box<dyn Source<Item = S> + Send>,
108
109    // Signal this sender before picking from `next`.
110    signal_after_end: Option<Sender<()>>,
111
112    // The next sounds.
113    input: Arc<SourcesQueueInput<S>>,
114}
115
116const THRESHOLD: usize = 512;
117impl<S> Source for SourcesQueueOutput<S>
118where
119    S: Sample + Send + 'static,
120{
121    #[inline]
122    fn current_frame_len(&self) -> Option<usize> {
123        // This function is non-trivial because the boundary between two sounds in the queue should
124        // be a frame boundary as well.
125        //
126        // The current sound is free to return `None` for `current_frame_len()`, in which case
127        // we *should* return the number of samples remaining the current sound.
128        // This can be estimated with `size_hint()`.
129        //
130        // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
131        // situation we force a frame to have a maximum number of samples indicate by this
132        // constant.
133
134        // Try the current `current_frame_len`.
135        if let Some(val) = self.current.current_frame_len() {
136            if val != 0 {
137                return Some(val);
138            } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
139                && self.input.next_sounds.lock().unwrap().is_empty()
140            {
141                // The next source will be a filler silence which will have the length of `THRESHOLD`
142                return Some(THRESHOLD);
143            }
144        }
145
146        // Try the size hint.
147        let (lower_bound, _) = self.current.size_hint();
148        // The iterator default implementation just returns 0.
149        // That's a problematic value, so skip it.
150        if lower_bound > 0 {
151            return Some(lower_bound);
152        }
153
154        // Otherwise we use the constant value.
155        Some(THRESHOLD)
156    }
157
158    #[inline]
159    fn channels(&self) -> u16 {
160        self.current.channels()
161    }
162
163    #[inline]
164    fn sample_rate(&self) -> u32 {
165        self.current.sample_rate()
166    }
167
168    #[inline]
169    fn total_duration(&self) -> Option<Duration> {
170        None
171    }
172}
173
174impl<S> Iterator for SourcesQueueOutput<S>
175where
176    S: Sample + Send + 'static,
177{
178    type Item = S;
179
180    #[inline]
181    fn next(&mut self) -> Option<S> {
182        loop {
183            // Basic situation that will happen most of the time.
184            if let Some(sample) = self.current.next() {
185                return Some(sample);
186            }
187
188            // Since `self.current` has finished, we need to pick the next sound.
189            // In order to avoid inlining this expensive operation, the code is in another function.
190            if self.go_next().is_err() {
191                return None;
192            }
193        }
194    }
195
196    #[inline]
197    fn size_hint(&self) -> (usize, Option<usize>) {
198        (self.current.size_hint().0, None)
199    }
200}
201
202impl<S> SourcesQueueOutput<S>
203where
204    S: Sample + Send + 'static,
205{
206    // Called when `current` is empty and we must jump to the next element.
207    // Returns `Ok` if the sound should continue playing, or an error if it should stop.
208    //
209    // This method is separate so that it is not inlined.
210    fn go_next(&mut self) -> Result<(), ()> {
211        if let Some(signal_after_end) = self.signal_after_end.take() {
212            let _ = signal_after_end.send(());
213        }
214
215        let (next, signal_after_end) = {
216            let mut next = self.input.next_sounds.lock().unwrap();
217
218            if next.len() == 0 {
219                let silence = Box::new(Zero::<S>::new_samples(1, 44100, THRESHOLD)) as Box<_>;
220                if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
221                    // Play a short silence in order to avoid spinlocking.
222                    (silence, None)
223                } else {
224                    return Err(());
225                }
226            } else {
227                next.remove(0)
228            }
229        };
230
231        self.current = next;
232        self.signal_after_end = signal_after_end;
233        Ok(())
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use crate::buffer::SamplesBuffer;
240    use crate::queue;
241    use crate::source::Source;
242
243    #[test]
244    #[ignore] // FIXME: samples rate and channel not updated immediately after transition
245    fn basic() {
246        let (tx, mut rx) = queue::queue(false);
247
248        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
249        tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]));
250
251        assert_eq!(rx.channels(), 1);
252        assert_eq!(rx.sample_rate(), 48000);
253        assert_eq!(rx.next(), Some(10));
254        assert_eq!(rx.next(), Some(-10));
255        assert_eq!(rx.next(), Some(10));
256        assert_eq!(rx.next(), Some(-10));
257        assert_eq!(rx.channels(), 2);
258        assert_eq!(rx.sample_rate(), 96000);
259        assert_eq!(rx.next(), Some(5));
260        assert_eq!(rx.next(), Some(5));
261        assert_eq!(rx.next(), Some(5));
262        assert_eq!(rx.next(), Some(5));
263        assert_eq!(rx.next(), None);
264    }
265
266    #[test]
267    fn immediate_end() {
268        let (_, mut rx) = queue::queue::<i16>(false);
269        assert_eq!(rx.next(), None);
270    }
271
272    #[test]
273    fn keep_alive() {
274        let (tx, mut rx) = queue::queue(true);
275        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
276
277        assert_eq!(rx.next(), Some(10));
278        assert_eq!(rx.next(), Some(-10));
279        assert_eq!(rx.next(), Some(10));
280        assert_eq!(rx.next(), Some(-10));
281
282        for _ in 0..100000 {
283            assert_eq!(rx.next(), Some(0));
284        }
285    }
286
287    #[test]
288    #[ignore] // TODO: not yet implemented
289    fn no_delay_when_added() {
290        let (tx, mut rx) = queue::queue(true);
291
292        for _ in 0..500 {
293            assert_eq!(rx.next(), Some(0));
294        }
295
296        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
297        assert_eq!(rx.next(), Some(10));
298        assert_eq!(rx.next(), Some(-10));
299        assert_eq!(rx.next(), Some(10));
300        assert_eq!(rx.next(), Some(-10));
301    }
302}