use crate::model::descriptor::{
FlattenDataFlowDescriptor, InputDescriptor, LinkDescriptor, OutputDescriptor,
};
use crate::model::record::connector::{ZFConnectorKind, ZFConnectorRecord};
use crate::model::record::{LinkRecord, OperatorRecord, PortRecord, SinkRecord, SourceRecord};
use crate::types::{NodeId, PortId, RuntimeId};
use crate::zferror;
use crate::zfresult::ErrorKind;
use crate::Result as ZFResult;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::hash::{Hash, Hasher};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DataFlowRecord {
pub uuid: Uuid,
pub flow: String,
pub operators: HashMap<NodeId, OperatorRecord>,
pub sinks: HashMap<NodeId, SinkRecord>,
pub sources: HashMap<NodeId, SourceRecord>,
pub connectors: HashMap<NodeId, ZFConnectorRecord>,
pub links: Vec<LinkRecord>,
pub counter: u32,
}
impl DataFlowRecord {
pub fn from_yaml(data: &str) -> ZFResult<Self> {
serde_yaml::from_str::<DataFlowRecord>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e).into())
}
pub fn from_json(data: &str) -> ZFResult<Self> {
serde_json::from_str::<DataFlowRecord>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e).into())
}
pub fn to_json(&self) -> ZFResult<String> {
serde_json::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
pub fn to_yaml(&self) -> ZFResult<String> {
serde_yaml::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
pub fn find_node_runtime(&self, id: &str) -> Option<RuntimeId> {
match self.operators.get(id) {
Some(o) => Some(o.runtime.clone()),
None => match self.sources.get(id) {
Some(s) => Some(s.runtime.clone()),
None => self.sinks.get(id).map(|s| s.runtime.clone()),
},
}
}
fn find_node_uid_by_id(&self, id: &NodeId) -> Option<u32> {
if let Some(o) = self.operators.get(id) {
return Some(o.uid);
}
if let Some(s) = self.sources.get(id) {
return Some(s.uid);
}
if let Some(s) = self.sinks.get(id) {
return Some(s.uid);
}
None
}
fn find_port_id_in_node(&self, node_id: &NodeId, port_id: &PortId) -> Option<u32> {
let (inputs, outputs) = if let Some(op) = self.operators.get(node_id) {
(Some(&op.inputs), Some(&op.outputs))
} else if let Some(source) = self.sources.get(node_id) {
(None, Some(&source.outputs))
} else if let Some(sink) = self.sinks.get(node_id) {
(Some(&sink.inputs), None)
} else {
(None, None)
};
if let Some(outputs) = outputs {
if let Some(port_record) = outputs.iter().find(|&output| output.port_id == *port_id) {
return Some(port_record.uid);
}
}
if let Some(inputs) = inputs {
if let Some(port_record) = inputs.iter().find(|&input| input.port_id == *port_id) {
return Some(port_record.uid);
}
}
None
}
fn add_links(&mut self, links: &[LinkDescriptor]) -> ZFResult<()> {
for l in links.iter() {
log::debug!("Adding link: {:?}…", l);
let from_runtime = match self.find_node_runtime(&l.from.node) {
Some(rt) => rt,
None => {
log::error!("Could not find runtime for: {:?}", &l.from.node);
return Err(zferror!(
ErrorKind::Uncompleted,
"Unable to find runtime for {}",
&l.from.node
)
.into());
}
};
let to_runtime = match self.find_node_runtime(&l.to.node) {
Some(rt) => rt,
None => {
log::error!("Could not find runtime for: {:?}", &l.to.node);
return Err(zferror!(
ErrorKind::Uncompleted,
"Unable to find runtime for {}",
&l.to.node
)
.into());
}
};
if from_runtime == to_runtime {
log::debug!("Adding link: {:?}… OK", l);
self.links.push((l.clone(), self.counter).into());
self.counter += 1;
} else {
let from_uid = self
.find_node_uid_by_id(&l.from.node)
.ok_or_else(|| zferror!(ErrorKind::NotFound))?;
let from_port_uid = self
.find_port_id_in_node(&l.from.node, &l.from.output)
.ok_or_else(|| zferror!(ErrorKind::NotFound))?;
let z_resource_name = format!(
"zf/data/{}/{}/{}/{}",
&self.flow, &self.uuid, &from_uid, &from_port_uid
);
if !self.connectors.iter().any(|(_id, c)| {
c.kind == ZFConnectorKind::Sender && c.resource == z_resource_name
}) {
let sender_id: NodeId = format!(
"sender-{}-{}-{}-{}",
&self.flow, &self.uuid, &l.from.node, &l.from.output
)
.into();
let sender = ZFConnectorRecord {
kind: ZFConnectorKind::Sender,
id: sender_id.clone(),
resource: z_resource_name.clone(),
link_id: PortRecord {
uid: self.counter,
port_id: l.from.output.clone(),
},
shared_memory_element_size: l.shared_memory_element_size,
shared_memory_elements: l.shared_memory_elements,
shared_memory_backoff: l.shared_memory_backoff,
runtime: from_runtime,
};
self.counter += 1;
let link_sender = LinkDescriptor {
from: l.from.clone(),
to: InputDescriptor {
node: sender_id.clone(),
input: l.from.output.clone(),
},
shared_memory_element_size: l.shared_memory_element_size,
shared_memory_elements: l.shared_memory_elements,
shared_memory_backoff: l.shared_memory_backoff,
};
self.connectors.insert(sender_id, sender);
self.links.push((link_sender, self.counter).into());
self.counter += 1;
}
let receiver_id: NodeId = format!(
"receiver-{}-{}-{}-{}",
&self.flow, &self.uuid, &l.to.node, &l.to.input
)
.into();
let receiver = ZFConnectorRecord {
kind: ZFConnectorKind::Receiver,
id: receiver_id.clone(),
resource: z_resource_name.clone(),
link_id: PortRecord {
uid: self.counter,
port_id: l.to.input.clone(),
},
shared_memory_element_size: l.shared_memory_element_size,
shared_memory_elements: l.shared_memory_elements,
shared_memory_backoff: l.shared_memory_backoff,
runtime: to_runtime,
};
self.counter += 1;
let link_receiver = LinkDescriptor {
from: OutputDescriptor {
node: receiver_id.clone(),
output: l.to.input.clone(),
},
to: l.to.clone(),
shared_memory_element_size: l.shared_memory_element_size,
shared_memory_elements: l.shared_memory_elements,
shared_memory_backoff: l.shared_memory_backoff,
};
self.connectors.insert(receiver_id, receiver);
self.links.push((link_receiver, self.counter).into());
self.counter += 1;
}
}
Ok(())
}
}
impl TryFrom<(FlattenDataFlowDescriptor, Uuid)> for DataFlowRecord {
type Error = crate::zfresult::Error;
fn try_from(d: (FlattenDataFlowDescriptor, Uuid)) -> Result<Self, Self::Error> {
let (dataflow, id) = d;
let FlattenDataFlowDescriptor {
flow,
operators,
sources,
sinks,
links,
mapping,
global_configuration: _,
} = dataflow;
let mapping = mapping.map_or(HashMap::new(), |m| m);
let mut dfr = DataFlowRecord {
uuid: id,
flow,
operators: HashMap::with_capacity(operators.len()),
sinks: HashMap::with_capacity(sinks.len()),
sources: HashMap::with_capacity(sources.len()),
connectors: HashMap::new(),
links: Vec::new(),
counter: 0,
};
for o in operators.into_iter() {
let mut inputs: Vec<PortRecord> = vec![];
for i in o.inputs {
inputs.push((i, dfr.counter).into());
dfr.counter += 1;
}
let mut outputs: Vec<PortRecord> = vec![];
for o in o.outputs {
outputs.push((o, dfr.counter).into());
dfr.counter += 1;
}
let or = OperatorRecord {
id: o.id.clone(),
uid: dfr.counter,
inputs,
outputs,
uri: o.uri,
configuration: o.configuration,
runtime: mapping
.get(&o.id)
.ok_or_else(|| zferror!(ErrorKind::MissingConfiguration))
.cloned()?,
};
dfr.operators.insert(o.id, or);
dfr.counter += 1;
}
for s in sources.into_iter() {
let mut outputs: Vec<PortRecord> = vec![];
for o in s.outputs {
outputs.push((o, dfr.counter).into());
dfr.counter += 1;
}
let sr = SourceRecord {
id: s.id.clone(),
uid: dfr.counter,
outputs,
uri: s.uri,
configuration: s.configuration,
runtime: mapping
.get(&s.id)
.ok_or_else(|| zferror!(ErrorKind::MissingConfiguration))
.cloned()?,
};
dfr.sources.insert(s.id, sr);
dfr.counter += 1;
}
for s in sinks.into_iter() {
let mut inputs: Vec<PortRecord> = Vec::with_capacity(s.inputs.len());
for i in s.inputs {
inputs.push((i, dfr.counter).into());
dfr.counter += 1;
}
let sr = SinkRecord {
id: s.id.clone(),
uid: dfr.counter,
inputs,
uri: s.uri,
configuration: s.configuration,
runtime: mapping
.get(&s.id)
.ok_or_else(|| zferror!(ErrorKind::MissingConfiguration))
.cloned()?,
};
dfr.sinks.insert(s.id, sr);
dfr.counter += 1;
}
dfr.add_links(&links)?;
Ok(dfr)
}
}
impl Hash for DataFlowRecord {
fn hash<H: Hasher>(&self, state: &mut H) {
self.uuid.hash(state);
self.flow.hash(state);
}
}
impl PartialEq for DataFlowRecord {
fn eq(&self, other: &DataFlowRecord) -> bool {
self.uuid == other.uuid && self.flow == other.flow
}
}
impl Eq for DataFlowRecord {}