Data flow between blocks
Signal processing blocks implement the Producer trait, the Consumer trait, or both traits.
Upon creation, Producers use the new_sender function to create a pair consisting of a Sender and a SenderConnector. The Sender is passed to a background task, while the SenderConnector is stored and accessible through the Producer::sender_connector method.
Consumers use the new_receiver function upon creation to create a pair consisting of a Receiver and a ReceiverConnector. The Receiver is passed to a background task, while the ReceiverConnector is stored and accessible through the Consumer::receiver_connector method.
Note that feeding data into multiple Consumers/Receivers will block if one of the Consumers blocks; i.e., all Consumers/Receivers must have received the data before the Producer/Sender can send more.
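The blocking rule above can be modeled with the standard library alone. The following is a minimal sketch, not radiorust's implementation: the Fanout type and its methods are hypothetical, and it assumes one queue per consumer with a fixed capacity. Where a real asynchronous sender would wait, this model simply hands the value back.

```rust
use std::collections::VecDeque;

/// Hypothetical model of one producer feeding several consumers.
/// Each consumer has its own queue holding at most `capacity` items.
struct Fanout<T> {
    capacity: usize,
    queues: Vec<VecDeque<T>>,
}

impl<T: Clone> Fanout<T> {
    fn new(consumers: usize, capacity: usize) -> Self {
        let queues = (0..consumers).map(|_| VecDeque::new()).collect();
        Self { capacity, queues }
    }

    /// Accepts `value` only if every consumer has room for it;
    /// otherwise returns it unchanged (a real sender would wait instead).
    fn try_send(&mut self, value: T) -> Result<(), T> {
        if self.queues.iter().any(|q| q.len() >= self.capacity) {
            return Err(value);
        }
        for q in &mut self.queues {
            q.push_back(value.clone());
        }
        Ok(())
    }

    /// Consumer `index` takes its next item, if one is queued.
    fn recv(&mut self, index: usize) -> Option<T> {
        self.queues[index].pop_front()
    }
}
```

With two consumers and a capacity of 1, a second try_send fails until both consumers have called recv, which mirrors how a single slow consumer stalls the producer for everyone.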
For each Sender, there is a buffer capacity of 1 (see the underlying broadcast_bp channel). Thus, a chain of blocks may accumulate a significant buffer volume. This may be unwanted and can be handled by placing a blocks::buffering::Buffer block near the end of the chain.
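To get a feeling for how much data a chain can hold, the following back-of-the-envelope sketch may help. It is only an estimate under stated assumptions: both functions are hypothetical helpers, every Sender is assumed to hold at most a fixed number of equally sized chunks, and data held inside a block while it is being processed is ignored.

```rust
/// Upper bound on the number of chunks queued in a linear chain,
/// assuming each sender holds at most `capacity_per_sender` chunks.
fn max_queued_chunks(senders: usize, capacity_per_sender: usize) -> usize {
    senders * capacity_per_sender
}

/// Worst-case queued signal duration in seconds, for chunks of
/// `chunk_len` samples at `sample_rate` samples per second.
fn max_queued_secs(
    senders: usize,
    capacity_per_sender: usize,
    chunk_len: usize,
    sample_rate: f64,
) -> f64 {
    let samples = max_queued_chunks(senders, capacity_per_sender) * chunk_len;
    samples as f64 / sample_rate
}
```

For instance, with the purely illustrative values of 8 senders, a capacity of 1, chunks of 4096 samples, and 48000 samples per second, up to 32768 samples (roughly 0.68 seconds of signal) could be queued between the first and the last block.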
Example
The following toy example passes a String from a Producer to a Consumer. For radio applications, you will usually pass Samples instead.
use radiorust::flow::*;
use tokio::sync::oneshot;
use tokio::task::spawn;

struct MySource {
    sender_connector: SenderConnector<String>,
    /* extra fields can go here */
}

impl MySource {
    fn new() -> Self {
        let (sender, sender_connector) = new_sender::<String>();
        // The Sender moves into a background task.
        spawn(async move {
            sender.send("Hello World!".to_string()).await;
        });
        Self { sender_connector }
    }
}

impl Producer<String> for MySource {
    fn sender_connector(&self) -> &SenderConnector<String> {
        &self.sender_connector
    }
}

struct MySink {
    receiver_connector: ReceiverConnector<String>,
    finish: oneshot::Receiver<()>,
    /* extra fields can go here */
}

impl MySink {
    fn new() -> Self {
        let (mut receiver, receiver_connector) = new_receiver::<String>();
        let (finish_send, finish_recv) = oneshot::channel::<()>();
        // The Receiver moves into a background task.
        spawn(async move {
            assert_eq!(receiver.recv().await.unwrap(), "Hello World!".to_string());
            // Signal completion; ignore the error if nobody is waiting.
            let _ = finish_send.send(());
        });
        Self { receiver_connector, finish: finish_recv }
    }

    async fn wait(self) {
        self.finish.await.unwrap();
    }
}

impl Consumer<String> for MySink {
    fn receiver_connector(&self) -> &ReceiverConnector<String> {
        &self.receiver_connector
    }
}

// The following statements must run in an async context (e.g. a Tokio runtime).
let source = MySource::new();
let sink = MySink::new();
sink.feed_from(&source);
sink.wait().await;
Enums
Receiver::recv
Traits
Consumer: contains a ReceiverConnector and can be connected to a Producer
Producer: contains a SenderConnector and can be connected to a Consumer
Functions
new_receiver: creates a Receiver with an associated ReceiverConnector
new_sender: creates a Sender with an associated SenderConnector