1use crate::{frame, math::Float, spsc, Frame, Signal};
4
5pub struct Stream<T> {
7 rate: u32,
8 inner: spsc::Receiver<T>,
9 t: f32,
11 stopping: bool,
13}
14
15impl<T> Stream<T> {
16 pub fn new(rate: u32, size: usize) -> (StreamControl<T>, Self) {
25 let (send, recv) = spsc::channel(size);
26 let signal = Self {
27 rate,
28 inner: recv,
29 t: 0.0,
30 stopping: false,
31 };
32 let control = StreamControl(send);
33 (control, signal)
34 }
35
36 #[inline]
37 fn get(&self, sample: isize) -> T
38 where
39 T: Frame + Copy,
40 {
41 if sample < 0 {
42 return T::ZERO;
43 }
44 let sample = sample as usize;
45 if sample >= self.inner.len() {
46 return T::ZERO;
47 }
48 self.inner[sample]
49 }
50
51 fn sample_single(&self, s: f32) -> T
52 where
53 T: Frame + Copy,
54 {
55 let x0 = s.trunc() as isize;
56 let fract = s.fract();
57 let x1 = x0 + 1;
58 let a = self.get(x0);
59 let b = self.get(x1);
60 frame::lerp(&a, &b, fract)
61 }
62
63 fn advance(&mut self, dt: f32) {
64 let next = self.t + dt * self.rate as f32;
65 let t = next.min(self.inner.len() as f32);
66 self.inner.release(t as usize);
67 self.t = t.fract();
68 }
69}
70
71impl<T: Frame + Copy> Signal for Stream<T> {
72 type Frame = T;
73
74 fn sample(&mut self, interval: f32, out: &mut [T]) {
75 self.inner.update();
76 if self.inner.is_closed() {
77 self.stopping = true;
78 }
79 let s0 = self.t;
80 let ds = interval * self.rate as f32;
81
82 for (i, o) in out.iter_mut().enumerate() {
83 *o = self.sample_single(s0 + ds * i as f32);
84 }
85 self.advance(interval * out.len() as f32);
86 }
87
88 #[allow(clippy::float_cmp)]
89 fn is_finished(&self) -> bool {
90 self.stopping && self.t == self.inner.len() as f32
91 }
92}
93
94pub struct StreamControl<T>(spsc::Sender<T>);
96
97impl<T> StreamControl<T> {
98 pub fn free(&mut self) -> usize {
100 self.0.free()
101 }
102
103 pub fn write(&mut self, samples: &[T]) -> usize
106 where
107 T: Copy,
108 {
109 self.0.send_from_slice(samples)
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Samples `expected.len()` frames at an interval of 1.0 and compares them to `expected`.
    fn assert_out(stream: &mut Stream<f32>, expected: &[f32]) {
        let mut actual = vec![0.0; expected.len()];
        stream.sample(1.0, &mut actual);
        assert_eq!(actual, expected);
    }

    #[test]
    fn smoke() {
        let (mut control, mut stream) = Stream::<f32>::new(1, 3);

        // Two frames fit; the following write only has room for one more.
        assert_eq!(control.write(&[1.0, 2.0]), 2);
        assert_eq!(control.write(&[3.0, 4.0]), 1);

        // Reading past the buffered frames produces zeros.
        assert_out(&mut stream, &[1.0, 2.0, 3.0, 0.0, 0.0]);

        // After draining, three of the four offered frames are accepted.
        assert_eq!(control.write(&[5.0, 6.0, 7.0, 8.0]), 3);
        assert_out(&mut stream, &[5.0]);
        assert_out(&mut stream, &[6.0, 7.0, 0.0, 0.0]);
        assert_out(&mut stream, &[0.0, 0.0]);
    }

    #[test]
    fn cleanup() {
        let (mut control, mut stream) = Stream::<f32>::new(1, 4);
        let mut scratch = [0.0f32];

        assert_eq!(control.write(&[1.0, 2.0]), 2);
        assert!(!stream.is_finished());

        // Dropping the control alone does not finish the stream: the close has not been
        // observed yet and two frames are still buffered.
        drop(control);
        assert!(!stream.is_finished());

        stream.sample(1.0, &mut scratch);
        assert!(!stream.is_finished());

        // The second frame has now been consumed as well.
        stream.sample(1.0, &mut scratch);
        assert!(stream.is_finished());

        // Sampling a finished stream keeps it finished.
        stream.sample(1.0, &mut scratch);
        assert!(stream.is_finished());
    }
}