futuresdr 0.0.39

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
use anyhow::Result;
use anyhow::anyhow;
use futuresdr::async_io::block_on;
use futuresdr::blocks::ChannelSource;
use futuresdr::blocks::VectorSink;
use futuresdr::futures::SinkExt;
use futuresdr::prelude::*;

#[test]
fn channel_source_min() -> Result<()> {
    let mut fg = Flowgraph::new();
    let (mut tx, rx) = mpsc::channel(10);

    let cs = ChannelSource::<u32>::new(rx);
    let snk = VectorSink::<u32>::new(1024);
    connect!(fg, cs > snk);

    let rt = Runtime::new();
    block_on(async move {
        let (fg, _) = rt.start(fg).await?;
        tx.send(vec![0, 1, 2].into_boxed_slice()).await?;
        tx.close().await?;
        fg.await.map_err(|e| anyhow!("Flowgraph error, {e}"))
    })?;

    let snk = snk.get()?;
    assert_eq!(*snk.items(), vec![0, 1, 2]);
    Ok(())
}

#[test]
fn channel_source_small() -> Result<()> {
    let mut fg = Flowgraph::new();
    let (mut tx, rx) = mpsc::channel(10);

    let cs = ChannelSource::<u32>::new(rx);
    let snk = VectorSink::<u32>::new(1024);
    connect!(fg, cs > snk);

    let rt = Runtime::new();
    block_on(async move {
        let (fg, _) = rt.start(fg).await?;
        tx.send(vec![0, 1, 2].into_boxed_slice()).await?;
        tx.send(vec![3, 4].into_boxed_slice()).await?;
        tx.send(vec![].into_boxed_slice()).await?;
        tx.send(vec![5].into_boxed_slice()).await?;
        tx.close().await?;
        fg.await.map_err(|e| anyhow!("Flowgraph error, {e}"))
    })?;

    let snk = snk.get()?;
    assert_eq!(*snk.items(), vec![0, 1, 2, 3, 4, 5]);
    Ok(())
}

#[test]
fn channel_source_big() -> Result<()> {
    let mut fg = Flowgraph::new();
    let (mut tx, rx) = mpsc::channel(10);

    let cs = ChannelSource::<u32>::new(rx);
    let snk = VectorSink::<u32>::new(1024);
    connect!(fg, cs > snk);

    let rt = Runtime::new();
    block_on(async move {
        let (fg, _) = rt.start(fg).await?;
        tx.send(vec![0; 99999].into_boxed_slice()).await?;
        tx.send(vec![1; 88888].into_boxed_slice()).await?;
        tx.close().await?;
        fg.await.map_err(|e| anyhow!("Flowgraph error, {e}"))
    })?;

    let snk = snk.get()?;
    let mut expected = vec![0; 99999];
    expected.extend_from_slice(&[1; 88888]);
    assert_eq!(*snk.items(), expected);

    Ok(())
}