pub mod instance;
pub mod loader;
pub mod node;
use self::node::{OperatorConstructor, SinkConstructor, SourceConstructor};
use self::node::{OperatorFn, SinkFn, SourceFn};
use crate::model::descriptor::{InputDescriptor, OutputDescriptor};
use crate::model::record::{
DataFlowRecord, LinkRecord, OperatorRecord, SinkRecord, SourceRecord, ZFConnectorRecord,
};
use crate::runtime::RuntimeContext;
use crate::types::NodeId;
use crate::Result as ZFResult;
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
/// A data flow bound to a runtime context: the node constructors, connector records and
/// links that make it up.
///
/// It is either built incrementally (`new` followed by the `add_*` methods) or derived from
/// a `DataFlowRecord` with `try_new`.
pub struct DataFlow {
/// Identifier of the flow: a random v4 UUID in `new`, or the record's UUID in `try_new`.
pub(crate) uuid: Uuid,
/// Name of the flow.
pub(crate) flow: Arc<str>,
/// Runtime context; `try_new` reads its `runtime_name` and `loader`.
pub(crate) context: RuntimeContext,
/// Source constructors, keyed by node id.
pub(crate) source_constructors: HashMap<NodeId, SourceConstructor>,
/// Operator constructors, keyed by node id.
pub(crate) operator_constructors: HashMap<NodeId, OperatorConstructor>,
/// Sink constructors, keyed by node id.
pub(crate) sink_constructors: HashMap<NodeId, SinkConstructor>,
/// Connector records, keyed by node id. Only populated by `try_new`.
pub(crate) connectors: HashMap<NodeId, ZFConnectorRecord>,
/// Links between node outputs and node inputs.
pub(crate) links: Vec<LinkRecord>,
/// `uid` handed to the next link created by `add_link`; incremented after each addition.
pub(crate) counter: u32,
}
impl DataFlow {
pub fn new(name: impl AsRef<str>, context: RuntimeContext) -> Self {
Self {
uuid: Uuid::new_v4(),
flow: name.as_ref().into(),
context,
source_constructors: HashMap::new(),
operator_constructors: HashMap::new(),
sink_constructors: HashMap::new(),
connectors: HashMap::new(),
links: Vec::new(),
counter: 0,
}
}
pub fn add_source(&mut self, record: SourceRecord, constructor: SourceFn) {
self.source_constructors.insert(
record.id.clone(),
SourceConstructor::new_static(record, constructor),
);
}
pub fn add_operator(&mut self, record: OperatorRecord, constructor: OperatorFn) {
self.operator_constructors.insert(
record.id.clone(),
OperatorConstructor::new_static(record, constructor),
);
}
pub fn add_sink(&mut self, record: SinkRecord, constructor: SinkFn) {
self.sink_constructors.insert(
record.id.clone(),
SinkConstructor::new_static(record, constructor),
);
}
pub fn add_link(&mut self, from: OutputDescriptor, to: InputDescriptor) {
self.links.push(LinkRecord {
uid: self.counter,
from,
to,
shared_memory_element_size: None,
shared_memory_elements: None,
shared_memory_backoff: None,
});
self.counter += 1;
}
pub fn try_new(record: DataFlowRecord, context: RuntimeContext) -> ZFResult<Self> {
let DataFlowRecord {
uuid,
flow,
operators,
sinks,
sources,
connectors,
links,
counter,
} = record;
let source_constructors = sources
.into_iter()
.filter(|(_, record)| record.runtime == context.runtime_name)
.map(|(source_id, source_record)| {
context
.loader
.load_source_constructor(source_record)
.map(|source_constructor| (source_id, source_constructor))
})
.collect::<ZFResult<HashMap<NodeId, SourceConstructor>>>()?;
let operator_constructors = operators
.into_iter()
.filter(|(_, record)| record.runtime == context.runtime_name)
.map(|(operator_id, operator_record)| {
context
.loader
.load_operator_constructor(operator_record)
.map(|operator_constructor| (operator_id, operator_constructor))
})
.collect::<ZFResult<HashMap<NodeId, OperatorConstructor>>>()?;
let sink_constructors = sinks
.into_iter()
.filter(|(_, record)| record.runtime == context.runtime_name)
.map(|(sink_id, sink_record)| {
context
.loader
.load_sink_constructor(sink_record)
.map(|sink_constructor| (sink_id, sink_constructor))
})
.collect::<ZFResult<HashMap<NodeId, SinkConstructor>>>()?;
let connectors = connectors
.into_iter()
.filter(|(_, record)| record.runtime == context.runtime_name)
.collect::<HashMap<NodeId, ZFConnectorRecord>>();
Ok(Self {
uuid,
flow: flow.into(),
context,
source_constructors,
operator_constructors,
sink_constructors,
connectors,
links,
counter,
})
}
}