oddio/
stream.rs

1//! Streaming audio support
2
3use crate::{frame, math::Float, spsc, Frame, Signal};
4
5/// Dynamic audio from an external source
6pub struct Stream<T> {
7    rate: u32,
8    inner: spsc::Receiver<T>,
9    /// Offset of t=0 from the start of the buffer, in frames
10    t: f32,
11    /// Whether `inner` will receive no further updates
12    stopping: bool,
13}
14
15impl<T> Stream<T> {
16    /// Construct a stream of dynamic audio
17    ///
18    /// Samples can be appended to the stream through its [`StreamControl`]. This allows the
19    /// business of obtaining streaming audio, e.g. from a streaming decoder or the network, to take
20    /// place without interfering with the low-latency requirements of audio output.
21    ///
22    /// - `rate` is the stream's sample rate
23    /// - `size` dictates the maximum number of buffered frames
24    pub fn new(rate: u32, size: usize) -> (StreamControl<T>, Self) {
25        let (send, recv) = spsc::channel(size);
26        let signal = Self {
27            rate,
28            inner: recv,
29            t: 0.0,
30            stopping: false,
31        };
32        let control = StreamControl(send);
33        (control, signal)
34    }
35
36    #[inline]
37    fn get(&self, sample: isize) -> T
38    where
39        T: Frame + Copy,
40    {
41        if sample < 0 {
42            return T::ZERO;
43        }
44        let sample = sample as usize;
45        if sample >= self.inner.len() {
46            return T::ZERO;
47        }
48        self.inner[sample]
49    }
50
51    fn sample_single(&self, s: f32) -> T
52    where
53        T: Frame + Copy,
54    {
55        let x0 = s.trunc() as isize;
56        let fract = s.fract();
57        let x1 = x0 + 1;
58        let a = self.get(x0);
59        let b = self.get(x1);
60        frame::lerp(&a, &b, fract)
61    }
62
63    fn advance(&mut self, dt: f32) {
64        let next = self.t + dt * self.rate as f32;
65        let t = next.min(self.inner.len() as f32);
66        self.inner.release(t as usize);
67        self.t = t.fract();
68    }
69}
70
71impl<T: Frame + Copy> Signal for Stream<T> {
72    type Frame = T;
73
74    fn sample(&mut self, interval: f32, out: &mut [T]) {
75        self.inner.update();
76        if self.inner.is_closed() {
77            self.stopping = true;
78        }
79        let s0 = self.t;
80        let ds = interval * self.rate as f32;
81
82        for (i, o) in out.iter_mut().enumerate() {
83            *o = self.sample_single(s0 + ds * i as f32);
84        }
85        self.advance(interval * out.len() as f32);
86    }
87
88    #[allow(clippy::float_cmp)]
89    fn is_finished(&self) -> bool {
90        self.stopping && self.t == self.inner.len() as f32
91    }
92}
93
94/// Thread-safe control for a [`Stream`]
95pub struct StreamControl<T>(spsc::Sender<T>);
96
97impl<T> StreamControl<T> {
98    /// Lower bound to the number of samples that the next `write` call will successfully consume
99    pub fn free(&mut self) -> usize {
100        self.0.free()
101    }
102
103    /// Add more samples. Returns the number of samples consumed. Remaining samples should be passed
104    /// in again in a future call.
105    pub fn write(&mut self, samples: &[T]) -> usize
106    where
107        T: Copy,
108    {
109        self.0.send_from_slice(samples)
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Sample `expected.len()` frames with an interval of 1.0 and compare them to `expected`
    fn assert_out(stream: &mut Stream<f32>, expected: &[f32]) {
        let mut actual = vec![0.0; expected.len()];
        stream.sample(1.0, &mut actual);
        assert_eq!(actual, expected);
    }

    /// Writes are capped by the buffer capacity, and reads past the buffered data yield zeros
    #[test]
    fn smoke() {
        let (mut control, mut stream) = Stream::<f32>::new(1, 3);
        // Capacity is 3, so only the first frame of the second write fits.
        assert_eq!(control.write(&[1.0, 2.0]), 2);
        assert_eq!(control.write(&[3.0, 4.0]), 1);
        assert_out(&mut stream, &[1.0, 2.0, 3.0, 0.0, 0.0]);
        // The buffer has been drained, so three more frames are accepted.
        assert_eq!(control.write(&[5.0, 6.0, 7.0, 8.0]), 3);
        assert_out(&mut stream, &[5.0]);
        assert_out(&mut stream, &[6.0, 7.0, 0.0, 0.0]);
        assert_out(&mut stream, &[0.0, 0.0]);
    }

    /// The stream reports finished only after the control is dropped and the buffer is drained
    #[test]
    fn cleanup() {
        let (mut control, mut stream) = Stream::<f32>::new(1, 4);
        assert_eq!(control.write(&[1.0, 2.0]), 2);
        assert!(!stream.is_finished());
        drop(control);
        assert!(!stream.is_finished());

        let mut scratch = [0.0];
        stream.sample(1.0, &mut scratch);
        assert!(!stream.is_finished());
        stream.sample(1.0, &mut scratch);
        assert!(stream.is_finished());
        stream.sample(1.0, &mut scratch);
        assert!(stream.is_finished());
    }
}