use futures::SinkExt;
use futuresdr::channel::mpsc::Sender;
use futuresdr_types::BlockId;
use crate::runtime::BlockMessage;
use crate::runtime::BlockPortCtx;
use crate::runtime::Error;
use crate::runtime::Pmt;
use crate::runtime::PortId;
#[derive(Debug)]
pub struct MessageOutput {
name: String,
handlers: Vec<(PortId, Sender<BlockMessage>)>,
}
impl MessageOutput {
pub fn new(name: &str) -> MessageOutput {
MessageOutput {
name: name.to_string(),
handlers: Vec::new(),
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn connect(&mut self, port: PortId, sender: Sender<BlockMessage>) {
self.handlers.push((port, sender));
}
pub async fn notify_finished(&mut self) {
for (port_id, sender) in self.handlers.iter_mut() {
let _ = sender
.send(BlockMessage::Call {
port_id: port_id.clone(),
data: Pmt::Finished,
})
.await;
}
}
pub async fn post(&mut self, p: Pmt) {
for (port_id, sender) in self.handlers.iter_mut() {
let _ = sender
.send(BlockMessage::Call {
port_id: port_id.clone(),
data: p.clone(),
})
.await;
}
}
}
#[derive(Debug)]
pub struct MessageOutputs {
block_id: BlockId,
outputs: Vec<MessageOutput>,
}
impl MessageOutputs {
pub fn new(block_id: BlockId, outputs: Vec<String>) -> Self {
let outputs = outputs.iter().map(|x| MessageOutput::new(x)).collect();
MessageOutputs { block_id, outputs }
}
pub async fn post(&mut self, id: impl Into<PortId>, p: Pmt) -> Result<(), Error> {
let id = id.into();
self.output_mut(&id)
.ok_or(Error::InvalidMessagePort(BlockPortCtx::None, id))?
.post(p)
.await;
Ok(())
}
pub fn connect(
&mut self,
src_port: &PortId,
dst_block_inbox: Sender<BlockMessage>,
dst_port: &PortId,
) -> Result<(), Error> {
let block_id = self.block_id;
self.output_mut(src_port)
.ok_or_else(|| Error::InvalidMessagePort(BlockPortCtx::Id(block_id), src_port.clone()))?
.connect(dst_port.clone(), dst_block_inbox);
Ok(())
}
pub async fn notify_finished(&mut self) {
for o in self.outputs.iter_mut() {
o.notify_finished().await;
}
}
fn output_mut(&mut self, port: &PortId) -> Option<&mut MessageOutput> {
self.outputs
.iter_mut()
.find(|item| item.name() == port.name())
}
}