use log::trace;
use crate::block::{Block, BlockRet};
use crate::stream::{NCReadStream, Tag, TagValue, WriteStream};
use crate::{Result, Sample};
pub const TAG_START: &str = "VecToStream::start";
pub const TAG_END: &str = "VecToStream::end";
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new)]
pub struct VecToStream<T: Sample> {
#[rustradio(in)]
src: NCReadStream<Vec<T>>,
#[rustradio(out)]
dst: WriteStream<T>,
}
impl<T: Sample> Block for VecToStream<T> {
fn work(&mut self) -> Result<BlockRet<'_>> {
let Some(n) = self.src.peek_size() else {
return Ok(BlockRet::WaitForStream(&self.src, 1));
};
let mut o = self.dst.write_buf()?;
if n > o.len() {
return Ok(BlockRet::WaitForStream(&self.src, n));
}
let (v, mut tags) = self
.src
.pop()
.expect("we just checked the size. It must exist");
debug_assert_eq!(v.len(), n);
if n == 0 {
trace!("VecToStream: discarded empty vector");
return Ok(BlockRet::Again);
}
o.fill_from_iter(v);
tags.extend([
Tag::new(0, TAG_START, TagValue::U64(n as u64)),
Tag::new(n - 1, TAG_END, TagValue::U64(n as u64)),
]);
o.produce(n, &tags);
Ok(BlockRet::Again)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Result;
use crate::stream::new_nocopy_stream;
#[test]
fn empty_input() -> Result<()> {
let (_tx, rx) = new_nocopy_stream();
let (mut b, out) = VecToStream::<u8>::new(rx);
assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
assert_eq!(out.read_buf()?.0.len(), 0);
Ok(())
}
#[test]
fn empty_vec() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
tx.push(vec![], &[]);
let (mut b, out) = VecToStream::<u8>::new(rx);
assert!(matches![b.work()?, BlockRet::Again]);
assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
assert_eq!(out.read_buf()?.0.len(), 0);
Ok(())
}
#[test]
fn two() -> Result<()> {
let (tx, rx) = new_nocopy_stream();
tx.push(vec![11, 22, 33], &[]);
tx.push(vec![3, 2, 1, 0], &[]);
let (mut b, out) = VecToStream::<u8>::new(rx);
assert!(matches![b.work()?, BlockRet::Again]);
assert_eq!(out.read_buf()?.0.len(), 3);
assert!(matches![b.work()?, BlockRet::Again]);
assert_eq!(out.read_buf()?.0.len(), 7);
assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
let (o, tags) = out.read_buf()?;
assert_eq!(o.len(), 7);
assert_eq!(
tags,
&[
Tag::new(0, "VecToStream::start", TagValue::U64(3)),
Tag::new(2, "VecToStream::end", TagValue::U64(3)),
Tag::new(3, "VecToStream::start", TagValue::U64(4)),
Tag::new(6, "VecToStream::end", TagValue::U64(4)),
]
);
Ok(())
}
}