Skip to main content

rustradio/
delay.rs

//! Delay stream. Good for syncing up streams.
2use log::debug;
3
4use crate::block::{Block, BlockRet};
5use crate::stream::{ReadStream, WriteStream};
6use crate::{Result, Sample};
7
8/// Delay stream. Good for syncing up streams.
9#[derive(rustradio_macros::Block)]
10#[rustradio(crate)]
11pub struct Delay<T: Sample> {
12    delay: usize,
13    current_delay: usize,
14    skip: usize,
15    #[rustradio(in)]
16    src: ReadStream<T>,
17    #[rustradio(out)]
18    dst: WriteStream<T>,
19}
20
21impl<T: Sample> Delay<T> {
22    /// Create new Delay block.
23    #[must_use]
24    pub fn new(src: ReadStream<T>, delay: usize) -> (Self, ReadStream<T>) {
25        let (dst, dr) = crate::stream::new_stream();
26        (
27            Self {
28                src,
29                dst,
30                delay,
31                current_delay: delay,
32                skip: 0,
33            },
34            dr,
35        )
36    }
37
38    /// Change the delay.
39    pub fn set_delay(&mut self, delay: usize) {
40        if delay > self.delay {
41            self.current_delay = delay - self.delay;
42        } else {
43            let cdskip = std::cmp::min(self.current_delay, delay);
44            self.current_delay -= cdskip;
45            self.skip = (self.delay - delay) - cdskip;
46        }
47        self.delay = delay;
48    }
49}
50
51impl<T: Sample> Block for Delay<T> {
52    fn work(&mut self) -> Result<BlockRet<'_>> {
53        {
54            let o = self.dst.write_buf()?;
55            if o.is_empty() {
56                return Ok(BlockRet::Again);
57            }
58        }
59        if self.current_delay > 0 {
60            let mut o = self.dst.write_buf()?;
61            let n = std::cmp::min(self.current_delay, o.len());
62            if n == 0 {
63                return Ok(BlockRet::WaitForStream(&self.src, 1));
64            }
65            o.slice()[..n].fill(T::default());
66            o.produce(n, &[]);
67            self.current_delay -= n;
68        }
69        {
70            let (input, _tags) = self.src.read_buf()?;
71            let a = input.len();
72            let n = std::cmp::min(a, self.skip);
73            if n == 0 && a == 0 {
74                return Ok(BlockRet::WaitForStream(&self.src, 1));
75            }
76            input.consume(n);
77            debug!("Delay: skipped {n}");
78            self.skip -= n;
79        }
80        let mut o = self.dst.write_buf()?;
81        let (input, tags) = self.src.read_buf()?;
82        let n = std::cmp::min(input.len(), o.len());
83        o.fill_from_slice(input.slice());
84        o.produce(n, &tags);
85        input.consume(n);
86        Ok(BlockRet::Again)
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    // TODO: test tag propagation.

    /// A zero delay passes the input through untouched.
    #[test]
    fn delay_zero() -> Result<()> {
        let s = ReadStream::from_slice(&[1.0f32, 2.0, 3.0]);
        let (mut delay, o) = Delay::new(s, 0);

        delay.work()?;
        let (res, _) = o.read_buf()?;
        assert_eq!(res.slice(), vec![1.0f32, 2.0, 3.0]);
        Ok(())
    }

    /// A delay of one prepends a single default-valued sample.
    #[test]
    fn delay_one() -> Result<()> {
        let s = ReadStream::from_slice(&[1.0f32, 2.0, 3.0]);
        let (mut delay, o) = Delay::new(s, 1);

        delay.work()?;
        let (res, _) = o.read_buf()?;
        assert_eq!(res.slice(), vec![0.0f32, 1.0, 2.0, 3.0]);
        Ok(())
    }

    /// Changing the delay at runtime adds padding or drops input as needed.
    ///
    /// The output is never consumed, so every `read_buf()` shows everything
    /// the block has produced so far.
    #[test]
    fn delay_change() -> Result<()> {
        // Keep the writer half of the source so more samples can be fed in
        // between delay changes.
        let (tx, rx) = crate::stream::new_stream();
        {
            let mut b = tx.write_buf()?;
            b.fill_from_slice(&[1u32, 2]);
            b.produce(2, &[]);
        }
        let (mut delay, o) = Delay::new(rx, 1);

        // 1,2 with delay 1 => 0,1,2
        delay.work()?;
        {
            let (res, _) = o.read_buf()?;
            assert_eq!(res.slice(), vec![0, 1, 2]);
        }

        // Delay 1 -> 2: 3,4 => 0,3,4 (one extra padding sample).
        {
            let mut b = tx.write_buf()?;
            b.fill_from_slice(&[3, 4]);
            b.produce(2, &[]);
        }
        delay.set_delay(2);
        delay.work()?;
        {
            let (res, _) = o.read_buf()?;
            assert_eq!(res.slice(), vec![0, 1, 2, 0, 3, 4]);
        }

        // Delay 2 -> 0: 5,6 => nothing (both samples are skipped).
        {
            let mut b = tx.write_buf()?;
            b.fill_from_slice(&[5, 6]);
            b.produce(2, &[]);
        }
        delay.set_delay(0);
        delay.work()?;
        {
            let (res, _) = o.read_buf()?;
            assert_eq!(res.slice(), vec![0, 1, 2, 0, 3, 4]);
        }

        // Delay stays 0: 7 => 7
        {
            let mut b = tx.write_buf()?;
            b.slice()[0] = 7;
            b.produce(1, &[]);
        }
        delay.set_delay(0);
        delay.work()?;
        {
            let (res, _) = o.read_buf()?;
            assert_eq!(res.slice(), vec![0, 1, 2, 0, 3, 4, 7]);
        }
        Ok(())
    }
}