rustradio/
writer_sink.rs

1use std::io::Write;
2
3use crate::block::{Block, BlockRet};
4use crate::stream::ReadStream;
5use crate::{Result, Sample};
6
7/// Arbitrary writer sink.
8#[derive(rustradio_macros::Block)]
9#[rustradio(crate)]
10pub struct WriterSink<T: Sample> {
11    writer: Box<dyn Write + Send>,
12    #[rustradio(in)]
13    src: ReadStream<T>,
14}
15
16impl<T: Sample> WriterSink<T> {
17    pub fn new<R: Write + Send + 'static>(src: ReadStream<T>, writer: R) -> Self {
18        Self {
19            writer: Box::new(writer),
20            src,
21        }
22    }
23}
24
25impl<T> Block for WriterSink<T>
26where
27    T: Sample<Type = T> + std::fmt::Debug,
28{
29    fn work(&mut self) -> Result<BlockRet<'_>> {
30        // TODO: make nonblock.
31        loop {
32            let (i, _) = self.src.read_buf()?;
33            if i.is_empty() {
34                return Ok(BlockRet::WaitForStream(&self.src, 1));
35            }
36            // TODO: very inefficient.
37            let b = i.slice()[0].serialize();
38            let rc = self.writer.write(&b)?;
39            assert_eq!(rc, b.len(), "TODO: handle short writes");
40            i.consume(1);
41        }
42    }
43}
44
45#[cfg(test)]
46mod tests {
47    use super::*;
48    use crate::blocks::VectorSource;
49    use std::io::Cursor;
50    use std::sync::{Arc, Mutex};
51
52    #[derive(Clone)]
53    struct Fake {
54        cur: Arc<Mutex<Cursor<Vec<u8>>>>,
55    }
56    impl Default for Fake {
57        fn default() -> Self {
58            Self {
59                cur: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
60            }
61        }
62    }
63
64    impl Write for Fake {
65        fn write(&mut self, b: &[u8]) -> std::result::Result<usize, std::io::Error> {
66            self.cur.lock().unwrap().write(b)
67        }
68        fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
69            self.cur.lock().unwrap().flush()
70        }
71    }
72
73    #[test]
74    fn writer_sink() -> Result<()> {
75        let (mut b, prev) = VectorSource::new(b"hello world".to_vec());
76        b.work()?;
77        let fake = Fake::default();
78        let mut b = WriterSink::<u8>::new(prev, fake.clone());
79        b.work()?;
80        assert_eq!(fake.cur.lock().unwrap().get_ref(), b"hello world");
81        Ok(())
82    }
83}