futuresdr 0.0.41

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
# Flowgraph

A [`Flowgraph`](https://docs.rs/futuresdr/latest/futuresdr/runtime/struct.Flowgraph.html) is a directed graph of blocks and connections. Blocks do the actual work; the flowgraph describes which stream ports and message ports are connected.

Stream connections carry sample streams between blocks. They must form a directed acyclic graph. Message connections carry PMTs between message handlers and can use arbitrary topologies.

## Constructing Flowgraphs

Create an empty flowgraph with `Flowgraph::new()`, add blocks, and connect them. The usual way to build a flowgraph is the `connect!` macro. It adds blocks to the flowgraph if needed and wires their ports.

The simplest stream connection uses the default stream output and input port names:

```rust
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = NullSource::<u8>::new();
let head = Head::<u8>::new(1024);
let snk = NullSink::<u8>::new();

connect!(fg, src > head > snk);
```

Named stream ports can be selected explicitly. Output ports are written after the source block, and input ports are written before the destination block:

```rust
connect!(fg, src.output > input.head > snk);
```

Message connections use `|` instead of `>`. This example connects the `out` message output of `msg_source` to the `in` message input of `msg_copy`, then forwards messages to `msg_sink`:

```rust
use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let msg_source = MessageSourceBuilder::new(Pmt::String("foo".to_string()), Duration::from_millis(100))
    .n_messages(20)
    .build();
let msg_copy = MessageCopy::new();
let msg_sink = MessageSink::new();

connect!(fg, msg_source | msg_copy | msg_sink);
```

Message ports can also be named explicitly:

```rust
connect!(fg, msg_source.out | r#in.msg_copy);
```

The `r#in` spelling is Rust's raw-identifier syntax for a port named `in`.

Stream and message connections can be mixed in one macro invocation. Separate independent connections with semicolons:

```rust
connect!(fg,
    src > head > snk;
    msg_source | msg_copy | msg_sink;
);
```

Blocks can also be added and connected manually. This is what the macro is doing for the common cases: it stores blocks in the flowgraph, gets their port endpoints, and records stream or message edges.

```rust
use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let src = fg.add(VectorSource::<u32>::new(vec![1, 2, 3, 4]));
let snk = fg.add(VectorSink::<u32>::new(4));
fg.stream_dyn(src, "output", snk, "input")?;

let msg_source = fg.add(
    MessageSourceBuilder::new(Pmt::String("foo".to_string()),
            Duration::from_millis(100))
        .n_messages(20)
        .build(),
);
let msg_copy = fg.add(MessageCopy::new());
let msg_sink = fg.add(MessageSink::new());

fg.message(msg_source, "out", msg_copy, "in")?;
fg.message(msg_copy, "out", msg_sink, "in")?;

let fg = Runtime::new().run(fg)?;
```

Use `connect!` for normal application code. The explicit form is useful when block types are selected dynamically or when it helps to understand the lower-level API.

## Local Domains

Normal blocks and buffers must be send-capable because the scheduler may move block tasks between workers. A local domain gives you a single-thread execution island for blocks or buffers that are not `Send`, or for integrations that must stay on one thread. On WASM, local domains are backed by web workers.

Create a local domain, add blocks with `add_local()`, and connect local-only stream buffers with `~>` in `connect!` or `stream_local()` manually:

```rust
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::LocalCpuReader;
use futuresdr::runtime::buffer::LocalCpuWriter;

let mut fg = Flowgraph::new();
let local = fg.local_domain()?;

let src = fg.add_local(local, || {
    NullSource::<f32, LocalCpuWriter<f32>>::new()
});
let snk = fg.add_local(local, || {
    NullSink::<f32, LocalCpuReader<f32>>::new()
});

fg.stream_local(&src, |b| b.output(), &snk, |b| b.input())?;
```

The `~>` macro operator is the equivalent typed local-stream connection:

```rust
connect!(fg, src ~> snk);
```

Local-only stream connections must stay inside one local domain. Send-capable stream buffers can still connect normal blocks and local-domain blocks. Message connections are not restricted by local domains.

When a block's state must be created on the local-domain thread itself, build that part of the graph with `domain_run()` (or `domain_run_async()` in async code). The closure receives a `LocalDomainContext`; add local blocks through `ctx.add(...)` and use the same `connect!` syntax with `ctx` as the graph argument:

```rust
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::LocalCpuReader;
use futuresdr::runtime::buffer::LocalCpuWriter;

let mut fg = Flowgraph::new();
let local = fg.local_domain()?;

let snk = fg.domain_run(local, |ctx| {
    let src = ctx.add(NullSource::<u8, LocalCpuWriter<u8>>::new());
    let head = ctx.add(Head::<u8, LocalCpuReader<u8>, LocalCpuWriter<u8>>::new(10));
    let snk = ctx.add(NullSink::<u8, LocalCpuReader<u8>>::new());

    connect!(ctx, src ~> head ~> snk);

    Ok(snk)
})?;
```

On `wasm32`, use the async forms (`add_local_async`, `domain_run_async`, and `connect_async!`) while constructing the graph.

## Accessing Blocks

When a block is added to a flowgraph, FutureSDR returns a `BlockRef<T>`. A block reference is a lightweight typed identifier. It is copyable, can be converted to a `BlockId`, and can be used to access the block while the flowgraph owns it.

The `connect!` macro also leaves you with block references for the blocks it added. After a blocking `Runtime::run()`, the finished `Flowgraph` is returned, so the same `BlockRef` can be used to inspect block state:

```rust
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);
```

Similarly, `block_mut()` can be used to update block metadata or block state:

```rust
let mut fg = Flowgraph::new();
let snk = fg.add(VectorSink::<u32>::new(4));

fg.block_mut(&snk)?.set_instance_name("samples");
```

Use `BlockRef::id()` or convert a `BlockRef` into `BlockId` when a runtime interaction API needs an untyped block identifier.

## Flowgraph Interactions

`Runtime::run()` is the simplest way to execute a flowgraph when you only need the result after it finishes. To interact with a flowgraph while it is running, start it with `Runtime::start()` on native targets or `Runtime::start_async()` in async code. Both return a [`RunningFlowgraph`](https://docs.rs/futuresdr/latest/futuresdr/runtime/struct.RunningFlowgraph.html).

`RunningFlowgraph` can post messages, call message handlers, describe the running graph, stop it, and wait for completion.

The following example starts a flowgraph and continuously hops through a list of frequencies by posting `Pmt::F64` values to a block's `freq` message handler:

```rust
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();
// set up the flowgraph

// `my_seify_source` is a source or sink block with a `freq` message input.
let radio = fg.add(my_seify_source);

let rt = Runtime::new();
let running = rt.start(fg)?;

Runtime::block_on(async move {
    let frequencies = [100.0e6, 101.0e6, 102.0e6];

    for freq in frequencies.iter().cycle() {
        running.post(radio, "freq", Pmt::F64(*freq)).await?;
        Timer::after(Duration::from_secs(1)).await;
    }
})?;
```

Waiting for completion is a separate operation. Use it when the flowgraph is expected to finish on its own, for example when a finite source reaches the end of its input:

```rust
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let rt = Runtime::new();
let running = rt.start(fg)?;

let fg = running.wait()?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);
```

For flowgraphs that do not finish on their own, request shutdown before waiting:

```rust
Runtime::block_on(async move {
    running.stop().await?;
    let fg = running.wait_async().await?;
    Ok::<_, Error>(fg)
})?;
```

If multiple tasks need access to the same running flowgraph, keep a clonable handle:

```rust
let handle = running.handle();
Runtime::block_on(async move {
    handle.post(radio_id, "freq", Pmt::F64(100.0e6)).await
})?;
```