use anyhow::Result;
use anyhow::anyhow;
use futuresdr::blocks::ChannelSource;
use futuresdr::blocks::VectorSink;
use futuresdr::prelude::*;
#[test]
fn channel_source_min() -> Result<()> {
let mut fg = Flowgraph::new();
let (tx, rx) = mpsc::channel(10);
let cs = ChannelSource::<u32>::new(rx);
let snk = VectorSink::<u32>::new(1024);
connect!(fg, cs > snk);
let rt = Runtime::new();
let running = rt.start(fg)?;
let fg = Runtime::block_on(async move {
tx.send(vec![0, 1, 2].into_boxed_slice()).await?;
drop(tx);
running
.wait_async()
.await
.map_err(|e| anyhow!("Flowgraph error, {e}"))
})?;
let snk = fg.block(&snk)?;
assert_eq!(*snk.items(), vec![0, 1, 2]);
Ok(())
}
#[test]
fn channel_source_small() -> Result<()> {
let mut fg = Flowgraph::new();
let (tx, rx) = mpsc::channel(10);
let cs = ChannelSource::<u32>::new(rx);
let snk = VectorSink::<u32>::new(1024);
connect!(fg, cs > snk);
let rt = Runtime::new();
let running = rt.start(fg)?;
let fg = Runtime::block_on(async move {
tx.send(vec![0, 1, 2].into_boxed_slice()).await?;
tx.send(vec![3, 4].into_boxed_slice()).await?;
tx.send(vec![].into_boxed_slice()).await?;
tx.send(vec![5].into_boxed_slice()).await?;
drop(tx);
running
.wait_async()
.await
.map_err(|e| anyhow!("Flowgraph error, {e}"))
})?;
let snk = fg.block(&snk)?;
assert_eq!(*snk.items(), vec![0, 1, 2, 3, 4, 5]);
Ok(())
}
#[test]
fn channel_source_big() -> Result<()> {
let mut fg = Flowgraph::new();
let (tx, rx) = mpsc::channel(10);
let cs = ChannelSource::<u32>::new(rx);
let snk = VectorSink::<u32>::new(1024);
connect!(fg, cs > snk);
let rt = Runtime::new();
let running = rt.start(fg)?;
let fg = Runtime::block_on(async move {
tx.send(vec![0; 99999].into_boxed_slice()).await?;
tx.send(vec![1; 88888].into_boxed_slice()).await?;
drop(tx);
running
.wait_async()
.await
.map_err(|e| anyhow!("Flowgraph error, {e}"))
})?;
let snk = fg.block(&snk)?;
let mut expected = vec![0; 99999];
expected.extend_from_slice(&[1; 88888]);
assert_eq!(*snk.items(), expected);
Ok(())
}