use crate::runtime::channel::mpsc::Sender;
use futures::channel::oneshot;
use std::cmp::PartialEq;
use std::fmt::Debug;
use futuresdr::runtime::BlockDescription;
use futuresdr::runtime::BlockId;
use futuresdr::runtime::Error;
use futuresdr::runtime::FlowgraphDescription;
use futuresdr::runtime::FlowgraphMessage;
use futuresdr::runtime::Pmt;
use futuresdr::runtime::PortId;
use futuresdr::runtime::Timer;
#[derive(Debug, Clone)]
pub struct FlowgraphHandle {
inbox: Sender<FlowgraphMessage>,
}
#[derive(Debug, Clone)]
pub struct FlowgraphBlockHandle {
flowgraph: FlowgraphHandle,
block_id: BlockId,
}
impl PartialEq for FlowgraphHandle {
fn eq(&self, other: &Self) -> bool {
self.inbox.same_receiver(&other.inbox)
}
}
impl FlowgraphHandle {
pub(crate) fn new(inbox: Sender<FlowgraphMessage>) -> FlowgraphHandle {
FlowgraphHandle { inbox }
}
pub fn block(&self, block_id: impl Into<BlockId>) -> FlowgraphBlockHandle {
FlowgraphBlockHandle {
flowgraph: self.clone(),
block_id: block_id.into(),
}
}
pub async fn post(
&self,
block_id: impl Into<BlockId>,
port_id: impl Into<PortId>,
data: Pmt,
) -> Result<(), Error> {
let block_id = block_id.into();
let (tx, rx) = oneshot::channel::<Result<(), Error>>();
self.inbox
.send(FlowgraphMessage::BlockCall {
block_id,
port_id: port_id.into(),
data,
tx,
})
.await
.or(Err(Error::InvalidBlock(block_id)))?;
rx.await?
}
pub async fn call(
&self,
block_id: impl Into<BlockId>,
port_id: impl Into<PortId>,
data: Pmt,
) -> Result<Pmt, Error> {
let block_id = block_id.into();
let (tx, rx) = oneshot::channel::<Result<Pmt, Error>>();
self.inbox
.send(FlowgraphMessage::BlockCallback {
block_id,
port_id: port_id.into(),
data,
tx,
})
.await
.map_err(|_| Error::InvalidBlock(block_id))?;
rx.await?
}
pub async fn describe(&self) -> Result<FlowgraphDescription, Error> {
let (tx, rx) = oneshot::channel::<FlowgraphDescription>();
self.inbox
.send(FlowgraphMessage::FlowgraphDescription { tx })
.await
.or(Err(Error::FlowgraphTerminated))?;
let d = rx.await.or(Err(Error::FlowgraphTerminated))?;
Ok(d)
}
pub async fn describe_block(
&self,
block_id: impl Into<BlockId>,
) -> Result<BlockDescription, Error> {
let block_id = block_id.into();
let (tx, rx) = oneshot::channel::<Result<BlockDescription, Error>>();
self.inbox
.send(FlowgraphMessage::BlockDescription { block_id, tx })
.await
.map_err(|_| Error::InvalidBlock(block_id))?;
let d = rx.await.map_err(|_| Error::InvalidBlock(block_id))??;
Ok(d)
}
pub async fn stop(&self) -> Result<(), Error> {
self.inbox
.send(FlowgraphMessage::Terminate)
.await
.map_err(|_| Error::FlowgraphTerminated)?;
Ok(())
}
pub async fn stop_and_wait(&self) -> Result<(), Error> {
self.stop().await.map_err(|_| Error::FlowgraphTerminated)?;
while !self.inbox.is_closed() {
Timer::after(std::time::Duration::from_millis(200)).await;
}
Ok(())
}
}
impl FlowgraphBlockHandle {
pub fn id(&self) -> BlockId {
self.block_id
}
pub async fn post(&self, port_id: impl Into<PortId>, data: Pmt) -> Result<(), Error> {
self.flowgraph.post(self.block_id, port_id, data).await
}
pub async fn call(&self, port_id: impl Into<PortId>, data: Pmt) -> Result<Pmt, Error> {
self.flowgraph.call(self.block_id, port_id, data).await
}
pub async fn describe(&self) -> Result<BlockDescription, Error> {
self.flowgraph.describe_block(self.block_id).await
}
}