use crate::model::descriptor::link::{CompositeInputDescriptor, CompositeOutputDescriptor};
use crate::model::descriptor::node::{try_load_descriptor_from_file, NodeDescriptor};
use crate::model::descriptor::LinkDescriptor;
use crate::prelude::PortId;
use crate::types::configuration::Merge;
use crate::types::{Configuration, NodeId};
use crate::utils::parse_uri;
use crate::zfresult::{ErrorKind, ZFResult as Result};
use crate::{bail, zferror};
use async_recursion::async_recursion;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct OperatorDescriptor {
pub id: NodeId,
pub inputs: Vec<PortId>,
pub outputs: Vec<PortId>,
pub uri: Option<String>,
pub configuration: Option<Configuration>,
}
impl std::fmt::Display for OperatorDescriptor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{} - Kind: Operator (Simple)", self.id)
}
}
impl OperatorDescriptor {
pub fn from_yaml(data: &str) -> Result<Self> {
let dataflow_descriptor = serde_yaml::from_str::<OperatorDescriptor>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
Ok(dataflow_descriptor)
}
pub fn from_json(data: &str) -> Result<Self> {
let dataflow_descriptor = serde_json::from_str::<OperatorDescriptor>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
Ok(dataflow_descriptor)
}
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
pub fn to_yaml(&self) -> Result<String> {
serde_yaml::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompositeOperatorDescriptor {
pub id: NodeId,
pub inputs: Vec<CompositeInputDescriptor>,
pub outputs: Vec<CompositeOutputDescriptor>,
pub operators: Vec<NodeDescriptor>,
pub links: Vec<LinkDescriptor>,
pub configuration: Option<Configuration>,
}
impl std::fmt::Display for CompositeOperatorDescriptor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{} - Kind: Operator (Composite)", self.id)
}
}
impl CompositeOperatorDescriptor {
pub fn from_yaml(data: &str) -> Result<Self> {
let dataflow_descriptor = serde_yaml::from_str::<CompositeOperatorDescriptor>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
Ok(dataflow_descriptor)
}
pub fn from_json(data: &str) -> Result<Self> {
let dataflow_descriptor = serde_json::from_str::<CompositeOperatorDescriptor>(data)
.map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
Ok(dataflow_descriptor)
}
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
pub fn to_yaml(&self) -> Result<String> {
serde_yaml::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
}
#[async_recursion]
pub(crate) async fn flatten(
mut self,
composite_id: NodeId,
links: &mut Vec<LinkDescriptor>,
global_configuration: Option<Configuration>,
ancestors: &mut Vec<String>,
) -> Result<Vec<OperatorDescriptor>> {
log::trace!("[Descriptor] Flattening {}", self.id);
let mut simple_operators = vec![];
self.configuration = global_configuration.merge_overwrite(self.configuration);
for o in self.operators {
let description = match parse_uri(&o.descriptor)? {
crate::model::ZFUri::File(path) => try_load_descriptor_from_file(path).await,
crate::model::ZFUri::Builtin(_) => bail!(
ErrorKind::ConfigurationError,
"Builtin operators are not yet supported!"
),
}?;
let NodeDescriptor {
id: operator_id,
descriptor,
configuration,
} = o;
let configuration = self.configuration.clone().merge_overwrite(configuration);
let res_simple = OperatorDescriptor::from_yaml(&description);
if let Ok(mut simple_operator) = res_simple {
log::trace!(
"[Descriptor] Flattening {} - {} is simple",
self.id,
simple_operator.id
);
let new_id: NodeId = format!("{composite_id}/{operator_id}").into();
let output_ids: HashMap<_, _> = self
.outputs
.iter()
.filter(|&output| output.node == operator_id)
.map(|output| (&output.id, &output.output))
.collect();
let input_ids: HashMap<_, _> = self
.inputs
.iter()
.filter(|&input| input.node == operator_id)
.map(|input| (&input.id, &input.input))
.collect();
for l in &mut self.links {
if l.from.node == operator_id {
log::trace!("Updating {} to {}", l.from.node, new_id);
l.from.node = new_id.clone();
}
if l.to.node == operator_id {
log::trace!("Updating {} to {}", l.to.node, new_id);
l.to.node = new_id.clone();
}
}
links
.iter_mut()
.filter(|link| {
link.from.node == composite_id
&& output_ids.keys().contains(&&link.from.output)
})
.for_each(|link| {
link.from.node = new_id.clone();
link.from.output = (*output_ids.get(&&link.from.output).unwrap()).clone();
});
links
.iter_mut()
.filter(|link| {
link.to.node == composite_id && input_ids.keys().contains(&&link.to.input)
})
.for_each(|link| {
link.to.node = new_id.clone();
link.to.input = (*input_ids.get(&&link.to.input).unwrap()).clone();
});
simple_operator.id = new_id;
simple_operator.configuration = configuration
.clone()
.merge_overwrite(simple_operator.configuration);
log::trace!(
"[Descriptor] Flattening {} - Pushing simple {}",
self.id,
simple_operator.id
);
simple_operators.push(simple_operator);
continue;
}
let res_composite = CompositeOperatorDescriptor::from_yaml(&description);
if let Ok(composite_operator) = res_composite {
log::trace!(
"[Descriptor] Flattening {} - {} is composite",
self.id,
composite_operator.id
);
if let Ok(index) = ancestors.binary_search(&descriptor) {
bail!(
ErrorKind::GenericError, "Possible recursion detected, < {} > would be included again after: {:?}",
descriptor,
&ancestors[index..]
);
}
ancestors.push(descriptor.clone());
let mut operators = composite_operator
.flatten(operator_id, &mut self.links, configuration, ancestors)
.await?;
for operator in operators.iter_mut() {
let new_id: NodeId = format!("{}/{}", composite_id, operator.id).into();
self.links
.iter_mut()
.filter(|link| link.from.node == operator.id || link.to.node == operator.id)
.for_each(|link| {
if link.from.node == operator.id {
link.from.node = new_id.clone();
}
if link.to.node == operator.id {
link.to.node = new_id.clone();
}
});
operator.id = new_id;
}
simple_operators.append(&mut operators);
ancestors.pop();
continue;
}
log::error!(
"Could not parse < {} > as either a Simple or a Composite Operator:",
operator_id
);
log::error!("Simple: {:?}", res_simple.err().unwrap());
log::error!("Composite: {:?}", res_composite.err().unwrap());
bail!(
ErrorKind::ParsingError,
"Could not parse < {} >",
operator_id
);
}
links.append(&mut self.links);
Ok(simple_operators)
}
}
#[cfg(test)]
#[path = "../tests/flatten-composite.rs"]
mod tests;