use core::cell::{Cell, RefCell};
use crate::{frame, math::Float, spsc, Controlled, Frame, Signal};
/// A signal whose frames are supplied at runtime through a bounded `spsc` queue.
///
/// Frames are pushed in with `StreamControl::write` and consumed by the `Signal`
/// implementation, which interpolates between neighbouring buffered frames and yields
/// `T::ZERO` for positions where nothing is buffered.
pub struct Stream<T> {
/// Producing half of the queue; only touched by `StreamControl::write`.
send: RefCell<spsc::Sender<T>>,
/// Multiplier that converts a sampling interval into a number of frames, i.e. frames per
/// unit of time (presumably Hz with intervals in seconds; confirm against `Signal`).
rate: u32,
/// Consuming half of the queue; read while sampling and trimmed by `advance`.
inner: RefCell<spsc::Receiver<T>>,
/// Playback position, in frames, relative to the first frame still held by `inner`.
/// `advance` releases whole frames and stores only the fractional remainder here.
t: Cell<f32>,
/// Set by `handle_dropped`; `is_finished` never reports completion before this is set.
closed: Cell<bool>,
}
impl<T> Stream<T> {
pub fn new(rate: u32, size: usize) -> Self {
let (send, recv) = spsc::channel(size);
Self {
send: RefCell::new(send),
rate,
inner: RefCell::new(recv),
t: Cell::new(0.0),
closed: Cell::new(false),
}
}
#[inline]
fn get(&self, sample: isize) -> T
where
T: Frame + Copy,
{
if sample < 0 {
return T::ZERO;
}
let sample = sample as usize;
let inner = self.inner.borrow();
if sample >= inner.len() {
return T::ZERO;
}
inner[sample]
}
fn sample_single(&self, s: f32) -> T
where
T: Frame + Copy,
{
let x0 = s.trunc() as isize;
let fract = s.fract() as f32;
let x1 = x0 + 1;
let a = self.get(x0);
let b = self.get(x1);
frame::lerp(&a, &b, fract)
}
fn advance(&self, dt: f32) {
let mut inner = self.inner.borrow_mut();
let next = self.t.get() + dt * self.rate as f32;
let t = next.min(inner.len() as f32);
inner.release(t as usize);
self.t.set(t.fract());
}
}
impl<T> Signal for Stream<T>
where
    T: Frame + Copy,
{
    type Frame = T;

    /// Fills `out` with frames spaced `interval` apart, starting at the current position,
    /// then advances the stream past the span that was just sampled.
    ///
    /// The receiving half of the queue is updated first. Positions with no buffered data
    /// come out as `T::ZERO`.
    fn sample(&self, interval: f32, out: &mut [T]) {
        self.inner.borrow_mut().update();

        let start = self.t.get();
        // Distance between consecutive output frames, measured in buffered frames.
        let step = interval * self.rate as f32;
        out.iter_mut().enumerate().for_each(|(i, frame)| {
            *frame = self.sample_single(start + step * i as f32);
        });

        let frames = out.len();
        self.advance(interval * frames as f32);
    }

    /// Reports whether the stream has been closed and its position equals the number of
    /// frames still buffered.
    // Exact float equality is intended here: the right-hand side is an integer length
    // converted to `f32`, not the result of accumulated arithmetic.
    #[allow(clippy::float_cmp)]
    fn is_finished(&self) -> bool {
        self.closed.get() && self.t.get() == self.inner.borrow().len() as f32
    }

    /// Marks the stream as closed and updates the receiving half of the queue one more time.
    fn handle_dropped(&self) {
        self.closed.set(true);
        let mut receiver = self.inner.borrow_mut();
        receiver.update();
    }
}
/// Borrowed handle to a `Stream`, produced by `Controlled::make_control`, through which new
/// frames are written into the stream's queue.
pub struct StreamControl<'a, T>(&'a Stream<T>);
// NOTE(review): the obligations of this `unsafe impl` are defined by the `Controlled` trait,
// which is not visible here. The control produced below only reaches the `send` half of the
// stream; confirm that this is what the trait's contract requires.
unsafe impl<'a, T: 'static> Controlled<'a> for Stream<T> {
    type Control = StreamControl<'a, T>;

    /// Wraps a borrowed stream in its control handle.
    ///
    /// # Safety
    ///
    /// Callers must uphold whatever contract `Controlled::make_control` specifies; this
    /// implementation adds no requirements of its own beyond storing the reference.
    unsafe fn make_control(signal: &'a Self) -> StreamControl<'a, T> {
        StreamControl(signal)
    }
}
impl<T> StreamControl<'_, T> {
    /// Copies frames from `samples` into the stream's queue.
    ///
    /// Returns the count reported by the sender's `send_from_slice`, which is the number of
    /// frames accepted; it can be smaller than `samples.len()` when the queue lacks room.
    pub fn write(&mut self, samples: &[T]) -> usize
    where
        T: Copy,
    {
        let mut sender = self.0.send.borrow_mut();
        sender.send_from_slice(samples)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Writes `samples` through a fresh control and returns how many frames were accepted.
    fn push(stream: &Stream<f32>, samples: &[f32]) -> usize {
        StreamControl(stream).write(samples)
    }

    /// Samples `expected.len()` frames at a unit interval and checks them against `expected`.
    fn assert_out(stream: &Stream<f32>, expected: &[f32]) {
        let mut actual = vec![0.0; expected.len()];
        stream.sample(1.0, &mut actual);
        assert_eq!(actual, expected);
    }

    /// Writes are capped by the queue size and reads past the buffered data yield zeros.
    #[test]
    fn smoke() {
        let stream = Stream::<f32>::new(1, 3);
        assert_eq!(push(&stream, &[1.0, 2.0]), 2);
        // Only one slot is left, so the second frame of this write is rejected.
        assert_eq!(push(&stream, &[3.0, 4.0]), 1);
        assert_out(&stream, &[1.0, 2.0, 3.0, 0.0, 0.0]);
        assert_eq!(push(&stream, &[5.0, 6.0, 7.0, 8.0]), 3);
        assert_out(&stream, &[5.0]);
        assert_out(&stream, &[6.0, 7.0, 0.0, 0.0]);
        assert_out(&stream, &[0.0, 0.0]);
    }

    /// A closed stream reports completion only once every buffered frame has been consumed.
    #[test]
    fn cleanup() {
        let stream = Stream::<f32>::new(1, 4);
        assert_eq!(push(&stream, &[1.0, 2.0]), 2);
        assert!(!stream.is_finished());
        stream.handle_dropped();
        // Closed, but two frames are still buffered.
        assert!(!stream.is_finished());
        stream.sample(1.0, &mut [0.0]);
        assert!(!stream.is_finished());
        stream.sample(1.0, &mut [0.0]);
        assert!(stream.is_finished());
        // Sampling an exhausted stream keeps it finished.
        stream.sample(1.0, &mut [0.0]);
        assert!(stream.is_finished());
    }
}