use std::cell::RefCell;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use crate::runtime::BlockDescription;
use crate::runtime::BlockId;
use crate::runtime::BlockMessage;
use crate::runtime::BlockPortCtx;
use crate::runtime::Edge;
use crate::runtime::Error;
use crate::runtime::FlowgraphDescription;
use crate::runtime::FlowgraphId;
use crate::runtime::FlowgraphMessage;
use crate::runtime::Pmt;
use crate::runtime::PortId;
use crate::runtime::Result;
use crate::runtime::block::Block;
use crate::runtime::block::BlockObject;
use crate::runtime::buffer::BufferReader;
use crate::runtime::buffer::BufferWriter;
use crate::runtime::buffer::CircuitWriter;
use crate::runtime::buffer::SendBufferWriter;
use crate::runtime::channel::mpsc::Receiver;
use crate::runtime::channel::mpsc::Sender;
use crate::runtime::channel::oneshot;
use crate::runtime::dev::BlockInbox;
use crate::runtime::dev::BlockMeta;
use crate::runtime::dev::Kernel;
use crate::runtime::dev::SendKernel;
use crate::runtime::kernel_interface::KernelInterface;
use crate::runtime::kernel_interface::SendKernelInterface;
use crate::runtime::local_domain::LocalDomainRuntime;
use crate::runtime::local_domain_common::LocalDomainState;
use crate::runtime::scheduler::Scheduler;
use crate::runtime::wrapped_kernel::WrappedKernel;
/// Process-wide counter from which `Flowgraph::new` draws the next `FlowgraphId`.
static NEXT_FLOWGRAPH_ID: AtomicUsize = AtomicUsize::new(0);
/// Shared, typed view of a block: its id, its metadata and its kernel of type `K`.
///
/// Dereferences to the kernel.
pub struct TypedBlockGuard<'a, K> {
/// Id of the guarded block.
id: BlockId,
/// Borrowed block metadata.
meta: &'a BlockMeta,
/// Borrowed kernel; this is the `Deref` target.
kernel: &'a K,
}
/// Exclusive, typed view of a block: its id plus mutable access to metadata and kernel.
///
/// Dereferences (mutably) to the kernel.
pub struct TypedBlockGuardMut<'a, K> {
/// Id of the guarded block.
id: BlockId,
/// Mutably borrowed block metadata.
meta: &'a mut BlockMeta,
/// Mutably borrowed kernel; this is the `Deref`/`DerefMut` target.
kernel: &'a mut K,
}
impl<K> TypedBlockGuard<'_, K> {
/// Returns the id of the guarded block.
pub fn id(&self) -> BlockId {
self.id
}
/// Returns the block's metadata.
pub fn meta(&self) -> &BlockMeta {
self.meta
}
/// Returns the instance name stored in the metadata, if any.
pub fn instance_name(&self) -> Option<&str> {
self.meta.instance_name()
}
}
/// Lets a `TypedBlockGuard` be used wherever a `&K` is expected.
impl<K> Deref for TypedBlockGuard<'_, K> {
type Target = K;
/// Returns the borrowed kernel.
fn deref(&self) -> &Self::Target {
self.kernel
}
}
impl<K> TypedBlockGuardMut<'_, K> {
/// Returns the id of the guarded block.
pub fn id(&self) -> BlockId {
self.id
}
/// Returns the block's metadata.
pub fn meta(&self) -> &BlockMeta {
self.meta
}
/// Returns the block's metadata for modification.
pub fn meta_mut(&mut self) -> &mut BlockMeta {
self.meta
}
/// Returns the instance name stored in the metadata, if any.
pub fn instance_name(&self) -> Option<&str> {
self.meta.instance_name()
}
/// Stores `name` as the instance name in the block's metadata.
pub fn set_instance_name(&mut self, name: &str) {
self.meta.set_instance_name(name);
}
}
/// Lets a `TypedBlockGuardMut` be used wherever a `&K` is expected.
impl<K> Deref for TypedBlockGuardMut<'_, K> {
type Target = K;
/// Returns the kernel as a shared reference.
fn deref(&self) -> &Self::Target {
self.kernel
}
}
/// Lets a `TypedBlockGuardMut` be used wherever a `&mut K` is expected.
impl<K> DerefMut for TypedBlockGuardMut<'_, K> {
/// Returns the kernel as a mutable reference.
fn deref_mut(&mut self) -> &mut Self::Target {
self.kernel
}
}
/// Typed, copyable handle to a block that was added to a flowgraph.
///
/// The handle stores identifiers only; it does not hold the kernel.
pub struct BlockRef<K> {
/// Flowgraph-wide block id.
id: BlockId,
/// Id of the flowgraph that issued this handle; checked by `Flowgraph::validate_block_ref`.
flowgraph_id: FlowgraphId,
/// Whether the block is a normal block or lives in a local domain.
placement: BlockPlacement,
/// Records the kernel type `K` without storing a value of it.
_marker: PhantomData<fn() -> K>,
}
/// Where a block is stored.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BlockPlacement {
/// The block is stored directly in `Flowgraph::blocks`.
Normal,
/// The block lives in local domain `domain_id` under the domain-local index `local_id`.
Local { domain_id: usize, local_id: usize },
}
/// Full address of a block that lives in a local domain.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct LocalEndpoint {
/// Flowgraph-wide block id.
block_id: BlockId,
/// Index of the local domain in `Flowgraph::local_domains`.
domain_id: usize,
/// Index of the block inside that domain.
local_id: usize,
}
impl LocalEndpoint {
/// Bundles a block id with the domain index and domain-local index it lives at.
fn new(block_id: BlockId, domain_id: usize, local_id: usize) -> Self {
Self {
block_id,
domain_id,
local_id,
}
}
}
/// Classification of a stream connection by the placement of its two endpoints.
///
/// Produced by `Flowgraph::stream_plan`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum StreamPlan {
/// Both endpoints are normal blocks.
NormalNormal {
src: BlockId,
dst: BlockId,
},
/// Both endpoints are local blocks in the same domain.
LocalLocalSame {
src: LocalEndpoint,
dst: LocalEndpoint,
},
/// Both endpoints are local blocks, in different domains.
LocalLocalCross {
src: LocalEndpoint,
dst: LocalEndpoint,
},
/// The source is a local block, the destination a normal block.
LocalToNormal {
src: LocalEndpoint,
dst: BlockId,
},
/// The source is a normal block, the destination a local block.
NormalToLocal {
src: BlockId,
dst: LocalEndpoint,
},
}
/// Crate-internal bundle of per-block inboxes, block ids and the stream/message edge lists.
///
/// NOTE(review): the name suggests this is captured when a flowgraph is started; its
/// construction and use are not in this part of the file — confirm.
pub(crate) struct StartupSnapshot {
/// One optional inbox per entry.
inboxes: Vec<Option<BlockInbox>>,
/// Block ids.
ids: Vec<BlockId>,
/// Stream connections.
stream_edges: Vec<Edge>,
/// Message connections.
message_edges: Vec<Edge>,
}
/// Copyable handle naming one local domain of one flowgraph.
///
/// Created by `Flowgraph::local_domain` and checked by `Flowgraph::validate_local_domain`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct LocalDomain {
/// Id of the flowgraph that created the domain.
flowgraph_id: FlowgraphId,
/// Index of the domain in `Flowgraph::local_domains`.
domain_id: usize,
}
/// Bookkeeping for one block added through a `LocalDomainContext`.
///
/// `Flowgraph::commit_local_context_entries` turns each entry into a `BlockEntry`.
struct LocalDomainContextEntry {
/// Placement assigned to the block (always `Local` when created by `LocalDomainContext::add`).
placement: BlockPlacement,
/// Inbox of the block.
inbox: BlockInbox,
/// Names of the block's message inputs, as reported by `K::message_inputs()`.
message_inputs: &'static [&'static str],
}
/// Mutable state behind a `LocalDomainContext`.
struct LocalDomainContextInner<'a> {
/// Flowgraph the context belongs to.
flowgraph_id: FlowgraphId,
/// Local domain the context operates on.
domain_id: usize,
/// Block id handed to the next block added through the context.
next_block_id: usize,
/// Domain-local index handed to the next block added through the context.
next_local_id: usize,
/// Entries for the blocks added so far, in insertion order.
entries: Vec<LocalDomainContextEntry>,
/// State of the local domain the blocks are inserted into.
state: &'a mut LocalDomainState,
}
/// Context passed to the closure of `Flowgraph::domain_run`; it adds and connects blocks
/// inside one local domain through `&self` methods.
pub struct LocalDomainContext<'a> {
/// Interior-mutable state, so the context can be used through a shared reference.
inner: RefCell<LocalDomainContextInner<'a>>,
}
impl<'a> LocalDomainContext<'a> {
/// Creates a context for domain `domain_id` of flowgraph `flowgraph_id`.
///
/// `next_block_id` and `next_local_id` are the first ids handed out by [`Self::add`];
/// `state` is the domain state that blocks are inserted into.
fn new(
flowgraph_id: FlowgraphId,
domain_id: usize,
next_block_id: usize,
next_local_id: usize,
state: &'a mut LocalDomainState,
) -> Self {
Self {
inner: RefCell::new(LocalDomainContextInner {
flowgraph_id,
domain_id,
next_block_id,
next_local_id,
entries: Vec::new(),
state,
}),
}
}
/// Moves the recorded entries out of the context, leaving its entry list empty.
fn take_entries(&self) -> Vec<LocalDomainContextEntry> {
std::mem::take(&mut self.inner.borrow_mut().entries)
}
/// Adds `block` to this local domain and returns a typed handle to it.
///
/// The block receives the next flowgraph block id and the next domain-local index, is
/// wrapped in a `WrappedKernel`, named `"<type name>-<block id>"` and inserted into the
/// domain state. An entry for it is recorded so the flowgraph can register it later.
///
/// # Panics
///
/// Panics if the domain state rejects the insertion.
pub fn add<K>(&self, block: K) -> BlockRef<K>
where
    K: Kernel + KernelInterface + 'static,
{
    let mut ctx = self.inner.borrow_mut();

    // Claim both ids up front, then advance the counters.
    let id = BlockId(ctx.next_block_id);
    let local_id = ctx.next_local_id;
    ctx.next_block_id += 1;
    ctx.next_local_id += 1;

    let placement = BlockPlacement::Local {
        domain_id: ctx.domain_id,
        local_id,
    };

    let mut wrapped = WrappedKernel::new(block, id);
    let instance_name = format!("{}-{}", K::type_name(), id.0);
    wrapped.meta.set_instance_name(instance_name);
    // The inbox has to be fetched before the block is moved into the domain state.
    let inbox = wrapped.inbox();

    ctx.state
        .insert_block(local_id, Box::new(wrapped))
        .expect("failed to insert local-domain block");
    ctx.entries.push(LocalDomainContextEntry {
        placement,
        inbox,
        message_inputs: K::message_inputs(),
    });

    BlockRef {
        id,
        flowgraph_id: ctx.flowgraph_id,
        placement,
        _marker: PhantomData,
    }
}
/// Blocking variant of [`Self::stream_local_async`]: drives that future to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
pub fn stream_local<KS, KD, B, FS, FD>(
&self,
src_block: &BlockRef<KS>,
src_port: FS,
dst_block: &BlockRef<KD>,
dst_port: FD,
) -> Result<(), Error>
where
KS: 'static,
KD: 'static,
B: BufferWriter + 'static,
FS: FnOnce(&mut KS) -> &mut B,
FD: FnOnce(&mut KD) -> &mut B::Reader,
{
crate::runtime::block_on(
self.stream_local_async::<KS, KD, B, FS, FD>(src_block, src_port, dst_block, dst_port),
)
}
/// Connects a stream output of `src_block` to a stream input of `dst_block`, both of which
/// must be local blocks of this context's domain.
///
/// `src_port` and `dst_port` select the writer and the reader from the two kernels.
///
/// # Errors
///
/// Returns `Error::ValidationError` if either block belongs to another flowgraph, is not a
/// local block, or lives in a different domain. Errors from looking up the two kernels in
/// the domain state are propagated.
pub async fn stream_local_async<KS, KD, B, FS, FD>(
&self,
src_block: &BlockRef<KS>,
src_port: FS,
dst_block: &BlockRef<KD>,
dst_port: FD,
) -> Result<(), Error>
where
KS: 'static,
KD: 'static,
B: BufferWriter + 'static,
FS: FnOnce(&mut KS) -> &mut B,
FD: FnOnce(&mut KD) -> &mut B::Reader,
{
let mut inner = self.inner.borrow_mut();
// Both handles must have been issued by the flowgraph this context belongs to.
if src_block.flowgraph_id != inner.flowgraph_id {
return Err(Error::ValidationError(format!(
"block {:?} belongs to another flowgraph",
src_block.id
)));
}
if dst_block.flowgraph_id != inner.flowgraph_id {
return Err(Error::ValidationError(format!(
"block {:?} belongs to another flowgraph",
dst_block.id
)));
}
// Both endpoints must be local blocks ...
let (
BlockPlacement::Local {
domain_id: src_domain,
local_id: src_local,
},
BlockPlacement::Local {
domain_id: dst_domain,
local_id: dst_local,
},
) = (src_block.placement, dst_block.placement)
else {
return Err(Error::ValidationError(
"local-domain context stream connections require local blocks".to_string(),
));
};
// ... and both must live in the domain this context operates on.
if src_domain != inner.domain_id || dst_domain != inner.domain_id {
return Err(Error::ValidationError(
"local-domain context stream connections require blocks in this domain".to_string(),
));
}
// Borrow both kernels at once, connect the selected ports, and keep the resulting edge.
let edge = {
let (src, dst) = Flowgraph::two_local_state_kernels_mut::<KS, KD>(
inner.state,
(src_local, src_block.id),
(dst_local, dst_block.id),
)?;
Flowgraph::connect_stream_ports(src_port(src), dst_port(dst))
};
inner.state.add_stream_edge(edge);
Ok(())
}
/// Alias for [`Self::stream_local_async`]; forwards all arguments unchanged.
pub async fn stream_async<KS, KD, B, FS, FD>(
&self,
src_block: &BlockRef<KS>,
src_port: FS,
dst_block: &BlockRef<KD>,
dst_port: FD,
) -> Result<(), Error>
where
KS: 'static,
KD: 'static,
B: BufferWriter + 'static,
FS: FnOnce(&mut KS) -> &mut B,
FD: FnOnce(&mut KD) -> &mut B::Reader,
{
self.stream_local_async(src_block, src_port, dst_block, dst_port)
.await
}
/// Blocking variant of [`Self::message_async`]: drives that future to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
pub fn message(
&self,
src_block_id: impl Into<BlockId>,
src_port_id: impl Into<PortId>,
dst_block_id: impl Into<BlockId>,
dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
crate::runtime::block_on(self.message_async(
src_block_id,
src_port_id,
dst_block_id,
dst_port_id,
))
}
pub async fn message_async(
&self,
src_block_id: impl Into<BlockId>,
src_port_id: impl Into<PortId>,
dst_block_id: impl Into<BlockId>,
dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
let src_block_id = src_block_id.into();
let src_port_id = src_port_id.into();
let dst_block_id = dst_block_id.into();
let dst_port_id = dst_port_id.into();
let mut inner = self.inner.borrow_mut();
let first_block_id = inner.next_block_id - inner.entries.len();
let src_placement = inner
.entries
.get(
src_block_id
.0
.checked_sub(first_block_id)
.ok_or(Error::InvalidBlock(src_block_id))?,
)
.map(|entry| entry.placement)
.ok_or(Error::InvalidBlock(src_block_id))?;
let dst_placement = inner
.entries
.get(
dst_block_id
.0
.checked_sub(first_block_id)
.ok_or(Error::InvalidBlock(dst_block_id))?,
)
.map(|entry| entry.placement)
.ok_or(Error::InvalidBlock(dst_block_id))?;
let (
BlockPlacement::Local {
domain_id: src_domain,
local_id: src_local,
..
},
BlockPlacement::Local {
domain_id: dst_domain,
local_id: dst_local,
..
},
) = (src_placement, dst_placement)
else {
return Err(Error::ValidationError(
"local-domain context message connections require local blocks".to_string(),
));
};
if src_domain != inner.domain_id || dst_domain != inner.domain_id {
return Err(Error::ValidationError(
"local-domain context message connections require blocks in this domain"
.to_string(),
));
}
let dst_block = inner.state.block(dst_local, dst_block_id)?;
if !dst_block.message_inputs().contains(&dst_port_id.name()) {
return Err(Error::InvalidMessagePort(
BlockPortCtx::Id(dst_block_id),
dst_port_id.clone(),
));
}
let dst_box = dst_block.inbox();
let src_block = inner.state.block_mut(src_local, src_block_id)?;
src_block.connect(&src_port_id, dst_box, &dst_port_id)?;
inner.state.add_message_edge(Edge::new(
src_block_id,
src_port_id,
dst_block_id,
dst_port_id,
));
Ok(())
}
}
/// One slot of `Flowgraph::blocks`.
pub(crate) struct BlockEntry {
/// The block itself. `None` for reserved entries, and while a block is temporarily
/// taken out of its slot.
block: Option<Box<dyn Block>>,
/// Whether the block is normal or lives in a local domain.
placement: BlockPlacement,
/// Inbox of the block, once known.
inbox: Option<BlockInbox>,
/// Names of the block's message inputs.
message_inputs: &'static [&'static str],
}
impl BlockEntry {
/// Creates an entry that holds no block and no inbox yet.
fn reserved(placement: BlockPlacement, message_inputs: &'static [&'static str]) -> Self {
Self {
block: None,
placement,
inbox: None,
message_inputs,
}
}
/// Creates an entry that owns `block` together with its `inbox`.
fn with_block(
block: Box<dyn Block>,
placement: BlockPlacement,
inbox: BlockInbox,
message_inputs: &'static [&'static str],
) -> Self {
Self {
block: Some(block),
placement,
inbox: Some(inbox),
message_inputs,
}
}
}
impl<K> BlockRef<K> {
/// Returns the flowgraph-wide id of the referenced block.
pub fn id(&self) -> BlockId {
self.id
}
}
impl<K: 'static> BlockRef<K> {
/// Looks this block up in `fg`; shorthand for `fg.block(self)`.
pub fn get<'a>(&self, fg: &'a Flowgraph) -> Result<TypedBlockGuard<'a, K>, Error> {
fg.block(self)
}
/// Blocking variant of [`Self::with_async`]: drives that future to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
pub fn with<R>(
&self,
fg: &Flowgraph,
f: impl FnOnce(&K) -> R + Send + 'static,
) -> Result<R, Error>
where
R: Send + 'static,
{
crate::runtime::block_on(self.with_async(fg, f))
}
/// Calls `f` with a shared reference to the kernel and returns its result.
///
/// For a normal block `f` runs directly on the kernel stored in `fg`. For a local block
/// `f` is handed to the owning local domain via `exec`.
///
/// # Errors
///
/// * Whatever `Flowgraph::validate_block_ref` reports for a foreign or stale handle.
/// * `Error::InvalidBlock` if the local domain index is out of range.
/// * `Error::LockError` if the local domain is currently running.
/// * Errors from resolving the kernel or from the domain's `exec`.
pub async fn with_async<R>(
&self,
fg: &Flowgraph,
f: impl FnOnce(&K) -> R + Send + 'static,
) -> Result<R, Error>
where
R: Send + 'static,
{
fg.validate_block_ref(self)?;
match self.placement {
BlockPlacement::Normal => {
let block = fg.block(self)?;
// `&block` derefs from the guard to `&K`.
Ok(f(&block))
}
BlockPlacement::Local {
domain_id,
local_id,
..
} => {
let domain = fg
.local_domains
.get(domain_id)
.ok_or(Error::InvalidBlock(self.id))?;
// A running domain is not accessed from here.
if domain.is_running() {
return Err(Error::LockError);
}
// Copy the id so the closure does not borrow `self`.
let block_id = self.id;
domain
.exec(move |state| {
Box::pin(async move {
Ok(f(Flowgraph::local_state_kernel_ref(
state, local_id, block_id,
)?))
})
})
.await
}
}
}
/// Blocking variant of [`Self::with_mut_async`]: drives that future to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
pub fn with_mut<R>(
&self,
fg: &mut Flowgraph,
f: impl FnOnce(&mut K) -> R + Send + 'static,
) -> Result<R, Error>
where
R: Send + 'static,
{
crate::runtime::block_on(self.with_mut_async(fg, f))
}
/// Calls `f` with a mutable reference to the kernel and returns its result.
///
/// For a normal block `f` runs directly on the kernel stored in `fg`. For a local block
/// `f` is handed to the owning local domain via `exec`.
///
/// # Errors
///
/// * Whatever `Flowgraph::validate_block_ref` reports for a foreign or stale handle.
/// * `Error::InvalidBlock` if the local domain index is out of range.
/// * `Error::LockError` if the local domain is currently running.
/// * Errors from resolving the kernel or from the domain's `exec`.
pub async fn with_mut_async<R>(
&self,
fg: &mut Flowgraph,
f: impl FnOnce(&mut K) -> R + Send + 'static,
) -> Result<R, Error>
where
R: Send + 'static,
{
fg.validate_block_ref(self)?;
match self.placement {
BlockPlacement::Normal => {
let mut block = fg.block_mut(self)?;
// `&mut block` derefs from the guard to `&mut K`.
Ok(f(&mut block))
}
BlockPlacement::Local {
domain_id,
local_id,
..
} => {
let domain = fg
.local_domains
.get(domain_id)
.ok_or(Error::InvalidBlock(self.id))?;
// A running domain is not accessed from here.
if domain.is_running() {
return Err(Error::LockError);
}
// Copy the id so the closure does not borrow `self`.
let block_id = self.id;
domain
.exec(move |state| {
Box::pin(async move {
Ok(f(Flowgraph::local_state_kernel_mut(
state, local_id, block_id,
)?))
})
})
.await
}
}
}
}
// Implemented by hand so that `BlockRef<K>` is `Copy` without requiring `K: Copy`.
impl<K> Copy for BlockRef<K> {}
// Implemented by hand so that `BlockRef<K>` is `Clone` without requiring `K: Clone`.
impl<K> Clone for BlockRef<K> {
/// Returns a bitwise copy of the handle.
fn clone(&self) -> Self {
*self
}
}
// Implemented by hand so that no `K: Debug` bound is needed.
impl<K> Debug for BlockRef<K> {
/// Prints the id, flowgraph id and placement, plus the kernel type name in place of the
/// marker field.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockRef")
.field("id", &self.id)
.field("flowgraph_id", &self.flowgraph_id)
.field("placement", &self.placement)
.field("type_name", &std::any::type_name::<K>())
.finish()
}
}
/// Extracts the block id from a handle passed by value.
impl<K> From<BlockRef<K>> for BlockId {
fn from(value: BlockRef<K>) -> Self {
value.id
}
}
/// Extracts the block id from a borrowed handle.
impl<K> From<&BlockRef<K>> for BlockId {
fn from(value: &BlockRef<K>) -> Self {
value.id
}
}
/// A graph of blocks together with the stream and message connections between them.
pub struct Flowgraph {
/// Id of this flowgraph, drawn from a process-wide counter in `Flowgraph::new`.
pub(crate) id: FlowgraphId,
/// One entry per block, indexed by `BlockId`.
pub(crate) blocks: Vec<BlockEntry>,
/// Local domains, indexed by domain id.
pub(crate) local_domains: Vec<LocalDomainRuntime>,
/// Stream connections.
pub(crate) stream_edges: Vec<Edge>,
/// Message connections.
pub(crate) message_edges: Vec<Edge>,
}
impl Flowgraph {
pub fn new() -> Flowgraph {
Flowgraph {
id: FlowgraphId(NEXT_FLOWGRAPH_ID.fetch_add(1, Ordering::Relaxed)),
blocks: Vec::new(),
local_domains: Vec::new(),
stream_edges: vec![],
message_edges: vec![],
}
}
/// Creates a new local domain and returns a handle to it.
///
/// # Errors
///
/// Propagates the error from `LocalDomainRuntime::new`; no domain is registered then.
pub fn local_domain(&mut self) -> Result<LocalDomain, Error> {
    let runtime = LocalDomainRuntime::new()?;
    // The new domain's id is the index it is about to occupy.
    let handle = LocalDomain {
        flowgraph_id: self.id,
        domain_id: self.local_domains.len(),
    };
    self.local_domains.push(runtime);
    Ok(handle)
}
fn commit_local_context_entries(
&mut self,
domain_id: usize,
entries: Vec<LocalDomainContextEntry>,
) {
self.local_domains[domain_id].reserve_blocks(entries.len());
self.blocks
.extend(entries.into_iter().map(|entry| BlockEntry {
block: None,
placement: entry.placement,
inbox: Some(entry.inbox),
message_inputs: entry.message_inputs,
}));
}
/// Blocking variant of [`Self::domain_run_async`]: wraps the synchronous closure `f` in an
/// async closure and drives it with `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
pub fn domain_run<R>(
&mut self,
domain: LocalDomain,
f: impl FnOnce(&LocalDomainContext<'_>) -> Result<R, Error> + Send + 'static,
) -> Result<R, Error>
where
R: Send + 'static,
{
crate::runtime::block_on(
self.domain_run_async(domain, async move |ctx: &LocalDomainContext<'_>| f(ctx)),
)
}
/// Runs `f` inside local domain `domain` with a [`LocalDomainContext`] through which it
/// can add and connect blocks, then registers the added blocks with this flowgraph.
///
/// Blocks that `f` added are registered even when `f` itself returns an error. They
/// already live in the domain state at that point, so skipping the registration would
/// leave their block ids and domain-local slots unreserved and later additions would
/// reuse ids that are already taken.
///
/// # Errors
///
/// * The validation error for a domain handle that is foreign or out of range.
/// * `Error::LockError` if the domain is currently running.
/// * Any error from the domain's `exec`.
/// * The error returned by `f`, after its blocks have been registered.
pub async fn domain_run_async<R, F>(&mut self, domain: LocalDomain, f: F) -> Result<R, Error>
where
    R: Send + 'static,
    F: for<'a> std::ops::AsyncFnOnce(&'a LocalDomainContext<'a>) -> Result<R, Error>
        + Send
        + 'static,
{
    let domain_id = self.validate_local_domain(domain)?;
    if self.local_domains[domain_id].is_running() {
        return Err(Error::LockError);
    }
    // The context continues numbering where the flowgraph and the domain currently end.
    let next_block_id = self.blocks.len();
    let next_local_id = self.local_domains[domain_id].block_count();
    let flowgraph_id = self.id;
    let (outcome, entries) = self.local_domains[domain_id]
        .exec(move |state| {
            Box::pin(async move {
                let ctx = LocalDomainContext::new(
                    flowgraph_id,
                    domain_id,
                    next_block_id,
                    next_local_id,
                    state,
                );
                // Do not use `?` here: the entries must leave the domain together with
                // the closure's result, whether it succeeded or not.
                let outcome = f(&ctx).await;
                Ok((outcome, ctx.take_entries()))
            })
        })
        .await?;
    self.commit_local_context_entries(domain_id, entries);
    outcome
}
/// Adds `block` to the flowgraph and returns a typed handle to it.
///
/// On non-`wasm32` targets this blocks on [`Self::add_async`]. On `wasm32` the block is
/// added as a normal block directly.
///
/// # Panics
///
/// On `wasm32`, panics if the kernel reports itself as blocking; use `add_async` there.
pub fn add<K>(&mut self, block: K) -> BlockRef<K>
where
K: SendKernel + SendKernelInterface + 'static,
{
#[cfg(not(target_arch = "wasm32"))]
{
crate::runtime::block_on(self.add_async(block))
}
#[cfg(target_arch = "wasm32")]
{
if <K as KernelInterface>::is_blocking() {
panic!("Flowgraph::add cannot add blocking blocks on wasm32; use add_async");
}
self.add_normal_kernel(block)
}
}
/// Adds `block` to the flowgraph and returns a typed handle to it.
///
/// A kernel that reports itself as blocking gets a freshly created local domain of its
/// own; any other kernel is stored as a normal block.
///
/// # Panics
///
/// Panics if the local domain for a blocking kernel cannot be created.
pub async fn add_async<K>(&mut self, block: K) -> BlockRef<K>
where
K: SendKernel + SendKernelInterface + 'static,
{
if <K as KernelInterface>::is_blocking() {
// The new domain's id is the index it is about to occupy.
let domain_id = self.local_domains.len();
self.local_domains
.push(LocalDomainRuntime::new().expect("failed to create local domain"));
self.add_kernel_to_domain_async(domain_id, move || block)
.await
} else {
self.add_normal_kernel(block)
}
}
/// Wraps `block` in a `WrappedKernel`, names it `"<type name>-<block id>"` and stores it
/// as a normal block.
fn add_normal_kernel<K>(&mut self, block: K) -> BlockRef<K>
where
    K: SendKernel + SendKernelInterface + 'static,
{
    // The id is the index the block will occupy in `self.blocks`.
    let block_id = BlockId(self.blocks.len());
    let mut wrapped = WrappedKernel::new(block, block_id);
    wrapped.meta.set_instance_name(format!(
        "{}-{}",
        <K as KernelInterface>::type_name(),
        block_id.0
    ));
    let inbox = wrapped.inbox();
    let message_inputs = <K as KernelInterface>::message_inputs();
    self.add_normal_block(Box::new(wrapped), inbox, message_inputs)
}
/// Appends a reserved entry (no block, no inbox) and returns the id of its slot.
fn reserve_block_id(
    &mut self,
    placement: BlockPlacement,
    message_inputs: &'static [&'static str],
) -> BlockId {
    let entry = BlockEntry::reserved(placement, message_inputs);
    let index = self.blocks.len();
    self.blocks.push(entry);
    BlockId(index)
}
/// Builds a typed handle for `block_id` that is tagged with this flowgraph's id.
fn block_ref<K>(&self, block_id: BlockId, placement: BlockPlacement) -> BlockRef<K> {
BlockRef {
id: block_id,
flowgraph_id: self.id,
placement,
_marker: PhantomData,
}
}
/// Appends an already boxed block as a normal block and returns a typed handle to it.
fn add_normal_block<K>(
    &mut self,
    block: Box<dyn Block>,
    inbox: BlockInbox,
    message_inputs: &'static [&'static str],
) -> BlockRef<K> {
    let entry = BlockEntry::with_block(block, BlockPlacement::Normal, inbox, message_inputs);
    // The id is the index the entry is about to occupy.
    let block_id = BlockId(self.blocks.len());
    self.blocks.push(entry);
    self.block_ref(block_id, BlockPlacement::Normal)
}
/// Builds a kernel with `block` inside local domain `domain` and returns a handle to it.
/// Blocking variant of [`Self::add_local_async`]; not compiled for `wasm32`.
///
/// # Panics
///
/// Panics if `domain` fails validation (foreign flowgraph or out-of-range domain id), or
/// if building the block in the domain fails.
#[cfg(not(target_arch = "wasm32"))]
pub fn add_local<K>(
&mut self,
domain: LocalDomain,
block: impl FnOnce() -> K + Send + 'static,
) -> BlockRef<K>
where
K: Kernel + KernelInterface + 'static,
{
let domain_id = self
.validate_local_domain(domain)
.expect("local domain belongs to another flowgraph");
self.add_kernel_to_domain(domain_id, block)
}
/// Builds a kernel with `block` inside local domain `domain` and returns a handle to it.
///
/// # Panics
///
/// Panics if `domain` fails validation (foreign flowgraph or out-of-range domain id), or
/// if building the block in the domain fails.
pub async fn add_local_async<K>(
&mut self,
domain: LocalDomain,
block: impl FnOnce() -> K + Send + 'static,
) -> BlockRef<K>
where
K: Kernel + KernelInterface + 'static,
{
let domain_id = self
.validate_local_domain(domain)
.expect("local domain belongs to another flowgraph");
self.add_kernel_to_domain_async(domain_id, block).await
}
/// Blocking variant of [`Self::add_kernel_to_domain_async`]: drives that future to
/// completion with `crate::runtime::block_on`. Not compiled for `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
fn add_kernel_to_domain<K>(
&mut self,
domain_id: usize,
block: impl FnOnce() -> K + Send + 'static,
) -> BlockRef<K>
where
K: Kernel + KernelInterface + 'static,
{
crate::runtime::block_on(self.add_kernel_to_domain_async(domain_id, block))
}
/// Builds a kernel inside local domain `domain_id` and registers it with the flowgraph.
///
/// The constructor closure `block` is handed to the domain's `build`, where the kernel is
/// wrapped in a `WrappedKernel` and named `"<type name>-<block id>"`.
///
/// # Panics
///
/// Panics if `domain_id` is out of range (indexing) or if the domain fails to build the
/// block.
async fn add_kernel_to_domain_async<K>(
&mut self,
domain_id: usize,
block: impl FnOnce() -> K + Send + 'static,
) -> BlockRef<K>
where
K: Kernel + KernelInterface + 'static,
{
// Reserve the domain-local slot and the flowgraph-wide id before building the block.
let local_id = self.local_domains[domain_id].reserve_block();
let placement = BlockPlacement::Local {
domain_id,
local_id,
};
let block_id = self.reserve_block_id(placement, K::message_inputs());
let inbox = self.local_domains[domain_id]
.build(
local_id,
Box::new(move || {
let mut block = WrappedKernel::new(block(), block_id);
block
.meta
.set_instance_name(format!("{}-{}", K::type_name(), block_id.0));
Box::new(block)
}),
)
.await
.expect("failed to build block in local domain");
// The reserved entry had no inbox yet; fill it in now that the block exists.
let entry = &mut self.blocks[block_id.0];
entry.inbox = Some(inbox);
self.block_ref(block_id, placement)
}
pub(crate) fn validate_block_ref<K>(&self, block: &BlockRef<K>) -> Result<(), Error> {
if block.flowgraph_id != self.id {
return Err(Error::ValidationError(format!(
"block {:?} belongs to flowgraph {}, not {}",
block.id, block.flowgraph_id, self.id
)));
}
if self.blocks.get(block.id.0).map(|entry| entry.placement) != Some(block.placement) {
return Err(Error::InvalidBlock(block.id));
}
Ok(())
}
fn validate_local_domain(&self, domain: LocalDomain) -> Result<usize, Error> {
if domain.flowgraph_id != self.id {
return Err(Error::ValidationError(format!(
"local domain belongs to flowgraph {}, not {}",
domain.flowgraph_id, self.id
)));
}
if domain.domain_id >= self.local_domains.len() {
return Err(Error::ValidationError("invalid local domain".to_string()));
}
Ok(domain.domain_id)
}
fn placement(&self, block_id: BlockId) -> Result<BlockPlacement, Error> {
self.blocks
.get(block_id.0)
.map(|entry| entry.placement)
.ok_or(Error::InvalidBlock(block_id))
}
/// Returns mutable references to the entries of two distinct blocks at once.
///
/// # Errors
///
/// * `Error::LockError` if `first` and `second` are the same block.
/// * `Error::InvalidBlock` naming the first out-of-range id.
fn two_block_entries_mut(
&mut self,
first: BlockId,
second: BlockId,
) -> Result<(&mut BlockEntry, &mut BlockEntry), Error> {
if first == second {
return Err(Error::LockError);
}
let len = self.blocks.len();
// Decide up front which id to blame if the lookup reports an out-of-bounds index.
let invalid_block = if first.0 >= len { first } else { second };
let [first_slot, second_slot] =
self.blocks
.get_disjoint_mut([first.0, second.0])
.map_err(|err| match err {
std::slice::GetDisjointMutError::IndexOutOfBounds => {
Error::InvalidBlock(invalid_block)
}
std::slice::GetDisjointMutError::OverlappingIndices => Error::LockError,
})?;
Ok((first_slot, second_slot))
}
/// Classifies a stream connection by the placement of its source and destination.
///
/// Two local endpoints are further split by whether they share a domain.
fn stream_plan(
    src_id: BlockId,
    src: BlockPlacement,
    dst_id: BlockId,
    dst: BlockPlacement,
) -> StreamPlan {
    match src {
        BlockPlacement::Normal => match dst {
            BlockPlacement::Normal => StreamPlan::NormalNormal {
                src: src_id,
                dst: dst_id,
            },
            BlockPlacement::Local {
                domain_id,
                local_id,
            } => StreamPlan::NormalToLocal {
                src: src_id,
                dst: LocalEndpoint::new(dst_id, domain_id, local_id),
            },
        },
        BlockPlacement::Local {
            domain_id: src_domain,
            local_id: src_local,
        } => {
            let src = LocalEndpoint::new(src_id, src_domain, src_local);
            match dst {
                BlockPlacement::Normal => StreamPlan::LocalToNormal { src, dst: dst_id },
                BlockPlacement::Local {
                    domain_id: dst_domain,
                    local_id: dst_local,
                } => {
                    let dst = LocalEndpoint::new(dst_id, dst_domain, dst_local);
                    if src_domain == dst_domain {
                        StreamPlan::LocalLocalSame { src, dst }
                    } else {
                        StreamPlan::LocalLocalCross { src, dst }
                    }
                }
            }
        }
    }
}
/// Looks up the placements of both blocks and classifies the connection between them.
///
/// Returns `Error::InvalidBlock` for the first id that has no entry (source is checked
/// first).
fn stream_plan_by_id(&self, src_id: BlockId, dst_id: BlockId) -> Result<StreamPlan, Error> {
    let src_placement = self.placement(src_id)?;
    let dst_placement = self.placement(dst_id)?;
    Ok(Self::stream_plan(
        src_id,
        src_placement,
        dst_id,
        dst_placement,
    ))
}
fn raw_block(&self, block_id: BlockId) -> Result<&dyn BlockObject, Error> {
match self.placement(block_id)? {
BlockPlacement::Normal => self
.blocks
.get(block_id.0)
.ok_or(Error::InvalidBlock(block_id))?
.block
.as_ref()
.map(|block| block.as_ref() as &dyn BlockObject)
.ok_or(Error::LockError),
BlockPlacement::Local { .. } => Err(Error::LockError),
}
}
fn raw_block_mut(&mut self, block_id: BlockId) -> Result<&mut dyn BlockObject, Error> {
match self.placement(block_id)? {
BlockPlacement::Normal => self
.blocks
.get_mut(block_id.0)
.ok_or(Error::InvalidBlock(block_id))?
.block
.as_mut()
.map(|block| block.as_mut() as &mut dyn BlockObject)
.ok_or(Error::LockError),
BlockPlacement::Local { .. } => Err(Error::LockError),
}
}
fn get_typed_wrapped_block_by_id<K: 'static>(
&self,
block_id: BlockId,
) -> Result<&WrappedKernel<K>, Error> {
let block = self.raw_block(block_id)?;
block
.as_any()
.downcast_ref::<WrappedKernel<K>>()
.ok_or_else(|| {
Error::ValidationError(format!(
"block {:?} has unexpected type for {}",
block_id,
std::any::type_name::<K>()
))
})
}
fn get_typed_wrapped_block_mut_by_id<K: 'static>(
&mut self,
block_id: BlockId,
) -> Result<&mut WrappedKernel<K>, Error> {
let block = self.raw_block_mut(block_id)?;
block
.as_any_mut()
.downcast_mut::<WrappedKernel<K>>()
.ok_or_else(|| {
Error::ValidationError(format!(
"block {:?} has unexpected type for {}",
block_id,
std::any::type_name::<K>()
))
})
}
fn get_two_typed_wrapped_blocks_mut<KS, KD>(
&mut self,
src_id: BlockId,
dst_id: BlockId,
) -> Result<(&mut WrappedKernel<KS>, &mut WrappedKernel<KD>), Error>
where
KS: 'static,
KD: 'static,
{
let (src_slot, dst_slot) = self.two_block_entries_mut(src_id, dst_id)?;
let src = src_slot
.block
.as_mut()
.ok_or(Error::LockError)?
.as_mut()
.as_any_mut()
.downcast_mut::<WrappedKernel<KS>>()
.ok_or_else(|| {
Error::ValidationError(format!(
"block {:?} has unexpected type for {}",
src_id,
std::any::type_name::<KS>()
))
})?;
let dst = dst_slot
.block
.as_mut()
.ok_or(Error::LockError)?
.as_mut()
.as_any_mut()
.downcast_mut::<WrappedKernel<KD>>()
.ok_or_else(|| {
Error::ValidationError(format!(
"block {:?} has unexpected type for {}",
dst_id,
std::any::type_name::<KD>()
))
})?;
Ok((src, dst))
}
fn local_kernel_ref<K: 'static>(
block: &dyn BlockObject,
block_id: BlockId,
) -> Result<&K, Error> {
block
.as_any()
.downcast_ref::<WrappedKernel<K>>()
.map(|block| &block.kernel)
.ok_or_else(|| {
Error::ValidationError(format!(
"local block {:?} has unexpected type for {}",
block_id,
std::any::type_name::<K>()
))
})
}
fn local_kernel_mut<K: 'static>(
block: &mut dyn BlockObject,
block_id: BlockId,
) -> Result<&mut K, Error> {
block
.as_any_mut()
.downcast_mut::<WrappedKernel<K>>()
.map(|block| &mut block.kernel)
.ok_or_else(|| {
Error::ValidationError(format!(
"local block {:?} has unexpected type for {}",
block_id,
std::any::type_name::<K>()
))
})
}
/// Looks up block `local_id`/`block_id` in a domain state and returns its kernel as `&K`.
///
/// Propagates the lookup error and the type-mismatch error of `local_kernel_ref`.
fn local_state_kernel_ref<K: 'static>(
    state: &LocalDomainState,
    local_id: usize,
    block_id: BlockId,
) -> Result<&K, Error> {
    Self::local_kernel_ref(state.block(local_id, block_id)?, block_id)
}
/// Looks up block `local_id`/`block_id` in a domain state and returns its kernel as
/// `&mut K`.
///
/// Propagates the lookup error and the type-mismatch error of `local_kernel_mut`.
fn local_state_kernel_mut<K: 'static>(
    state: &mut LocalDomainState,
    local_id: usize,
    block_id: BlockId,
) -> Result<&mut K, Error> {
    Self::local_kernel_mut(state.block_mut(local_id, block_id)?, block_id)
}
/// Borrows two blocks of one domain state at once and returns their kernels mutably.
///
/// `src` and `dst` are `(local id, block id)` pairs. Lookup errors are propagated; a type
/// mismatch is reported for the source before the destination.
fn two_local_state_kernels_mut<KS: 'static, KD: 'static>(
    state: &mut LocalDomainState,
    src: (usize, BlockId),
    dst: (usize, BlockId),
) -> Result<(&mut KS, &mut KD), Error> {
    let (src_block, dst_block) = state.two_blocks_mut(src, dst)?;
    let src_kernel = Self::local_kernel_mut(src_block, src.1)?;
    let dst_kernel = Self::local_kernel_mut(dst_block, dst.1)?;
    Ok((src_kernel, dst_kernel))
}
/// Returns a shared typed guard for the normal block `block_id`.
///
/// Propagates the errors of `get_typed_wrapped_block_by_id`.
fn get_typed_block_by_id<K: 'static>(
    &self,
    block_id: BlockId,
) -> Result<TypedBlockGuard<'_, K>, Error> {
    self.get_typed_wrapped_block_by_id::<K>(block_id)
        .map(|wrapped| TypedBlockGuard {
            id: wrapped.id,
            meta: &wrapped.meta,
            kernel: &wrapped.kernel,
        })
}
/// Returns a shared typed guard for `block` after validating the handle.
///
/// Fails for handles of another flowgraph, for stale handles, for a kernel type mismatch,
/// and with `Error::LockError` for local blocks (see `raw_block`).
pub fn block<K: 'static>(&self, block: &BlockRef<K>) -> Result<TypedBlockGuard<'_, K>, Error> {
self.validate_block_ref(block)?;
self.get_typed_block_by_id(block.id)
}
/// Returns an exclusive typed guard for `block` after validating the handle.
///
/// Fails for handles of another flowgraph, for stale handles, for a kernel type mismatch,
/// and with `Error::LockError` for local blocks (see `raw_block_mut`).
pub fn block_mut<K: 'static>(
&mut self,
block: &BlockRef<K>,
) -> Result<TypedBlockGuardMut<'_, K>, Error> {
self.validate_block_ref(block)?;
let wrapped = self.get_typed_wrapped_block_mut_by_id::<K>(block.id)?;
Ok(TypedBlockGuardMut {
id: wrapped.id,
meta: &mut wrapped.meta,
kernel: &mut wrapped.kernel,
})
}
/// Connects a typed writer port to its reader port and returns the edge describing the
/// connection.
fn connect_stream_ports<B: BufferWriter>(src_port: &mut B, dst_port: &mut B::Reader) -> Edge {
// Record the block and port ids of both ends before connecting them.
let edge = Edge::new(
src_port.block_id(),
src_port.port_id(),
dst_port.block_id(),
dst_port.port_id(),
);
src_port.connect(dst_port);
edge
}
fn connect_stream_ports_dyn(
src_block_id: BlockId,
src_port_id: &PortId,
src_block: &mut dyn BlockObject,
dst_block_id: BlockId,
dst_port_id: &PortId,
dst_block: &mut dyn BlockObject,
) -> Result<Edge, Error> {
let reader = dst_block.stream_input(dst_port_id).map_err(|e| match e {
Error::InvalidStreamPort(_, port) => {
Error::InvalidStreamPort(crate::runtime::BlockPortCtx::Id(dst_block_id), port)
}
o => o,
})?;
src_block
.connect_stream_output(src_port_id, reader)
.map_err(|e| match e {
Error::InvalidStreamPort(_, port) => {
Error::InvalidStreamPort(crate::runtime::BlockPortCtx::Id(src_block_id), port)
}
o => o,
})?;
Ok(Edge::new(
src_block_id,
src_port_id.clone(),
dst_block_id,
dst_port_id.clone(),
))
}
/// Connects typed stream ports of two local blocks that live in the same local domain.
///
/// # Errors
///
/// * `Error::ValidationError` if the endpoints are in different domains.
/// * `Error::InvalidBlock` if the domain index is out of range.
/// * Errors from resolving the two kernels, or from the domain's `exec`.
async fn connect_local_local_stream_async<KS, KD, B, FS, FD>(
&self,
src: LocalEndpoint,
src_port: FS,
dst: LocalEndpoint,
dst_port: FD,
) -> Result<Edge, Error>
where
KS: 'static,
KD: 'static,
B: BufferWriter,
FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
if src.domain_id != dst.domain_id {
return Err(Error::ValidationError(
"stream connections between different local domains are not supported".to_string(),
));
}
let domain = self
.local_domains
.get(src.domain_id)
.ok_or(Error::InvalidBlock(src.block_id))?;
domain
.exec(move |state| {
// The connection itself is synchronous; run it and wrap the result in a ready future.
let result = (|| {
let (src, dst) = Self::two_local_state_kernels_mut::<KS, KD>(
state,
(src.local_id, src.block_id),
(dst.local_id, dst.block_id),
)?;
Ok(Self::connect_stream_ports(src_port(src), dst_port(dst)))
})();
Box::pin(futures::future::ready(result))
})
.await
}
/// Connects typed stream ports of two local blocks that live in different local domains.
///
/// The work runs on the source domain. The writer is moved out of the source kernel
/// (leaving `B::default()` in its place), shared with the destination domain through an
/// `Arc<Mutex<Option<B>>>`, connected there to the reader, and finally moved back into
/// the source kernel.
///
/// # Errors
///
/// * `Error::InvalidBlock` if either domain index is out of range.
/// * `Error::LockError` if the shared mutex is poisoned or the writer is missing from it.
/// * Errors from resolving either kernel, or from either domain's `exec`.
async fn connect_cross_local_stream_async<KS, KD, B, FS, FD>(
&self,
src: LocalEndpoint,
src_port: FS,
dst: LocalEndpoint,
dst_port: FD,
) -> Result<Edge, Error>
where
KS: 'static,
KD: 'static,
B: SendBufferWriter + Default + 'static,
FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
let src_handle = self
.local_domains
.get(src.domain_id)
.ok_or(Error::InvalidBlock(src.block_id))?
.handle();
let dst_handle = self
.local_domains
.get(dst.domain_id)
.ok_or(Error::InvalidBlock(dst.block_id))?
.handle();
src_handle
.exec(move |state| {
Box::pin(async move {
let src =
Self::local_state_kernel_mut::<KS>(state, src.local_id, src.block_id)?;
let src_port = src_port(src);
// Temporarily move the writer out of the kernel so the other domain can use it.
let writer = Arc::new(Mutex::new(Some(std::mem::take(src_port))));
let dst_writer = Arc::clone(&writer);
let edge_result = dst_handle
.exec(move |state| {
let result = (|| {
let mut writer_guard =
dst_writer.lock().map_err(|_| Error::LockError)?;
let writer = writer_guard.as_mut().ok_or(Error::LockError)?;
let dst = Self::local_state_kernel_mut::<KD>(
state,
dst.local_id,
dst.block_id,
)?;
Ok(Self::connect_stream_ports(writer, dst_port(dst)))
})();
Box::pin(futures::future::ready(result))
})
.await;
// Put the writer back before reporting the outcome, so it is restored even when
// connecting failed.
let mut writer_guard = writer.lock().map_err(|_| Error::LockError)?;
*src_port = writer_guard.take().ok_or(Error::LockError)?;
edge_result
})
})
.await
}
fn wrapped_kernel_mut<K: 'static>(
block: &mut dyn BlockObject,
block_id: BlockId,
) -> Result<&mut WrappedKernel<K>, Error> {
block
.as_any_mut()
.downcast_mut::<WrappedKernel<K>>()
.ok_or_else(|| {
Error::ValidationError(format!(
"block {:?} has unexpected type for {}",
block_id,
std::any::type_name::<K>()
))
})
}
async fn with_normal_local_blocks_mut_async<R, F>(
&mut self,
normal_id: BlockId,
local: LocalEndpoint,
f: F,
) -> Result<R, Error>
where
F: FnOnce(&mut dyn BlockObject, &mut dyn BlockObject) -> Result<R, Error> + Send + 'static,
R: Send + 'static,
{
let normal = self.blocks[normal_id.0]
.block
.take()
.ok_or(Error::LockError)?;
let (normal, result) = self.local_domains[local.domain_id]
.exec(move |state| {
Box::pin(async move {
let mut normal = normal;
let result = (|| {
let local = state.block_mut(local.local_id, local.block_id)?;
f(normal.as_mut(), local)
})();
Ok((normal, result))
})
})
.await?;
self.blocks[normal_id.0].block = Some(normal);
result
}
/// Connects a typed stream output of a local block (`src`) to a typed stream input of a
/// normal block (`dst_id`).
///
/// Runs inside the source's local domain via `with_normal_local_blocks_mut_async`, whose
/// errors are propagated together with kernel type mismatches.
async fn connect_local_normal_stream_async<KS, KD, B, FS, FD>(
&mut self,
src: LocalEndpoint,
src_port: FS,
dst_id: BlockId,
dst_port: FD,
) -> Result<Edge, Error>
where
KS: 'static,
KD: 'static,
B: SendBufferWriter + 'static,
FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
// The helper passes the normal block first and the local block second.
self.with_normal_local_blocks_mut_async(dst_id, src, move |dst, src_block| {
let src = Self::local_kernel_mut::<KS>(src_block, src.block_id)?;
let dst = Self::wrapped_kernel_mut::<KD>(dst, dst_id)?;
Ok(Self::connect_stream_ports(
src_port(src),
dst_port(&mut dst.kernel),
))
})
.await
}
/// Connects a typed stream output of a normal block (`src_id`) to a typed stream input of
/// a local block (`dst`).
///
/// Runs inside the destination's local domain via `with_normal_local_blocks_mut_async`,
/// whose errors are propagated together with kernel type mismatches.
async fn connect_normal_local_stream_async<KS, KD, B, FS, FD>(
&mut self,
src_id: BlockId,
src_port: FS,
dst: LocalEndpoint,
dst_port: FD,
) -> Result<Edge, Error>
where
KS: 'static,
KD: 'static,
B: SendBufferWriter + 'static,
FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
// The helper passes the normal block first and the local block second.
self.with_normal_local_blocks_mut_async(src_id, dst, move |src, dst_block| {
let src = Self::wrapped_kernel_mut::<KS>(src, src_id)?;
let dst = Self::local_kernel_mut::<KD>(dst_block, dst.block_id)?;
Ok(Self::connect_stream_ports(
src_port(&mut src.kernel),
dst_port(dst),
))
})
.await
}
fn connect_normal_normal_stream_dyn(
&mut self,
src_block_id: BlockId,
src_port_id: &PortId,
dst_block_id: BlockId,
dst_port_id: &PortId,
) -> Result<Edge, Error> {
let (src_slot, dst_slot) = self.two_block_entries_mut(src_block_id, dst_block_id)?;
let src_block = src_slot
.block
.as_mut()
.map(Box::as_mut)
.ok_or(Error::LockError)?;
let dst_block = dst_slot
.block
.as_mut()
.map(Box::as_mut)
.ok_or(Error::LockError)?;
Self::connect_stream_ports_dyn(
src_block_id,
src_port_id,
src_block,
dst_block_id,
dst_port_id,
dst_block,
)
}
/// Connects stream ports, identified by name, between two local blocks of the same local
/// domain.
///
/// # Errors
///
/// * `Error::ValidationError` if the endpoints are in different domains.
/// * `Error::InvalidBlock` if the domain index is out of range.
/// * Errors from the domain state lookup, from `connect_stream_ports_dyn`, or from the
///   domain's `exec`.
async fn connect_local_local_stream_dyn_async(
&self,
src: LocalEndpoint,
src_port_id: PortId,
dst: LocalEndpoint,
dst_port_id: PortId,
) -> Result<Edge, Error> {
if src.domain_id != dst.domain_id {
return Err(Error::ValidationError(
"stream connections between different local domains are not supported".to_string(),
));
}
let domain = self
.local_domains
.get(src.domain_id)
.ok_or(Error::InvalidBlock(src.block_id))?;
domain
.exec(move |state| {
// The connection itself is synchronous; run it and wrap the result in a ready future.
let result = (|| {
let (src_block, dst_block) = state.two_blocks_mut(
(src.local_id, src.block_id),
(dst.local_id, dst.block_id),
)?;
Self::connect_stream_ports_dyn(
src.block_id,
&src_port_id,
src_block,
dst.block_id,
&dst_port_id,
dst_block,
)
})();
Box::pin(futures::future::ready(result))
})
.await
}
/// Connects a local-domain source block to a normal destination block
/// through stream ports that are looked up by id.
async fn connect_local_normal_stream_dyn_async(
    &mut self,
    src: LocalEndpoint,
    src_port_id: PortId,
    dst_id: BlockId,
    dst_port_id: PortId,
) -> Result<Edge, Error> {
    let src_block_id = src.block_id;
    // The helper hands over the normal block first and the local block second.
    self.with_normal_local_blocks_mut_async(dst_id, src, move |normal_block, local_block| {
        Self::connect_stream_ports_dyn(
            src_block_id,
            &src_port_id,
            local_block,
            dst_id,
            &dst_port_id,
            normal_block,
        )
    })
    .await
}
/// Connects a normal source block to a local-domain destination block
/// through stream ports that are looked up by id.
async fn connect_normal_local_stream_dyn_async(
    &mut self,
    src_id: BlockId,
    src_port_id: PortId,
    dst: LocalEndpoint,
    dst_port_id: PortId,
) -> Result<Edge, Error> {
    let dst_block_id = dst.block_id;
    // The helper hands over the normal block first and the local block second.
    self.with_normal_local_blocks_mut_async(src_id, dst, move |normal_block, local_block| {
        Self::connect_stream_ports_dyn(
            src_id,
            &src_port_id,
            normal_block,
            dst_block_id,
            &dst_port_id,
            local_block,
        )
    })
    .await
}
/// Blocking counterpart of `stream_async`.
///
/// Drives the async connection to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn stream<KS, KD, B, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    B: SendBufferWriter + Default + 'static,
    FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
    FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
    let connecting =
        self.stream_async::<KS, KD, B, FS, FD>(src_block, src_port, dst_block, dst_port);
    crate::runtime::block_on(connecting)
}
/// Connects a typed stream output of `src_block` to a typed stream input of
/// `dst_block` and records the resulting stream edge.
///
/// Both block references are validated first. The connection strategy is
/// chosen by `stream_plan` from the placement of the two blocks.
pub async fn stream_async<KS, KD, B, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    B: SendBufferWriter + Default + 'static,
    FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
    FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
    self.validate_block_ref(src_block)?;
    self.validate_block_ref(dst_block)?;

    let plan = Self::stream_plan(
        src_block.id,
        src_block.placement,
        dst_block.id,
        dst_block.placement,
    );
    let edge = match plan {
        // Both blocks are owned directly by the flowgraph: connect in place.
        StreamPlan::NormalNormal { src, dst } => {
            let (writer_block, reader_block) =
                self.get_two_typed_wrapped_blocks_mut(src, dst)?;
            let writer = src_port(&mut writer_block.kernel);
            let reader = dst_port(&mut reader_block.kernel);
            Self::connect_stream_ports(writer, reader)
        }
        StreamPlan::LocalLocalSame { src, dst } => {
            let connecting = self.connect_local_local_stream_async::<KS, KD, B, FS, FD>(
                src, src_port, dst, dst_port,
            );
            connecting.await?
        }
        StreamPlan::LocalLocalCross { src, dst } => {
            let connecting = self.connect_cross_local_stream_async::<KS, KD, B, FS, FD>(
                src, src_port, dst, dst_port,
            );
            connecting.await?
        }
        StreamPlan::LocalToNormal { src, dst } => {
            let connecting = self.connect_local_normal_stream_async::<KS, KD, B, FS, FD>(
                src, src_port, dst, dst_port,
            );
            connecting.await?
        }
        StreamPlan::NormalToLocal { src, dst } => {
            let connecting = self.connect_normal_local_stream_async::<KS, KD, B, FS, FD>(
                src, src_port, dst, dst_port,
            );
            connecting.await?
        }
    };

    self.stream_edges.push(edge);
    Ok(())
}
/// Blocking counterpart of `stream_local_async`.
///
/// Drives the async connection to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn stream_local<KS, KD, B, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    B: BufferWriter + 'static,
    FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
    FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
    let connecting =
        self.stream_local_async::<KS, KD, B, FS, FD>(src_block, src_port, dst_block, dst_port);
    crate::runtime::block_on(connecting)
}
/// Connects typed stream ports of two blocks that live in the same local
/// domain and records the resulting stream edge.
///
/// Any other placement combination is rejected with a `ValidationError`.
pub async fn stream_local_async<KS, KD, B, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    B: BufferWriter + 'static,
    FS: FnOnce(&mut KS) -> &mut B + Send + 'static,
    FD: FnOnce(&mut KD) -> &mut B::Reader + Send + 'static,
{
    self.validate_block_ref(src_block)?;
    self.validate_block_ref(dst_block)?;

    let plan = Self::stream_plan(
        src_block.id,
        src_block.placement,
        dst_block.id,
        dst_block.placement,
    );
    // Only the same-domain plan carries on; everything else is a validation error.
    let (src, dst) = match plan {
        StreamPlan::LocalLocalSame { src, dst } => (src, dst),
        StreamPlan::LocalLocalCross { .. } => {
            return Err(Error::ValidationError(
                "stream connections between different local domains are not supported"
                    .to_string(),
            ));
        }
        StreamPlan::NormalNormal { .. }
        | StreamPlan::LocalToNormal { .. }
        | StreamPlan::NormalToLocal { .. } => {
            return Err(Error::ValidationError(
                "local stream connections require source and destination blocks in the same local domain"
                    .to_string(),
            ));
        }
    };

    let edge = self
        .connect_local_local_stream_async::<KS, KD, B, FS, FD>(src, src_port, dst, dst_port)
        .await?;
    self.stream_edges.push(edge);
    Ok(())
}
/// Blocking counterpart of `close_circuit_async`.
///
/// Drives the async call to completion with `crate::runtime::block_on`.
/// Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn close_circuit<KS, KD, CW, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    CW: CircuitWriter + 'static,
    FS: FnOnce(&mut KS) -> &mut CW,
    FD: FnOnce(&mut KD) -> &mut CW::CircuitEnd,
{
    let closing =
        self.close_circuit_async::<KS, KD, CW, FS, FD>(src_block, src_port, dst_block, dst_port);
    crate::runtime::block_on(closing)
}
/// Closes a circuit between a circuit writer on `src_block` and the circuit
/// end on `dst_block`.
///
/// Both block references are validated, then both kernels are borrowed
/// through `get_two_typed_wrapped_blocks_mut` and the writer selected by
/// `src_port` is closed onto the end selected by `dst_port`.
pub async fn close_circuit_async<KS, KD, CW, FS, FD>(
    &mut self,
    src_block: &BlockRef<KS>,
    src_port: FS,
    dst_block: &BlockRef<KD>,
    dst_port: FD,
) -> Result<(), Error>
where
    KS: 'static,
    KD: 'static,
    CW: CircuitWriter + 'static,
    FS: FnOnce(&mut KS) -> &mut CW,
    FD: FnOnce(&mut KD) -> &mut CW::CircuitEnd,
{
    self.validate_block_ref(src_block)?;
    self.validate_block_ref(dst_block)?;

    let (writer_block, end_block) =
        self.get_two_typed_wrapped_blocks_mut(src_block.id, dst_block.id)?;
    let writer = src_port(&mut writer_block.kernel);
    let circuit_end = dst_port(&mut end_block.kernel);
    writer.close_circuit(circuit_end);
    Ok(())
}
/// Blocking counterpart of `stream_dyn_async`.
///
/// Drives the async connection to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn stream_dyn(
    &mut self,
    src_block_id: impl Into<BlockId>,
    src_port_id: impl Into<PortId>,
    dst_block_id: impl Into<BlockId>,
    dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
    let connecting = self.stream_dyn_async(src_block_id, src_port_id, dst_block_id, dst_port_id);
    crate::runtime::block_on(connecting)
}
/// Connects two blocks through stream ports that are identified by id and
/// records the resulting stream edge.
///
/// Connections where both blocks live in a local domain are rejected with a
/// `ValidationError` that points to `stream_local_dyn`.
pub async fn stream_dyn_async(
    &mut self,
    src_block_id: impl Into<BlockId>,
    src_port_id: impl Into<PortId>,
    dst_block_id: impl Into<BlockId>,
    dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
    let src_block_id: BlockId = src_block_id.into();
    let src_port_id: PortId = src_port_id.into();
    let dst_block_id: BlockId = dst_block_id.into();
    let dst_port_id: PortId = dst_port_id.into();

    let plan = self.stream_plan_by_id(src_block_id, dst_block_id)?;
    let edge = match plan {
        StreamPlan::LocalLocalSame { .. } | StreamPlan::LocalLocalCross { .. } => {
            return Err(Error::ValidationError(
                "stream_dyn does not connect local-local streams; use stream_local_dyn for same-domain local stream buffers"
                    .to_string(),
            ));
        }
        StreamPlan::NormalNormal { src, dst } => {
            self.connect_normal_normal_stream_dyn(src, &src_port_id, dst, &dst_port_id)?
        }
        StreamPlan::LocalToNormal { src, dst } => {
            let connecting =
                self.connect_local_normal_stream_dyn_async(src, src_port_id, dst, dst_port_id);
            connecting.await?
        }
        StreamPlan::NormalToLocal { src, dst } => {
            let connecting =
                self.connect_normal_local_stream_dyn_async(src, src_port_id, dst, dst_port_id);
            connecting.await?
        }
    };

    self.stream_edges.push(edge);
    Ok(())
}
/// Blocking counterpart of `stream_local_dyn_async`.
///
/// Drives the async connection to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn stream_local_dyn(
    &mut self,
    src_block_id: impl Into<BlockId>,
    src_port_id: impl Into<PortId>,
    dst_block_id: impl Into<BlockId>,
    dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
    let connecting =
        self.stream_local_dyn_async(src_block_id, src_port_id, dst_block_id, dst_port_id);
    crate::runtime::block_on(connecting)
}
/// Connects two blocks of the same local domain through stream ports that
/// are identified by id and records the resulting stream edge.
///
/// Any other placement combination is rejected with a `ValidationError`.
pub async fn stream_local_dyn_async(
    &mut self,
    src_block_id: impl Into<BlockId>,
    src_port_id: impl Into<PortId>,
    dst_block_id: impl Into<BlockId>,
    dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
    let src_block_id: BlockId = src_block_id.into();
    let src_port_id: PortId = src_port_id.into();
    let dst_block_id: BlockId = dst_block_id.into();
    let dst_port_id: PortId = dst_port_id.into();

    // Only the same-domain plan carries on; everything else is a validation error.
    let (src, dst) = match self.stream_plan_by_id(src_block_id, dst_block_id)? {
        StreamPlan::LocalLocalSame { src, dst } => (src, dst),
        StreamPlan::LocalLocalCross { .. } => {
            return Err(Error::ValidationError(
                "stream connections between different local domains are not supported"
                    .to_string(),
            ));
        }
        StreamPlan::NormalNormal { .. }
        | StreamPlan::LocalToNormal { .. }
        | StreamPlan::NormalToLocal { .. } => {
            return Err(Error::ValidationError(
                "local dynamic stream connections require source and destination blocks in the same local domain"
                    .to_string(),
            ));
        }
    };

    let edge = self
        .connect_local_local_stream_dyn_async(src, src_port_id, dst, dst_port_id)
        .await?;
    self.stream_edges.push(edge);
    Ok(())
}
/// Blocking counterpart of `message_async`.
///
/// Drives the async connection to completion with
/// `crate::runtime::block_on`. Not compiled for `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn message(
    &mut self,
    src_block_id: impl Into<BlockId>,
    src_port_id: impl Into<PortId>,
    dst_block_id: impl Into<BlockId>,
    dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
    let connecting = self.message_async(src_block_id, src_port_id, dst_block_id, dst_port_id);
    crate::runtime::block_on(connecting)
}
pub async fn message_async(
&mut self,
src_block_id: impl Into<BlockId>,
src_port_id: impl Into<PortId>,
dst_block_id: impl Into<BlockId>,
dst_port_id: impl Into<PortId>,
) -> Result<(), Error> {
let src_block_id = src_block_id.into();
let src_port_id = src_port_id.into();
let dst_block_id = dst_block_id.into();
let dst_port_id = dst_port_id.into();
let dst_inputs = self
.blocks
.get(dst_block_id.0)
.map(|entry| entry.message_inputs)
.ok_or(Error::InvalidBlock(dst_block_id))?;
if !dst_inputs.contains(&dst_port_id.name()) {
return Err(Error::InvalidMessagePort(
BlockPortCtx::Id(dst_block_id),
dst_port_id.clone(),
));
}
let dst_box = self
.blocks
.get(dst_block_id.0)
.and_then(|entry| entry.inbox.as_ref())
.cloned()
.ok_or(Error::InvalidBlock(dst_block_id))?;
match self.placement(src_block_id)? {
BlockPlacement::Normal => {
let src_block = self.raw_block_mut(src_block_id)?;
src_block.connect(&src_port_id, dst_box, &dst_port_id)?;
}
BlockPlacement::Local {
domain_id,
local_id,
..
} => {
let src_port = src_port_id.clone();
let dst_port = dst_port_id.clone();
self.local_domains[domain_id]
.exec(move |state| {
let result = (|| {
let src_block = state.block_mut(local_id, src_block_id)?;
src_block.connect(&src_port, dst_box, &dst_port)
})();
Box::pin(futures::future::ready(result))
})
.await?;
}
}
self.message_edges.push(Edge::new(
src_block_id,
src_port_id,
dst_block_id,
dst_port_id,
));
Ok(())
}
/// Runs the flowgraph to completion and returns it with its blocks restored.
///
/// The sequence is:
/// 1. Await the startup snapshot (inboxes, block ids, stream and message edges).
/// 2. Take the blocks out of their slots, hand them to `scheduler.run_domain`,
///    and start the local domains through `run_local_domains`.
/// 3. Send `BlockMessage::Initialize` to every inbox and wait until each one
///    has been answered by `Initialized` or `BlockError`.
/// 4. Notify every inbox, report `Ok(())` through `initialized`, and serve
///    `FlowgraphMessage`s from `main_rx` until no active block remains.
/// 5. Await every block task, put the blocks back with `restore_blocks`, and
///    join the local domains.
///
/// # Errors
///
/// Propagates failures from the snapshot, `take_blocks`, `run_local_domains`,
/// `restore_blocks` and `join_local_domains`. Otherwise returns the error of
/// the init/run phase, which is `Error::RuntimeError("A block raised an error")`
/// when any block reported `BlockError`.
pub(crate) async fn run_flowgraph<S: Scheduler>(
mut self,
scheduler: S,
main_channel: Sender<FlowgraphMessage>,
main_rx: Receiver<FlowgraphMessage>,
initialized: oneshot::Sender<Result<(), Error>>,
) -> Result<Flowgraph, Error> {
debug!("in run_flowgraph");
let StartupSnapshot {
mut inboxes,
ids,
stream_edges,
message_edges,
} = self.startup_snapshot()?.await?;
// Edge descriptions are computed once and cloned for every description request.
let stream_edges_desc = Self::edge_endpoints(&stream_edges);
let message_edges_desc = Self::edge_endpoints(&message_edges);
let blocks = self.take_blocks()?;
let block_tasks = scheduler.run_domain(blocks, &main_channel);
let local_tasks = self.run_local_domains(main_channel.clone()).await?;
// The init and run phases live in one async block so that the cleanup
// after it runs whether they succeed or fail.
let run_result: Result<(), Error> = async {
debug!("init blocks");
let mut active_blocks = 0u32;
for inbox in inboxes.iter_mut().flatten() {
inbox.send(BlockMessage::Initialize).await?;
active_blocks += 1;
}
debug!("wait for blocks init");
// `i` counts the init replies that are still outstanding.
let mut i = active_blocks;
// Messages that are not init replies are kept here and re-sent to the
// main channel once the init phase is over.
let mut queue = Vec::new();
let mut block_error = false;
loop {
if i == 0 {
break;
}
let m = main_rx.recv().await.ok_or_else(|| {
Error::RuntimeError("no reply from blocks during init phase".to_string())
})?;
match m {
FlowgraphMessage::Initialized => i -= 1,
FlowgraphMessage::BlockError { block_id } => {
// A block that fails to initialize is no longer counted as active.
i -= 1;
active_blocks -= 1;
block_error = true;
error!("flowgraph init: block {:?} reported an error", block_id);
}
x => {
debug!(
"queueing unhandled message received during initialization {:?}",
&x
);
queue.push(x);
}
}
}
debug!("running blocks");
for inbox in inboxes.iter_mut().flatten() {
inbox.notify();
if inbox.is_closed() {
debug!("runtime wanted to start block that already terminated");
}
}
for m in queue.into_iter() {
main_channel.try_send(m)?;
}
initialized.send(Ok(())).map_err(|_| {
Error::RuntimeError("main thread panic during flowgraph init".to_string())
})?;
// An init failure still reports `Ok(())` above; the flowgraph is then
// asked to terminate right away through its own channel.
if block_error {
main_channel.try_send(FlowgraphMessage::Terminate)?;
}
// Guards against broadcasting `Terminate` to the blocks more than once.
let mut terminated = false;
loop {
if active_blocks == 0 {
break;
}
let m = main_rx.recv().await.ok_or_else(|| {
Error::RuntimeError("all senders to flowgraph inbox dropped".to_string())
})?;
match m {
// Fire-and-forget call: `tx` only reports whether the message was delivered.
FlowgraphMessage::BlockCall {
block_id,
port_id,
data,
tx,
} => {
if let Some(Some(inbox)) = inboxes.get_mut(block_id.0) {
if inbox
.send(BlockMessage::Call { port_id, data })
.await
.is_ok()
{
let _ = tx.send(Ok(()));
} else {
let _ = tx.send(Err(Error::BlockTerminated));
}
} else {
let _ = tx.send(Err(Error::InvalidBlock(block_id)));
}
}
// Call with a reply: the block's answer is awaited and relayed through `tx`.
FlowgraphMessage::BlockCallback {
block_id,
port_id,
data,
tx,
} => {
let (block_tx, block_rx) = oneshot::channel::<Result<Pmt, Error>>();
if let Some(Some(inbox)) = inboxes.get_mut(block_id.0) {
if inbox
.send(BlockMessage::Callback {
port_id,
data,
tx: block_tx,
})
.await
.is_ok()
{
// NOTE(review): if the block drops `block_tx`, this `?` presumably
// leaves the whole run phase without answering `tx` — confirm intended.
match block_rx.await? {
Ok(p) => tx.send(Ok(p)).ok(),
Err(e) => tx.send(Err(Error::HandlerError(e.to_string()))).ok(),
};
} else {
let _ = tx.send(Err(Error::BlockTerminated));
}
} else {
let _ = tx.send(Err(Error::InvalidBlock(block_id)));
}
}
FlowgraphMessage::BlockDone { .. } => {
active_blocks -= 1;
}
FlowgraphMessage::BlockError { .. } => {
// One failing block brings the whole flowgraph down.
block_error = true;
active_blocks -= 1;
let _ = main_channel.send(FlowgraphMessage::Terminate).await;
}
FlowgraphMessage::BlockDescription { block_id, tx } => {
if let Some(Some(b)) = inboxes.get_mut(block_id.0) {
let (b_tx, rx) = oneshot::channel::<BlockDescription>();
if b.send(BlockMessage::BlockDescription { tx: b_tx })
.await
.is_ok()
{
if let Ok(b) = rx.await {
let _ = tx.send(Ok(b));
} else {
let _ = tx.send(Err(Error::RuntimeError(format!(
"Block {block_id:?} terminated or crashed"
))));
}
} else {
let _ = tx.send(Err(Error::BlockTerminated));
}
} else {
let _ = tx.send(Err(Error::InvalidBlock(block_id)));
}
}
FlowgraphMessage::FlowgraphDescription { tx } => {
// Blocks whose inbox is missing or refuses the request are left out
// of the description.
let mut blocks = Vec::new();
for id in ids.iter() {
let (b_tx, rx) = oneshot::channel::<BlockDescription>();
if let Some(Some(inbox)) = inboxes.get_mut(id.0)
&& inbox
.send(BlockMessage::BlockDescription { tx: b_tx })
.await
.is_ok()
{
// NOTE(review): unlike the single-block case above, a dropped reply
// sender presumably ends the run phase through `?` — confirm intended.
blocks.push(rx.await?);
}
}
if tx
.send(FlowgraphDescription {
blocks,
stream_edges: stream_edges_desc.clone(),
message_edges: message_edges_desc.clone(),
})
.is_err()
{
error!(
"Failed to send flowgraph description. Receiver may have disconnected."
);
}
}
FlowgraphMessage::Terminate => {
if !terminated {
for inbox in inboxes.iter_mut().flatten() {
if inbox.send(BlockMessage::Terminate).await.is_err() {
debug!(
"runtime tried to terminate block that was already terminated"
);
}
}
terminated = true;
}
}
_ => warn!("main loop received unhandled message"),
}
}
if block_error {
Err(Error::RuntimeError("A block raised an error".to_string()))
} else {
Ok(())
}
}
.await;
// On failure, every block is asked to terminate before the block tasks are
// awaited below (presumably so that those tasks can finish).
if run_result.is_err() {
for inbox in inboxes.iter_mut().flatten() {
if inbox.send(BlockMessage::Terminate).await.is_err() {
debug!("runtime tried to terminate block during shutdown cleanup");
}
}
}
let mut finished_blocks = Vec::with_capacity(block_tasks.len());
for task in block_tasks {
finished_blocks.push(task.await);
}
self.restore_blocks(finished_blocks)?;
self.join_local_domains(local_tasks).await?;
// The run-phase error is only surfaced after blocks and domains are back in place.
run_result?;
Ok(self)
}
/// Moves every block that is currently present out of its slot and returns
/// them in slot order. Slots that are already empty are skipped.
pub(crate) fn take_blocks(&mut self) -> Result<Vec<Box<dyn Block>>, Error> {
    let mut taken = Vec::with_capacity(self.blocks.len());
    taken.extend(self.blocks.iter_mut().filter_map(|slot| slot.block.take()));
    Ok(taken)
}
pub(crate) fn inboxes(
&self,
) -> Result<(Vec<Option<crate::runtime::dev::BlockInbox>>, Vec<BlockId>), Error> {
let mut inboxes = Vec::with_capacity(self.blocks.len());
let mut ids = Vec::with_capacity(self.blocks.len());
for (id, entry) in self.blocks.iter().enumerate() {
let block_id = BlockId(id);
let inbox = entry
.inbox
.as_ref()
.cloned()
.ok_or(Error::InvalidBlock(block_id))?;
inboxes.push(Some(inbox));
ids.push(block_id);
}
Ok((inboxes, ids))
}
/// Prepares the data needed to start the flowgraph.
///
/// Inboxes, ids and the flowgraph's own edges are captured immediately. The
/// returned future then extends the edge lists with the topology reported by
/// each local domain.
pub(crate) fn startup_snapshot(
    &self,
) -> Result<
    impl std::future::Future<Output = Result<StartupSnapshot, Error>> + Send + 'static,
    Error,
> {
    let (inboxes, ids) = self.inboxes()?;
    let domain_handles: Vec<_> = self
        .local_domains
        .iter()
        .map(|domain| domain.handle())
        .collect();
    let mut stream_edges = self.stream_edges.clone();
    let mut message_edges = self.message_edges.clone();
    Ok(async move {
        for handle in domain_handles {
            let (local_stream_edges, local_message_edges) = handle.topology_async().await?;
            stream_edges.extend(local_stream_edges);
            message_edges.extend(local_message_edges);
        }
        let snapshot = StartupSnapshot {
            inboxes,
            ids,
            stream_edges,
            message_edges,
        };
        Ok(snapshot)
    })
}
/// Starts every local domain that needs to run, one after the other, and
/// returns the completion receivers of the domains that were started.
pub(crate) async fn run_local_domains(
    &mut self,
    main_channel: Sender<FlowgraphMessage>,
) -> Result<Vec<oneshot::Receiver<Result<(), Error>>>, Error> {
    let mut started = Vec::new();
    for domain in self.local_domains.iter_mut() {
        let maybe_task = domain.run_if_needed(main_channel.clone()).await?;
        // `run_if_needed` yields `None` for domains that did not start a task.
        started.extend(maybe_task);
    }
    Ok(started)
}
pub(crate) fn edge_endpoints(edges: &[Edge]) -> Vec<(BlockId, PortId, BlockId, PortId)> {
edges.iter().map(Edge::endpoints).collect()
}
pub(crate) fn restore_blocks(
&mut self,
blocks: Vec<(BlockId, Box<dyn Block>)>,
) -> Result<(), Error> {
for (id, block) in blocks {
let entry = self.blocks.get_mut(id.0).ok_or(Error::InvalidBlock(id))?;
if entry.block.is_some() {
return Err(Error::RuntimeError(format!(
"block slot {:?} was restored more than once",
id
)));
}
entry.block = Some(block);
}
Ok(())
}
pub(crate) async fn join_local_domains(
&mut self,
tasks: Vec<oneshot::Receiver<Result<(), Error>>>,
) -> Result<(), Error> {
let mut result = Ok(());
for task in tasks {
let task_result = task
.await
.map_err(|_| Error::RuntimeError("local domain task canceled".to_string()))
.and_then(|result| result);
if result.is_ok() {
result = task_result;
}
}
for domain in self.local_domains.iter_mut() {
domain.mark_stopped();
}
result
}
}
impl Default for Flowgraph {
fn default() -> Self {
Self::new()
}
}