pub mod builtin;
pub mod runners;
use self::runners::connector::{ZenohReceiver, ZenohSender};
use self::runners::Runner;
use super::DataFlow;
use crate::io::{Inputs, Outputs};
use crate::model::record::{LinkRecord, ZFConnectorKind};
use crate::prelude::{Context, Node};
use crate::runtime::InstanceContext;
use crate::types::NodeId;
use crate::zfresult::ErrorKind;
use crate::Result;
use crate::{bail, zferror};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use uhlc::HLC;
/// A [`DataFlow`] together with the [`Runner`]s created for its nodes.
///
/// Instances are built by `DataFlowInstance::try_instantiate`, which constructs every node of the
/// data flow and wraps each of them in a `Runner`.
pub struct DataFlowInstance {
/// Context of this instance (flow id, instance id and runtime context), shared through an `Arc`.
pub(crate) _instance_context: Arc<InstanceContext>,
/// The data flow this instance was created from.
pub(crate) data_flow: DataFlow,
/// The runner of each node, indexed by the id of the node.
pub(crate) runners: HashMap<NodeId, Runner>,
}
/// Gives read access to the wrapped [`DataFlow`] directly through a `DataFlowInstance`.
impl Deref for DataFlowInstance {
    type Target = DataFlow;

    /// Borrows the data flow this instance was created from.
    fn deref(&self) -> &DataFlow {
        let Self { data_flow, .. } = self;
        data_flow
    }
}
impl DataFlowInstance {
    /// Returns the ids of the Sinks of this data flow (the keys of `sink_constructors`).
    pub fn get_sinks(&self) -> Vec<NodeId> {
        self.data_flow.sink_constructors.keys().cloned().collect()
    }

    /// Returns the ids of the Sources of this data flow (the keys of `source_constructors`).
    pub fn get_sources(&self) -> Vec<NodeId> {
        self.data_flow.source_constructors.keys().cloned().collect()
    }

    /// Returns the ids of the Operators of this data flow (the keys of `operator_constructors`).
    pub fn get_operators(&self) -> Vec<NodeId> {
        self.data_flow
            .operator_constructors
            .keys()
            .cloned()
            .collect()
    }

    /// Returns the ids of the connectors of this data flow (the keys of `connectors`).
    pub fn get_connectors(&self) -> Vec<NodeId> {
        self.data_flow.connectors.keys().cloned().collect()
    }

    /// Asks the runner of the node `node_id` to start.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::NodeNotFound` error if this instance has no runner for `node_id`.
    pub fn start_node(&mut self, node_id: &NodeId) -> Result<()> {
        match self.runners.get_mut(node_id) {
            Some(runner) => {
                runner.start();
                Ok(())
            }
            None => bail!(
                ErrorKind::NodeNotFound(node_id.clone()),
                "Node < {} > not found",
                node_id
            ),
        }
    }

    /// Asks the runner of the node `node_id` to stop and waits for it to do so.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::NodeNotFound` error if this instance has no runner for `node_id`,
    /// otherwise forwards the result of stopping the runner.
    pub async fn stop_node(&mut self, node_id: &NodeId) -> Result<()> {
        match self.runners.get_mut(node_id) {
            Some(runner) => runner.stop().await,
            None => bail!(
                ErrorKind::NodeNotFound(node_id.clone()),
                "Node < {} > not found",
                node_id
            ),
        }
    }

    /// Tries to turn `data_flow` into a `DataFlowInstance`.
    ///
    /// The links between the nodes are created first (see `create_links`), then every Source,
    /// Operator and Sink is built by calling its constructor with a [`Context`], its
    /// configuration and its inputs and/or outputs. Connectors are built as a `ZenohSender` or a
    /// `ZenohReceiver` depending on their kind. Every node is wrapped in a [`Runner`]; none of
    /// the runners is started here.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the links cannot be created,
    /// - no links were created for one of the nodes (`ErrorKind::IOError`),
    /// - the constructor of a node, or the creation of a connector, fails.
    pub async fn try_instantiate(data_flow: DataFlow, hlc: Arc<HLC>) -> Result<Self> {
        let instance_context = Arc::new(InstanceContext {
            flow_id: data_flow.flow.clone(),
            instance_id: data_flow.uuid,
            runtime: data_flow.context.clone(),
        });

        // Ids of every node of the data flow: sources, operators, sinks, then connectors.
        let node_ids: Vec<NodeId> = data_flow
            .source_constructors
            .keys()
            .chain(data_flow.operator_constructors.keys())
            .chain(data_flow.sink_constructors.keys())
            .chain(data_flow.connectors.keys())
            .cloned()
            .collect();

        let mut links = create_links(&node_ids, &data_flow.links, hlc)?;
        let context = Context::new(&instance_context);

        // One runner is inserted per node, not only per source.
        let mut runners = HashMap::with_capacity(node_ids.len());

        for (source_id, source_constructor) in &data_flow.source_constructors {
            // A Source only produces data: its inputs are discarded.
            let (_, outputs) = links.remove(source_id).ok_or_else(|| {
                zferror!(
                    ErrorKind::IOError,
                    "Links for Source < {} > were not created.",
                    &source_id
                )
            })?;

            let source = (source_constructor.constructor)(
                context.clone(),
                source_constructor.configuration.clone(),
                outputs,
            )
            .await?;

            runners.insert(source_id.clone(), Runner::new(source));
        }

        for (operator_id, operator_constructor) in &data_flow.operator_constructors {
            let (inputs, outputs) = links.remove(operator_id).ok_or_else(|| {
                zferror!(
                    ErrorKind::IOError,
                    "Links for Operator < {} > were not created.",
                    &operator_id
                )
            })?;

            let operator = (operator_constructor.constructor)(
                context.clone(),
                operator_constructor.configuration.clone(),
                inputs,
                outputs,
            )
            .await?;

            runners.insert(operator_id.clone(), Runner::new(operator));
        }

        for (sink_id, sink_constructor) in &data_flow.sink_constructors {
            // A Sink only consumes data: its outputs are discarded.
            let (inputs, _) = links.remove(sink_id).ok_or_else(|| {
                zferror!(
                    ErrorKind::IOError,
                    "Links for Sink < {} > were not created.",
                    &sink_id
                )
            })?;

            let sink = (sink_constructor.constructor)(
                context.clone(),
                sink_constructor.configuration.clone(),
                inputs,
            )
            .await?;

            runners.insert(sink_id.clone(), Runner::new(sink));
        }

        for (connector_id, connector_record) in &data_flow.connectors {
            let node = match &connector_record.kind {
                // A sender is given the inputs of the connector.
                ZFConnectorKind::Sender => {
                    let (inputs, _) = links.remove(connector_id).ok_or_else(|| {
                        zferror!(
                            ErrorKind::IOError,
                            "Links for Connector Sender < {} > were not created.",
                            connector_id
                        )
                    })?;

                    Arc::new(
                        ZenohSender::new(connector_record, instance_context.clone(), inputs)
                            .await?,
                    ) as Arc<dyn Node>
                }
                // A receiver is given the outputs of the connector.
                ZFConnectorKind::Receiver => {
                    let (_, outputs) = links.remove(connector_id).ok_or_else(|| {
                        zferror!(
                            ErrorKind::IOError,
                            "Links for Connector Receiver < {} > were not created.",
                            &connector_id
                        )
                    })?;

                    Arc::new(
                        ZenohReceiver::new(connector_record, instance_context.clone(), outputs)
                            .await?,
                    ) as Arc<dyn Node>
                }
            };

            runners.insert(connector_id.clone(), Runner::new(node));
        }

        Ok(DataFlowInstance {
            _instance_context: instance_context,
            data_flow,
            runners,
        })
    }
}
/// Creates the channels backing `links` and groups their endpoints per node.
///
/// For every link whose upstream and downstream nodes are both listed in `nodes`, an unbounded
/// `flume` channel is created. Its sending half is registered in the [`Outputs`] of the upstream
/// node under the output port of the link, and its receiving half in the [`Inputs`] of the
/// downstream node under the input port of the link. A link that references a node absent from
/// `nodes` is skipped.
///
/// A node only gets an entry in the returned map if at least one retained link touches it.
///
/// # Errors
///
/// This implementation always returns `Ok`; the `Result` is part of the signature callers rely
/// on.
pub(crate) fn create_links(
    nodes: &[NodeId],
    links: &[LinkRecord],
    hlc: Arc<HLC>,
) -> Result<HashMap<NodeId, (Inputs, Outputs)>> {
    // Index the node ids once so that the membership test is not a linear scan for every link.
    let known_nodes: std::collections::HashSet<&NodeId> = nodes.iter().collect();
    let mut io: HashMap<NodeId, (Inputs, Outputs)> = HashMap::with_capacity(nodes.len());

    for link_desc in links {
        let upstream_node = &link_desc.from.node;
        let downstream_node = &link_desc.to.node;

        if !known_nodes.contains(upstream_node) || !known_nodes.contains(downstream_node) {
            continue;
        }

        let (tx, rx) = flume::unbounded();

        // The sending half goes to the outputs of the upstream node, whose entry is created on
        // first use.
        io.entry(upstream_node.clone())
            .or_insert_with(|| (Inputs::new(), Outputs::new(hlc.clone())))
            .1
            .insert(link_desc.from.output.clone(), tx);

        // The receiving half goes to the inputs of the downstream node.
        io.entry(downstream_node.clone())
            .or_insert_with(|| (Inputs::new(), Outputs::new(hlc.clone())))
            .0
            .insert(link_desc.to.input.clone(), rx);
    }

    Ok(io)
}