1use std::io::Write;
2
3use crate::block::{Block, BlockRet};
4use crate::stream::ReadStream;
5use crate::{Result, Sample};
6
7#[derive(rustradio_macros::Block)]
9#[rustradio(crate)]
10pub struct WriterSink<T: Sample> {
11 writer: Box<dyn Write + Send>,
12 #[rustradio(in)]
13 src: ReadStream<T>,
14}
15
16impl<T: Sample> WriterSink<T> {
17 pub fn new<R: Write + Send + 'static>(src: ReadStream<T>, writer: R) -> Self {
18 Self {
19 writer: Box::new(writer),
20 src,
21 }
22 }
23}
24
25impl<T> Block for WriterSink<T>
26where
27 T: Sample<Type = T> + std::fmt::Debug,
28{
29 fn work(&mut self) -> Result<BlockRet<'_>> {
30 loop {
32 let (i, _) = self.src.read_buf()?;
33 if i.is_empty() {
34 return Ok(BlockRet::WaitForStream(&self.src, 1));
35 }
36 let b = i.slice()[0].serialize();
38 let rc = self.writer.write(&b)?;
39 assert_eq!(rc, b.len(), "TODO: handle short writes");
40 i.consume(1);
41 }
42 }
43}
44
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::VectorSource;
    use std::io::Cursor;
    use std::sync::{Arc, Mutex};

    /// In-memory writer whose backing buffer is shared between clones, so the
    /// test can inspect what the sink wrote after handing a clone to it.
    #[derive(Clone, Default)]
    struct Fake {
        cur: Arc<Mutex<Cursor<Vec<u8>>>>,
    }

    impl Write for Fake {
        fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
            let mut cursor = self.cur.lock().unwrap();
            cursor.write(b)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            let mut cursor = self.cur.lock().unwrap();
            cursor.flush()
        }
    }

    /// Bytes produced by a vector source must arrive unchanged in the writer.
    #[test]
    fn writer_sink() -> Result<()> {
        let (mut source, stream) = VectorSource::new(b"hello world".to_vec());
        source.work()?;

        let fake = Fake::default();
        let mut sink = WriterSink::<u8>::new(stream, fake.clone());
        sink.work()?;

        assert_eq!(fake.cur.lock().unwrap().get_ref(), b"hello world");
        Ok(())
    }
}