use crate::model::descriptor::validator::DataFlowValidator;
use crate::model::descriptor::Vars;
use crate::model::descriptor::{
LinkDescriptor, NodeDescriptor, OperatorDescriptor, SinkDescriptor, SourceDescriptor,
};
use crate::types::configuration::Merge;
use crate::types::{Configuration, NodeId, RuntimeId};
use crate::zfresult::ErrorKind;
use crate::{zferror, Result};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::hash::{Hash, Hasher};
/// A data flow descriptor before flattening.
///
/// Parsed from YAML/JSON by [`DataFlowDescriptor::from_yaml`] /
/// [`DataFlowDescriptor::from_json`]. Its nodes are still generic
/// [`NodeDescriptor`]s; [`DataFlowDescriptor::flatten`] resolves them into a
/// [`FlattenDataFlowDescriptor`].
///
/// Equality and hashing consider only `flow`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DataFlowDescriptor {
    /// Name of the data flow; the only field used by `Hash` and `PartialEq`.
    pub flow: String,
    /// Operator nodes; each is flattened (possibly into several operators) by `flatten`.
    pub operators: Vec<NodeDescriptor>,
    /// Source nodes; each is resolved with `load_source` by `flatten`.
    pub sources: Vec<NodeDescriptor>,
    /// Sink nodes; each is resolved with `load_sink` by `flatten`.
    pub sinks: Vec<NodeDescriptor>,
    /// Links between node ports; `flatten` may rewrite them while flattening operators.
    pub links: Vec<LinkDescriptor>,
    /// Optional node-id to runtime-id mapping (presumably where each node runs —
    /// confirm with the runtime). Distinct runtimes are exposed by `get_runtimes`.
    pub mapping: Option<HashMap<NodeId, RuntimeId>>,
    /// Configuration merged into every node's own configuration during `flatten`.
    ///
    /// When deserializing, the key `configuration` is also accepted (serde
    /// `alias` applies to deserialization only; serialization always writes
    /// `global_configuration`).
    #[serde(alias = "configuration")]
    pub global_configuration: Option<Configuration>,
}
impl DataFlowDescriptor {
    /// Builds a descriptor from a YAML string.
    ///
    /// Mustache variables in `data` are expanded with
    /// [`Vars::expand_mustache_yaml`] before the YAML is deserialized.
    /// No validation of ports or links is performed here.
    ///
    /// # Errors
    ///
    /// Propagates any error from the mustache expansion, and returns an
    /// `ErrorKind::ParsingError` if the expanded text cannot be deserialized.
    pub fn from_yaml(data: &str) -> Result<Self> {
        let descriptor = Vars::expand_mustache_yaml(data)?;
        let dataflow_descriptor = serde_yaml::from_str::<DataFlowDescriptor>(&descriptor)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        Ok(dataflow_descriptor)
    }

    /// Builds a descriptor from a JSON string.
    ///
    /// Mustache variables in `data` are expanded with
    /// [`Vars::expand_mustache_json`] before the JSON is deserialized.
    /// No validation of ports or links is performed here.
    ///
    /// # Errors
    ///
    /// Propagates any error from the mustache expansion, and returns an
    /// `ErrorKind::ParsingError` if the expanded text cannot be deserialized.
    pub fn from_json(data: &str) -> Result<Self> {
        let descriptor = Vars::expand_mustache_json(data)?;
        let dataflow_descriptor = serde_json::from_str::<DataFlowDescriptor>(&descriptor)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        Ok(dataflow_descriptor)
    }

    /// Serializes the descriptor to a JSON string.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::SerializationError` if serialization fails.
    pub fn to_json(&self) -> Result<String> {
        // `self` is already a reference; no extra borrow needed.
        serde_json::to_string(self)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Serializes the descriptor to a YAML string.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::SerializationError` if serialization fails.
    pub fn to_yaml(&self) -> Result<String> {
        serde_yaml::to_string(self)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Returns the distinct runtimes referenced by `mapping`, or an empty
    /// vector when no mapping is set.
    ///
    /// The order follows `HashMap` iteration order, so it is not guaranteed
    /// to be stable.
    pub fn get_runtimes(&self) -> Vec<RuntimeId> {
        self.mapping
            .iter()
            .flat_map(|mapping| mapping.values())
            // Deduplicate by reference so only the surviving ids are cloned.
            .unique()
            .cloned()
            .collect()
    }

    /// Consumes the descriptor and resolves every node into its flattened form.
    ///
    /// For each source, sink and operator, `global_configuration` is combined
    /// with the node's own configuration via [`Merge::merge_overwrite`]. The
    /// node's configuration is the argument, so, per the method name, node-local
    /// values presumably take precedence. The result is handed to the node's
    /// loader. Sources are processed first, then sinks, then operators, each
    /// awaited sequentially in declaration order.
    ///
    /// Operators may expand into several flattened operators and receive mutable
    /// access to `links`, which may therefore be rewritten.
    ///
    /// The returned descriptor is NOT validated; call
    /// [`FlattenDataFlowDescriptor::validate`] if needed.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while loading or flattening a node;
    /// the remaining nodes are not processed.
    pub async fn flatten(self) -> Result<FlattenDataFlowDescriptor> {
        let Self {
            flow,
            operators,
            sources,
            sinks,
            mut links,
            mapping,
            global_configuration,
        } = self;

        let mut flattened_sources = Vec::with_capacity(sources.len());
        for source in sources {
            let config = global_configuration
                .clone()
                .merge_overwrite(source.configuration.clone());
            flattened_sources.push(source.load_source(config).await?);
        }

        let mut flattened_sinks = Vec::with_capacity(sinks.len());
        for sink in sinks {
            let config = global_configuration
                .clone()
                .merge_overwrite(sink.configuration.clone());
            flattened_sinks.push(sink.load_sink(config).await?);
        }

        // One operator can expand into many, so the final count is unknown up front.
        let mut flattened_operators = Vec::new();
        for operator in operators {
            let config = global_configuration
                .clone()
                .merge_overwrite(operator.configuration.clone());
            // `flatten` consumes the operator, so its id is cloned beforehand.
            let id = operator.id.clone();
            // NOTE(review): the empty `Vec` is a fresh scratch argument whose
            // role is defined by `NodeDescriptor::flatten` — confirm there.
            let mut flattened = operator
                .flatten(id, &mut links, config, &mut Vec::new())
                .await?;
            flattened_operators.append(&mut flattened);
        }

        Ok(FlattenDataFlowDescriptor {
            flow,
            sources: flattened_sources,
            sinks: flattened_sinks,
            operators: flattened_operators,
            links,
            mapping,
            global_configuration,
        })
    }
}
/// Hashes a descriptor by its `flow` name only, so that hashing stays
/// consistent with the `PartialEq` impl, which also compares `flow` alone.
impl Hash for DataFlowDescriptor {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.flow, state)
    }
}
/// Two descriptors are equal when their `flow` names match; every other
/// field is ignored. This mirrors the `Hash` impl, which hashes `flow` only.
impl PartialEq for DataFlowDescriptor {
    fn eq(&self, other: &DataFlowDescriptor) -> bool {
        let Self { flow, .. } = self;
        flow == &other.flow
    }
}

/// String equality is a total equivalence relation, so `Eq` holds.
impl Eq for DataFlowDescriptor {}
/// A data flow descriptor whose nodes have been resolved into concrete
/// operator, source and sink descriptors.
///
/// Produced by `DataFlowDescriptor::flatten`, or parsed (and validated) by
/// [`FlattenDataFlowDescriptor::from_yaml`] /
/// [`FlattenDataFlowDescriptor::from_json`].
///
/// Equality and hashing consider only `flow`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FlattenDataFlowDescriptor {
    /// Name of the data flow; the only field used by `Hash` and `PartialEq`.
    pub flow: String,
    /// Flattened operators.
    pub operators: Vec<OperatorDescriptor>,
    /// Resolved sources.
    pub sources: Vec<SourceDescriptor>,
    /// Resolved sinks.
    pub sinks: Vec<SinkDescriptor>,
    /// Links between node ports, after any rewriting done during flattening.
    pub links: Vec<LinkDescriptor>,
    /// Optional node-id to runtime-id mapping (presumably where each node runs —
    /// confirm with the runtime). Distinct runtimes are exposed by `get_runtimes`.
    pub mapping: Option<HashMap<NodeId, RuntimeId>>,
    /// Global configuration, carried over unchanged by `flatten`.
    ///
    /// When deserializing, the key `configuration` is also accepted (serde
    /// `alias` applies to deserialization only).
    #[serde(alias = "configuration")]
    pub global_configuration: Option<Configuration>,
}
impl FlattenDataFlowDescriptor {
    /// Deserializes a flattened descriptor from YAML and validates it.
    ///
    /// Unlike `DataFlowDescriptor::from_yaml`, no mustache expansion is done.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::ParsingError` if `data` cannot be deserialized,
    /// or any error reported by [`Self::validate`].
    pub fn from_yaml(data: &str) -> Result<FlattenDataFlowDescriptor> {
        let dataflow_descriptor = serde_yaml::from_str::<FlattenDataFlowDescriptor>(data)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        dataflow_descriptor.validate()?;
        Ok(dataflow_descriptor)
    }

    /// Deserializes a flattened descriptor from JSON and validates it.
    ///
    /// Unlike `DataFlowDescriptor::from_json`, no mustache expansion is done.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::ParsingError` if `data` cannot be deserialized,
    /// or any error reported by [`Self::validate`].
    pub fn from_json(data: &str) -> Result<FlattenDataFlowDescriptor> {
        let dataflow_descriptor = serde_json::from_str::<FlattenDataFlowDescriptor>(data)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        dataflow_descriptor.validate()?;
        Ok(dataflow_descriptor)
    }

    /// Serializes the descriptor to a JSON string.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::SerializationError` if serialization fails.
    pub fn to_json(&self) -> Result<String> {
        // `self` is already a reference; no extra borrow needed.
        serde_json::to_string(self)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Serializes the descriptor to a YAML string.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::SerializationError` if serialization fails.
    pub fn to_yaml(&self) -> Result<String> {
        serde_yaml::to_string(self)
            .map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Returns the distinct runtimes referenced by `mapping`, or an empty
    /// vector when no mapping is set.
    ///
    /// The order follows `HashMap` iteration order, so it is not guaranteed
    /// to be stable.
    pub fn get_runtimes(&self) -> Vec<RuntimeId> {
        self.mapping
            .iter()
            .flat_map(|mapping| mapping.values())
            // Deduplicate by reference so only the surviving ids are cloned.
            .unique()
            .cloned()
            .collect()
    }

    /// Validates the descriptor's ports using [`DataFlowValidator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the validator cannot be built from this descriptor,
    /// or if `validate_ports` reports a problem.
    pub fn validate(&self) -> Result<()> {
        let validator = DataFlowValidator::try_from(self)?;
        validator.validate_ports()?;
        Ok(())
    }
}
/// Hashes a flattened descriptor by its `flow` name only, keeping hashing
/// consistent with the `PartialEq` impl, which compares `flow` alone.
impl Hash for FlattenDataFlowDescriptor {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `String` hashes exactly like its `str` contents.
        self.flow.as_str().hash(state);
    }
}
/// Flattened descriptors compare equal exactly when their `flow` names match;
/// all other fields are ignored, mirroring the `Hash` impl.
impl PartialEq for FlattenDataFlowDescriptor {
    fn eq(&self, other: &FlattenDataFlowDescriptor) -> bool {
        other.flow.as_str() == self.flow.as_str()
    }
}

/// Comparison by string is reflexive, symmetric and transitive, so `Eq` holds.
impl Eq for FlattenDataFlowDescriptor {}
#[cfg(test)]
#[path = "./tests/flatten-descriptor.rs"]
mod tests;