Module radiorust::flow


Data flow between blocks

Signal processing blocks implement the Producer trait, the Consumer trait, or both traits.

Upon creation, Producers use the new_sender function to create a pair consisting of a Sender and a SenderConnector. The Sender is passed to a background task while the SenderConnector is stored and accessible through the Producer::sender_connector method.

Consumers use the new_receiver function upon creation to create a pair of a Receiver and a ReceiverConnector. The Receiver is passed to a background task while the ReceiverConnector is stored and accessible through the Consumer::receiver_connector method.

Note that a Producer feeding multiple Consumers/Receivers will block if any one of them blocks; that is, all connected Consumers/Receivers must have received a value before the Producer/Sender can send the next one.
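The rule above can be pictured with a small toy model. This is only a sketch using the standard library, not the implementation of radiorust or broadcast_bp: the `Broadcast` type and its `try_send` and `recv` methods are hypothetical names invented for illustration, and "would block" is modeled as an `Err` return instead of an actual wait.

```rust
/// Toy model (hypothetical, not radiorust's API): one pending slot per consumer.
struct Broadcast<T> {
    slots: Vec<Option<T>>,
}

impl<T: Clone> Broadcast<T> {
    /// Creates a model with `consumers` connected consumers, all idle.
    fn new(consumers: usize) -> Self {
        Self {
            slots: (0..consumers).map(|_| None).collect(),
        }
    }

    /// Hands `value` to every consumer, or returns it in `Err` if any
    /// consumer has not yet taken the previous value (the "would block" case).
    fn try_send(&mut self, value: T) -> Result<(), T> {
        if self.slots.iter().any(|slot| slot.is_some()) {
            return Err(value);
        }
        for slot in self.slots.iter_mut() {
            *slot = Some(value.clone());
        }
        Ok(())
    }

    /// Takes the pending value of the given consumer, if there is one.
    fn recv(&mut self, consumer: usize) -> Option<T> {
        self.slots[consumer].take()
    }
}
```

In this model, after `try_send(1)` succeeds with two consumers and only consumer 0 calls `recv`, a following `try_send(2)` is refused until consumer 1 has also received the first value. A single slow consumer therefore throttles the producer for everyone.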

Each Sender has a buffer capacity of 1 (see the underlying broadcast_bp channel), so a long chain of blocks can still accumulate a significant amount of buffered data. If this is unwanted, it can be handled by placing a blocks::buffering::Buffer block near the end of the chain.
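To get a feeling for how per-Sender buffers add up, the following sketch simulates a chain whose final consumer has stalled. It is a simplified model under stated assumptions, not radiorust code: `Chain`, `items_in_flight`, and `buffered_seconds` are hypothetical names, every sender is modeled as exactly one slot, and the chunk length and sample rate passed to `buffered_seconds` are made-up inputs. Real blocks may additionally hold a value while processing it, so actual numbers can be higher.

```rust
/// Toy model (hypothetical): one capacity-1 buffer per sender in a chain.
/// Index 0 belongs to the first block, the last index feeds a stalled sink.
struct Chain {
    buffers: Vec<Option<u64>>,
}

impl Chain {
    fn new(senders: usize) -> Self {
        Self {
            buffers: vec![None; senders],
        }
    }

    /// Moves all buffered items as far downstream as free slots allow,
    /// keeping their order (the oldest item ends up closest to the sink).
    fn advance(&mut self) {
        let items: Vec<u64> = self
            .buffers
            .iter_mut()
            .filter_map(|slot| slot.take())
            .collect();
        let offset = self.buffers.len() - items.len();
        for (i, item) in items.into_iter().enumerate() {
            self.buffers[offset + i] = Some(item);
        }
    }

    /// Tries to feed one item into the head of the chain.
    /// Returns false when the first buffer is occupied (or the chain is empty).
    fn try_push(&mut self, item: u64) -> bool {
        self.advance();
        if self.buffers.first().map_or(true, |slot| slot.is_some()) {
            return false;
        }
        self.buffers[0] = Some(item);
        true
    }
}

/// Counts how many items the chain accepts before the producer would block.
fn items_in_flight(senders: usize) -> usize {
    let mut chain = Chain::new(senders);
    let mut accepted = 0;
    while chain.try_push(accepted as u64) {
        accepted += 1;
    }
    accepted
}

/// Converts a number of buffered chunks into seconds of signal.
fn buffered_seconds(items: usize, chunk_len: usize, sample_rate: f64) -> f64 {
    (items * chunk_len) as f64 / sample_rate
}
```

In this model `items_in_flight(5)` yields 5, one item per sender. If each item were a chunk of 4096 samples at 48 kHz (assumed values), `buffered_seconds(5, 4096, 48_000.0)` gives roughly 0.43 seconds of buffered signal, which illustrates why capacity-1 buffers can still matter in long chains.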

Example

The following toy example passes a String from a Producer to a Consumer. For radio applications, you will usually pass Samples instead.

use radiorust::flow::*;
use tokio::sync::oneshot;
use tokio::task::spawn;

struct MySource {
    sender_connector: SenderConnector<String>,
    /* extra fields can go here */
}
impl MySource {
    fn new() -> Self {
        let (sender, sender_connector) = new_sender::<String>();
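        // Background task that sends a single message.
        spawn(async move {
            // The outcome of the send is ignored in this toy example.
            let _ = sender.send("Hello World!".to_string()).await;
        });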
        Self { sender_connector }
    }
}
impl Producer<String> for MySource {
    fn sender_connector(&self) -> &SenderConnector<String> {
        &self.sender_connector
    }
}

struct MySink {
    receiver_connector: ReceiverConnector<String>,
    finish: oneshot::Receiver<()>,
    /* extra fields can go here */
}
impl MySink {
    fn new() -> Self {
        let (mut receiver, receiver_connector) = new_receiver::<String>();
        let (finish_send, finish_recv) = oneshot::channel::<()>();
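        // Background task that receives one message and then signals completion.
        spawn(async move {
            assert_eq!(receiver.recv().await.unwrap(), "Hello World!".to_string());
            // Sending fails only if the oneshot receiver was dropped; ignore that case.
            let _ = finish_send.send(());
        });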
        Self { receiver_connector, finish: finish_recv }
    }
    async fn wait(self) {
        self.finish.await.unwrap();
    }
}
impl Consumer<String> for MySink {
    fn receiver_connector(&self) -> &ReceiverConnector<String> {
        &self.receiver_connector
    }
}

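// The statements below must run inside an async context (e.g. a Tokio runtime).
let source = MySource::new();
let sink = MySink::new();
// Connect the sink to the source.
sink.feed_from(&source);

// Wait until the sink's background task has received the message.
sink.wait().await;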

Re-exports

pub use broadcast_bp::RsrvError;
pub use broadcast_bp::SendError;

Structs

Receiver that can be dynamically connected to a Sender
Handle to connect a Receiver to a Sender
Guarantee to send one value from Sender to Receivers immediately
Sender that can be dynamically connected to a Receiver
Handle to connect a Sender to a Receiver

Enums

Error value returned by Receiver::recv

Traits

Consumer: Type which contains a ReceiverConnector and can be connected to a Producer
Producer: Type which contains a SenderConnector and can be connected to a Consumer

Functions

new_receiver: Create a Receiver with an associated ReceiverConnector
new_sender: Create a Sender with an associated SenderConnector