use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::mem::take;
use error_chain::bail;
use log::{debug, error, trace};
use serde_derive::{Deserialize, Serialize};
use url::Url;
use crate::errors::*;
use crate::model::connection::Connection;
use crate::model::connection::Direction;
use crate::model::connection::Direction::FROM;
use crate::model::connection::Direction::TO;
use crate::model::input::InputInitializer;
use crate::model::io::{IO, IOType};
use crate::model::io::Find;
use crate::model::io::IOSet;
use crate::model::metadata::MetaData;
use crate::model::name::HasName;
use crate::model::name::Name;
use crate::model::process::Process;
use crate::model::process::Process::FlowProcess;
use crate::model::process::Process::FunctionProcess;
use crate::model::process_reference::ProcessReference;
use crate::model::route::{Route, RouteType};
use crate::model::route::HasRoute;
use crate::model::route::SetIORoutes;
use crate::model::route::SetRoute;
use crate::model::validation::Validate;
/// Definition of a flow: its declared inputs/outputs, the processes it references and the
/// connections between them, plus state that is filled in after deserialization.
///
/// Fields marked `#[serde(skip)]` are never read from or written to the serialized form.
/// `deny_unknown_fields` makes deserialization fail on any key not listed here.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FlowDefinition {
/// Name of the flow; (de)serialized under the key `flow`
#[serde(rename = "flow")]
pub name: Name,
/// Inputs of the flow; (de)serialized under the key `input`, empty when absent
#[serde(default, rename = "input")]
pub inputs: IOSet,
/// Outputs of the flow; (de)serialized under the key `output`, empty when absent
#[serde(default, rename = "output")]
pub outputs: IOSet,
/// References to the processes used in this flow; (de)serialized under the key `process`
#[serde(default, rename = "process")]
pub process_refs: Vec<ProcessReference>,
/// Connections declared in this flow; (de)serialized under the key `connection`.
/// `build_connections` replaces this list with the successfully built connections.
#[serde(default, rename = "connection")]
pub connections: Vec<Connection>,
/// Metadata of the flow; takes its `Default` value when absent
#[serde(default)]
pub metadata: MetaData,
/// Documentation string of the flow (returned by `get_docs`); empty when absent
#[serde(default)]
pub docs: String,
/// Alias of this flow, assigned by `set_alias` (falls back to `name` when given an empty alias)
#[serde(skip)]
pub alias: Name,
/// Numeric id of this flow, assigned by `config`
#[serde(skip)]
pub id: usize,
/// Url this flow was loaded from, assigned by `config`; starts out as `default_url()`
#[serde(skip, default = "FlowDefinition::default_url")]
pub source_url: Url,
/// Route of this flow, derived from the parent route and `alias` by `set_routes_from_parent`
#[serde(skip)]
pub route: Route,
/// Sub-processes of this flow keyed by `Name`; `get_subprocess_io` looks them up by alias
#[serde(skip)]
pub subprocesses: BTreeMap<Name, Process>,
/// Set of Urls; not populated in this file — presumably the library references used by the
/// flow, TODO confirm against the code that fills it
#[serde(skip)]
pub lib_references: BTreeSet<Url>,
/// Set of Urls; not populated in this file — presumably the context function references used
/// by the flow, TODO confirm against the code that fills it
#[serde(skip)]
pub context_references: BTreeSet<Url>,
}
impl Validate for FlowDefinition {
    /// Validate every input, then every output, then every connection of the flow.
    ///
    /// # Errors
    /// Returns the first validation error encountered; later items are not checked.
    fn validate(&self) -> Result<()> {
        self.inputs.iter().try_for_each(|input| input.validate())?;
        self.outputs
            .iter()
            .try_for_each(|output| output.validate())?;
        self.connections
            .iter()
            .try_for_each(|connection| connection.validate())
    }
}
impl fmt::Display for FlowDefinition {
    /// Write a tab-indented, multi-line summary of the flow: a header with name, id, alias,
    /// source url and route, followed by one section each for inputs, outputs, process
    /// references and connections.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(
            f,
            "\tname: \t\t\t{}\n\tid: \t\t\t{}\n\talias: \t\t\t{}\n\tsource_url: \t{}\n\troute: \t\t\t{}",
            self.name, self.id, self.alias, self.source_url, self.route
        )?;

        // Inputs and outputs are shown using their pretty-printed Debug form
        writeln!(f, "\tinputs:")?;
        self.inputs
            .iter()
            .try_for_each(|io| writeln!(f, "\t\t\t\t\t{io:#?}"))?;

        writeln!(f, "\toutputs:")?;
        self.outputs
            .iter()
            .try_for_each(|io| writeln!(f, "\t\t\t\t\t{io:#?}"))?;

        writeln!(f, "\tprocesses:")?;
        self.process_refs
            .iter()
            .try_for_each(|process_ref| writeln!(f, "\t{process_ref}"))?;

        writeln!(f, "\tconnections:")?;
        self.connections
            .iter()
            .try_for_each(|link| writeln!(f, "\t\t\t\t\t{link}"))
    }
}
impl Default for FlowDefinition {
    /// Create an empty flow: no name, alias, inputs, outputs, process references, connections
    /// or sub-processes, id `0`, and the placeholder source url.
    fn default() -> FlowDefinition {
        FlowDefinition {
            name: Default::default(),
            inputs: Vec::new(),
            outputs: Vec::new(),
            process_refs: Vec::new(),
            connections: Vec::new(),
            metadata: Default::default(),
            docs: String::new(),
            alias: Default::default(),
            id: 0,
            // Same helper serde uses for this field, so a defaulted and a deserialized
            // flow can never disagree on the placeholder url
            source_url: Self::default_url(),
            route: Default::default(),
            subprocesses: Default::default(),
            lib_references: Default::default(),
            context_references: Default::default(),
        }
    }
}
impl HasName for FlowDefinition {
/// The name of the flow, as stored in the `name` field
fn name(&self) -> &Name {
&self.name
}
/// The alias of the flow, as stored in the `alias` field (assigned by `set_alias`)
fn alias(&self) -> &Name {
&self.alias
}
}
impl HasRoute for FlowDefinition {
/// Shared reference to the route of this flow
fn route(&self) -> &Route {
&self.route
}
/// Mutable reference to the route of this flow, allowing it to be modified in place
fn route_mut(&mut self) -> &mut Route {
&mut self.route
}
}
impl SetRoute for FlowDefinition {
    /// Set this flow's route to `<parent_route>/<alias>` (or `/<alias>` when the parent route
    /// is empty), then propagate the new route to the flow's inputs and outputs.
    fn set_routes_from_parent(&mut self, parent_route: &Route) {
        let own_route = if parent_route.is_empty() {
            format!("/{}", self.alias)
        } else {
            format!("{parent_route}/{}", self.alias)
        };
        self.route = Route::from(own_route);

        self.inputs
            .set_io_routes_from_parent(&self.route, IOType::FlowInput);
        self.outputs
            .set_io_routes_from_parent(&self.route, IOType::FlowOutput);
    }
}
impl FlowDefinition {
pub fn default_url() -> Url {
Url::parse("file://").expect("Could not create default_url")
}
pub fn set_alias(&mut self, alias: &Name) {
if alias.is_empty() {
self.alias = self.name.clone();
} else {
self.alias = alias.clone();
}
}
pub fn get_docs(&self) -> &str {
&self.docs
}
pub fn inputs(&self) -> &IOSet {
&self.inputs
}
pub fn inputs_mut(&mut self) -> &mut IOSet {
&mut self.inputs
}
pub fn outputs(&self) -> &IOSet {
&self.outputs
}
fn set_initializers(&mut self, initializer_map: &BTreeMap<String, InputInitializer>)
-> Result<()> {
for (input_name, initializer) in initializer_map {
for (index, input) in self.inputs.iter_mut().enumerate() {
if *input.name() == Name::from(input_name)
|| (input_name.as_str() == "default" && index == 0)
{
input.set_initializer(Some(initializer.clone()))
.chain_err(|| "Failed to set initializers in flow")?;
}
}
}
Ok(())
}
pub fn config(
&mut self,
source_url: &Url,
parent_route: &Route,
alias_from_reference: &Name,
id: usize,
initializations: &BTreeMap<String, InputInitializer>,
) -> Result<()> {
self.id = id;
self.set_alias(alias_from_reference);
self.source_url = source_url.to_owned();
self.set_initializers(initializations)?;
self.set_routes_from_parent(parent_route);
self.validate()
}
pub fn is_runnable(&self) -> bool {
self.inputs().is_empty() && self.outputs().is_empty()
}
fn get_subprocess_io(
&mut self,
subprocess_alias: &Name,
direction: Direction,
sub_route: &Route,
) -> Result<IO> {
debug!("\tLooking for subprocess with alias = '{}'", subprocess_alias);
match self.subprocesses.get_mut(subprocess_alias) {
Some(FlowProcess(ref mut sub_flow)) => {
debug!("\tFlow sub-process with matching name found, name = '{subprocess_alias}'");
match direction {
TO => sub_flow
.inputs
.find_by_subroute(sub_route),
FROM => sub_flow
.outputs
.find_by_subroute(sub_route),
}
},
Some(FunctionProcess(ref mut function)) => {
debug!("\tFunction sub-process with name = '{subprocess_alias}' found");
match direction {
TO => function.inputs.find_by_subroute(sub_route),
FROM => function.outputs.find_by_subroute(sub_route)
.or_else(|e1| { function.inputs.find_by_subroute(sub_route)
.chain_err(|| e1 )
}),
}
}
None => {
bail!("No sub-process named '{subprocess_alias}' exists in the flow '{}'\n\
possible sub-process names are: '{}'",
self.route, self.subprocesses.keys().map(|k| k.as_str() )
.collect::<Vec<&str>>().join(", "))
}
}
}
fn parse_subroute(&self, route: &Route) -> Result<RouteType> {
let segments: Vec<&str> = route.split('/').collect();
match segments[0] {
"input" => Ok(RouteType::FlowInput(segments[1].into(),
segments[2..].join("/").into())),
"output" => Ok(RouteType::FlowOutput(segments[1].into())),
"" => bail!("Invalid Route in connection - must be an input, output or sub-process name"),
process_name => Ok(RouteType::SubProcess(process_name.into(),
segments[1..].join("/").into())),
}
}
fn get_io_by_route(
&mut self,
direction: Direction,
route: &Route,
) -> Result<IO> {
debug!("Looking for connection {:?} '{}'", direction, route);
match (&direction, self.parse_subroute(route)?) {
(&FROM, RouteType::FlowInput(input_name, sub_route)) => {
let mut from = self
.inputs
.find_by_subroute(&Route::from(input_name.to_string()))?;
from.route_mut().extend(&sub_route);
Ok(from)
},
(&TO, RouteType::FlowOutput(output_name)) =>
self.outputs.find_by_subroute(&Route::from(output_name.to_string())),
(_, RouteType::SubProcess(process_name, sub_route)) =>
self.get_subprocess_io(&process_name, direction, &sub_route),
(&FROM, RouteType::FlowOutput(output_name)) => {
bail!("Invalid connection FROM an output named: '{}'", output_name)
},
(&TO, RouteType::FlowInput(input_name, sub_route)) => {
bail!(
"Invalid connection TO an input named: '{}' with sub_route: '{}'",
input_name, sub_route
)
}
}
}
pub fn build_connections(&mut self, level: usize) -> Result<()> {
debug!("Building connections for flow '{}'", self.name);
let mut error_count = 0;
let mut connections = take(&mut self.connections);
for connection in connections.iter_mut() {
if let Err(e) = self.build_connection(connection, level) {
error_count += 1;
error!("{}", e);
}
}
if error_count == 0 {
debug!(
"All connections inside flow '{}' successfully built",
self.source_url
);
Ok(())
} else {
bail!(
"{} connection errors found in flow '{}'",
error_count,
self.source_url
)
}
}
fn build_connection(&mut self, connection: &Connection, level: usize) -> Result<()> {
let from_io = self.get_io_by_route(FROM, connection.from())
.chain_err(|| format!("Did not find connection source: '{}' specified in flow '{}'\n",
connection.from(), self.source_url))?;
trace!("Found connection source:\n{:#?}", from_io);
for to_route in connection.to() {
match self.get_io_by_route(TO, to_route) {
Ok(to_io) => {
trace!("Found connection destination:\n{:#?}", to_io);
let mut new_connection = connection.clone();
new_connection.connect(from_io.clone(), to_io, level)?;
self.connections.push(new_connection);
}
Err(error) => {
bail!(
"Did not find connection destination: '{}' in flow '{}'\n\t\t{}",
to_route,
self.source_url,
error
);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use serde_json::json;

    use crate::model::connection::{Connection, Direction};
    use crate::model::datatype::{NUMBER_TYPE, STRING_TYPE};
    use crate::model::flow_definition::FlowDefinition;
    use crate::model::function_definition::FunctionDefinition;
    use crate::model::input::InputInitializer::Always;
    use crate::model::input::InputInitializer::Once;
    use crate::model::io::IO;
    use crate::model::name::{HasName, Name};
    use crate::model::process::Process;
    use crate::model::route::{HasRoute, Route, RouteType, SetRoute};
    use crate::model::validation::Validate;

    /// Create a flow named "test_flow" with a string and a number input, a string and a
    /// number output, and two function sub-processes:
    /// - "process_1": unnamed string input, string output named "output_1"
    /// - "process_2": unnamed string input, unnamed number output
    fn test_flow() -> FlowDefinition {
        let mut flow = FlowDefinition {
            name: "test_flow".into(),
            alias: "test_flow".into(),
            inputs: vec![
                IO::new_named(vec![STRING_TYPE.into()], "string", "string"),
                IO::new_named(vec![NUMBER_TYPE.into()], "number", "number"),
            ],
            outputs: vec![
                IO::new_named(vec![STRING_TYPE.into()], "string", "string"),
                IO::new_named(vec![NUMBER_TYPE.into()], "number", "number"),
            ],
            source_url: FlowDefinition::default_url(),
            ..Default::default()
        };

        let process_1 = Process::FunctionProcess(FunctionDefinition {
            name: "process_1".into(),
            inputs: vec![IO::new_named(vec![STRING_TYPE.into()], "", "")],
            outputs: vec![IO::new_named(
                vec![STRING_TYPE.into()],
                "output_1",
                "output_1",
            )],
            ..Default::default()
        });

        let process_2 = Process::FunctionProcess(FunctionDefinition {
            name: "process_2".into(),
            function_id: 1,
            inputs: vec![IO::new_named(vec![STRING_TYPE.into()], "", "")],
            outputs: vec![IO::new_named(vec![NUMBER_TYPE.into()], "", "")],
            ..Default::default()
        });

        let _ = flow.subprocesses.insert("process_1".into(), process_1);
        let _ = flow.subprocesses.insert("process_2".into(), process_2);

        flow
    }

    #[test]
    fn test_name() {
        let flow = FlowDefinition::default();
        assert_eq!(flow.name(), &Name::default());
    }

    #[test]
    fn test_alias() {
        let flow = FlowDefinition::default();
        assert_eq!(flow.alias(), &Name::default());
    }

    #[test]
    fn test_set_alias() {
        let mut flow = FlowDefinition::default();
        flow.set_alias(&Name::from("test flow"));
        assert_eq!(flow.alias(), &Name::from("test flow"));
    }

    /// An empty alias must fall back to the flow's name. The flow is given a non-empty name
    /// so that the fallback is observable (with an empty name the check would pass vacuously).
    #[test]
    fn test_set_empty_alias() {
        let mut flow = FlowDefinition {
            name: "named_flow".into(),
            ..Default::default()
        };
        flow.set_alias(&Name::from(""));
        assert_eq!(flow.alias(), &Name::from("named_flow"));
    }

    #[test]
    fn test_route() {
        let flow = FlowDefinition::default();
        assert_eq!(flow.route(), &Route::default());
    }

    #[test]
    fn test_invalid_connection_route() {
        let flow = test_flow();
        match flow.parse_subroute(&Route::from("")) {
            Ok(_) => panic!("Connection route should not be valid"),
            Err(e) => assert!(e.to_string().contains("Invalid Route in connection")),
        }
    }

    #[test]
    fn test_parse_valid_input() {
        let flow = test_flow();
        assert_eq!(
            flow.parse_subroute(&Route::from("input/string"))
                .expect("Could not parse input route"),
            RouteType::FlowInput(Name::from("string"), Route::default())
        );
    }

    #[test]
    fn test_parse_valid_output() {
        let flow = test_flow();
        assert_eq!(
            flow.parse_subroute(&Route::from("output/string"))
                .expect("Could not parse output route"),
            RouteType::FlowOutput(Name::from("string"))
        );
    }

    #[test]
    fn test_parse_valid_subprocess() {
        let flow = test_flow();
        assert_eq!(
            flow.parse_subroute(&Route::from("sub-process"))
                .expect("Could not parse sub-process route"),
            RouteType::SubProcess(Name::from("sub-process"), Route::default())
        );
    }

    #[test]
    fn test_non_existent_subprocess_in_connection() {
        let mut flow = test_flow();
        match flow.get_subprocess_io(
            &Name::from("foo"),
            Direction::FROM,
            &Route::from("who-cares"),
        ) {
            Ok(_) => panic!("Should not find non-existent sub-process"),
            Err(e) => assert!(e.to_string().contains("No sub-process named")),
        }
    }

    #[test]
    fn test_existent_subprocess_existing_io_in_connection() {
        let mut flow = test_flow();
        flow.get_subprocess_io(&Name::from("process_1"), Direction::FROM, &Route::from(""))
            .expect("Could not find sub-process called process_1");
    }

    #[test]
    fn test_existent_subprocess_non_existing_input_in_connection() {
        let mut flow = test_flow();
        match flow.get_subprocess_io(
            &Name::from("process_1"),
            Direction::TO,
            &Route::from("no-such-io"),
        ) {
            Ok(_) => panic!("Should not find non-existent sub-process input"),
            Err(e) => assert!(e.to_string().contains("No IO")),
        }
    }

    #[test]
    fn test_existent_subprocess_non_existing_io_in_connection() {
        let mut flow = test_flow();
        match flow.get_subprocess_io(
            &Name::from("process_1"),
            Direction::FROM,
            &Route::from("no-such-io"),
        ) {
            Ok(_) => panic!("Should not find non-existent sub-process IO"),
            Err(e) => assert!(e.to_string().contains("No IO")),
        }
    }

    #[test]
    fn test_route_mut() {
        let mut flow = FlowDefinition::default();
        let route = flow.route_mut();
        assert_eq!(route, &Route::default());
        *route = Route::from("/root");
        assert_eq!(route, &Route::from("/root"));
    }

    #[test]
    fn test_set_empty_parent_route() {
        let mut flow = test_flow();
        flow.set_routes_from_parent(&Route::from(""));
        assert_eq!(flow.route(), &Route::from("/test_flow"));
    }

    #[test]
    fn test_set_parent_route() {
        let mut flow = test_flow();
        flow.set_routes_from_parent(&Route::from("/root"));
        assert_eq!(flow.route(), &Route::from("/root/test_flow"));
    }

    #[test]
    fn validate_flow() {
        let mut flow = test_flow();
        let connection = Connection::new("process_1", "process_2");
        flow.connections = vec![connection];
        assert!(flow.validate().is_ok());
    }

    #[test]
    fn duplicate_connection() {
        let mut flow = test_flow();
        let connection = Connection::new("process_1", "process_2");
        flow.connections = vec![connection.clone(), connection];
        assert!(flow.validate().is_ok());
    }

    #[test]
    fn check_outputs() {
        let flow = test_flow();
        assert_eq!(flow.outputs().len(), 2);
    }

    #[test]
    fn check_inputs() {
        let flow = test_flow();
        assert_eq!(flow.inputs().len(), 2);
    }

    #[test]
    fn check_inputs_mut() {
        let mut flow = test_flow();
        let inputs = flow.inputs_mut();
        assert_eq!(inputs.len(), 2);
        *inputs = vec![];
        assert_eq!(inputs.len(), 0);
    }

    /// A flow with neither inputs nor outputs is runnable; one with IOs is not.
    #[test]
    fn check_is_runnable() {
        assert!(FlowDefinition::default().is_runnable());
        assert!(!test_flow().is_runnable());
    }

    #[test]
    fn test_inputs_initializers() {
        let mut flow = test_flow();
        let mut initializers = BTreeMap::new();
        initializers.insert(STRING_TYPE.into(), Always(json!("Hello")));
        initializers.insert(NUMBER_TYPE.into(), Once(json!(42)));
        flow.set_initializers(&initializers)
            .expect("Could not set initializers");

        assert_eq!(
            flow.inputs()
                .first()
                .expect("Could not get input")
                .get_initializer()
                .as_ref()
                .expect("Could not get initializer"),
            &Always(json!("Hello"))
        );
        assert_eq!(
            flow.inputs()
                .get(1)
                .expect("Could not get input")
                .get_initializer()
                .as_ref()
                .expect("Could not get initializer"),
            &Once(json!(42))
        );
    }

    /// The rendered flow must contain each of the section headers written by `Display`.
    #[test]
    fn display_flow() {
        let mut flow = test_flow();
        let connection = Connection::new("process_1", "process_2");
        flow.connections = vec![connection];

        let rendered = flow.to_string();
        for header in ["\tinputs:", "\toutputs:", "\tprocesses:", "\tconnections:"] {
            assert!(rendered.contains(header), "missing section '{header}'");
        }
    }

    mod build_connection_tests {
        use crate::model::connection::Connection;
        use crate::model::flow_definition::test::test_flow;

        #[test]
        fn build_compatible_internal_connection() {
            let mut flow = test_flow();
            let connection = Connection::new("process_1", "process_2");
            assert!(flow.build_connection(&connection, 0).is_ok());
        }

        #[test]
        fn build_incompatible_internal_connection() {
            let mut flow = test_flow();
            let connection = Connection::new("process_2", "process_1");
            assert!(flow.build_connection(&connection, 0).is_err());
        }

        #[test]
        fn build_from_flow_input_to_sub_process() {
            let mut flow = test_flow();
            let connection = Connection::new("input/string", "process_1");
            assert!(flow.build_connection(&connection, 1).is_ok());
        }

        #[test]
        fn build_from_sub_process_flow_output() {
            let mut flow = test_flow();
            let connection = Connection::new("process_1", "output/string");
            assert!(flow.build_connection(&connection, 0).is_ok());
        }

        #[test]
        fn build_from_flow_input_to_flow_output() {
            let mut flow = test_flow();
            let connection = Connection::new("input/string", "output/string");
            assert!(flow.build_connection(&connection, 1).is_ok());
        }

        #[test]
        fn build_incompatible_from_flow_input_to_sub_process() {
            let mut flow = test_flow();
            let connection = Connection::new("input/number", "process_1");
            assert!(flow.build_connection(&connection, 1).is_err());
        }

        #[test]
        fn build_incompatible_from_sub_process_flow_output() {
            let mut flow = test_flow();
            let connection = Connection::new("process_1", "output/number");
            assert!(flow.build_connection(&connection, 0).is_err());
        }

        #[test]
        fn build_incompatible_from_flow_input_to_flow_output() {
            let mut flow = test_flow();
            let connection = Connection::new("input/string", "output/number");
            assert!(flow.build_connection(&connection, 1).is_err());
        }

        #[test]
        fn fail_build_from_flow_input_to_flow_input() {
            let mut flow = test_flow();
            let connection = Connection::new("input/string", "input/number");
            assert!(flow.build_connection(&connection, 1).is_err());
        }

        #[test]
        fn fail_build_from_flow_output_to_flow_output() {
            let mut flow = test_flow();
            let connection = Connection::new("output/string", "output/number");
            assert!(flow.build_connection(&connection, 1).is_err());
        }

        #[test]
        fn build_all_flow_connections() {
            let mut flow = test_flow();
            let connection1 = Connection::new("input/string", "output/string");
            let connection2 = Connection::new("input/string", "process_1");
            let connection3 = Connection::new("process_1", "output/string");
            flow.connections = vec![connection1, connection2, connection3];
            assert!(flow.build_connections(0).is_ok());
        }

        #[test]
        fn fail_build_flow_connections() {
            let mut flow = test_flow();
            let connection1 = Connection::new("input/number", "process_1");
            flow.connections = vec![connection1];
            assert!(flow.build_connections(0).is_err());
        }
    }
}