use async_lock::Mutex;
use async_lock::MutexGuard;
use std::fmt::Debug;
use std::sync::Arc;
use crate::runtime::Block;
use crate::runtime::BlockId;
use crate::runtime::BlockPortCtx;
use crate::runtime::BufferReader;
use crate::runtime::BufferWriter;
use crate::runtime::Error;
use crate::runtime::Kernel;
use crate::runtime::KernelInterface;
use crate::runtime::PortId;
use crate::runtime::WrappedKernel;
/// Typed handle to a block that has been added to a [`Flowgraph`].
///
/// Pairs the block's [`BlockId`] with a shared pointer to the wrapped kernel,
/// so the concrete kernel type `K` stays accessible after the block has been
/// handed to the flowgraph.
pub struct BlockRef<K: Kernel> {
/// Identifier of the block this handle refers to.
id: BlockId,
/// Shared, mutex-protected wrapper around the kernel.
block: Arc<Mutex<WrappedKernel<K>>>,
}
impl<K: Kernel> BlockRef<K> {
pub fn get(&self) -> Result<MutexGuard<'_, WrappedKernel<K>>, Error> {
self.block.try_lock().ok_or(Error::LockError)
}
}
impl<K: Kernel> Clone for BlockRef<K> {
fn clone(&self) -> Self {
Self {
id: self.id,
block: self.block.clone(),
}
}
}
impl<K: Kernel> Debug for BlockRef<K> {
    /// Formats the handle as `BlockRef { id, instance_name }`.
    ///
    /// `instance_name` is rendered as an `Option`: `None` when the block's
    /// mutex could not be locked right now, otherwise `Some(name)`, where
    /// `name` falls back to `"<unknown>"` if the block has no instance name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formatting must never wait on the lock, so only probe it with
        // `try_lock` and report `None` if it is unavailable.
        let instance_name = self.block.try_lock().map(|kernel| {
            kernel
                .meta
                .instance_name()
                .map(String::from)
                // Lazy fallback: only allocate the placeholder when needed.
                .unwrap_or_else(|| "<unknown>".to_string())
        });

        f.debug_struct("BlockRef")
            .field("id", &self.id)
            .field("instance_name", &instance_name)
            .finish()
    }
}
impl<K: Kernel> From<BlockRef<K>> for BlockId {
fn from(value: BlockRef<K>) -> Self {
value.id
}
}
impl<K: Kernel> From<&BlockRef<K>> for BlockId {
fn from(value: &BlockRef<K>) -> Self {
value.id
}
}
/// Collection of blocks together with the stream and message connections
/// recorded between them.
pub struct Flowgraph {
/// All blocks, in the order they were added; a block's `BlockId` is its
/// index into this vector.
pub(crate) blocks: Vec<Arc<Mutex<dyn Block>>>,
/// Recorded stream connections as
/// `(source block, source port, destination block, destination port)`.
pub(crate) stream_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
/// Recorded message connections as
/// `(source block, source port, destination block, destination port)`.
pub(crate) message_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
}
impl Flowgraph {
pub fn new() -> Flowgraph {
Flowgraph {
blocks: Vec::new(),
stream_edges: vec![],
message_edges: vec![],
}
}
pub fn add_block<K: Kernel + KernelInterface + 'static>(&mut self, block: K) -> BlockRef<K> {
let block_id = BlockId(self.blocks.len());
let mut b = WrappedKernel::new(block, block_id);
let block_name = b.type_name();
b.set_instance_name(&format!("{}-{}", block_name, block_id.0));
let b = Arc::new(Mutex::new(b));
self.blocks.push(b.clone());
BlockRef {
id: block_id,
block: b,
}
}
pub fn connect_stream<B: BufferWriter>(&mut self, src_port: &mut B, dst_port: &mut B::Reader) {
self.stream_edges.push((
src_port.block_id(),
src_port.port_id(),
dst_port.block_id(),
dst_port.port_id(),
));
src_port.connect(dst_port);
}
pub fn connect_dyn(
&mut self,
src: impl Into<BlockId>,
src_port: impl Into<PortId>,
dst: impl Into<BlockId>,
dst_port: impl Into<PortId>,
) -> Result<(), Error> {
let src_id = src.into();
let src_port = src_port.into();
let dst = dst.into();
let dst_port: PortId = dst_port.into();
let src = self
.blocks
.get(src_id.0)
.ok_or(Error::InvalidBlock(src_id))?;
let dst = self.blocks.get(dst.0).ok_or(Error::InvalidBlock(dst))?;
let mut tmp = dst.try_lock().ok_or(Error::LockError)?;
let reader = tmp
.stream_input(dst_port.name())
.ok_or(Error::InvalidStreamPort(BlockPortCtx::Id(src_id), dst_port))?;
src.try_lock()
.ok_or(Error::LockError)?
.connect_stream_output(src_port.name(), reader)
}
pub fn connect_message(
&mut self,
src_block: impl Into<BlockId>,
src_port: impl Into<PortId>,
dst_block: impl Into<BlockId>,
dst_port: impl Into<PortId>,
) -> Result<(), Error> {
let src_id = src_block.into();
let dst_id = dst_block.into();
let src_port = src_port.into();
let dst_port = dst_port.into();
debug_assert_ne!(src_id, dst_id);
let mut src_block = self
.blocks
.get(src_id.0)
.ok_or(Error::InvalidBlock(src_id))?
.try_lock()
.ok_or_else(|| Error::RuntimeError(format!("unable to lock block {src_id:?}")))?;
let dst_block = self
.blocks
.get(dst_id.0)
.ok_or(Error::InvalidBlock(dst_id))?
.try_lock()
.ok_or_else(|| Error::RuntimeError(format!("unable to lock block {dst_id:?}")))?;
let dst_box = dst_block.inbox();
src_block.connect(&src_port, dst_box, &dst_port)?;
if !dst_block.message_inputs().contains(&dst_port.name()) {
return Err(Error::InvalidMessagePort(
BlockPortCtx::Id(dst_id),
dst_port,
));
}
self.message_edges
.push((src_id, src_port, dst_id, dst_port));
Ok(())
}
pub fn get_block(&self, id: BlockId) -> Result<Arc<Mutex<dyn Block>>, Error> {
Ok(self
.blocks
.get(id.0)
.ok_or(Error::InvalidBlock(id))?
.clone())
}
}
impl Default for Flowgraph {
fn default() -> Self {
Self::new()
}
}