use log::{debug, error, info};
use flowcore::errors::ResultExt;
use flowcore::model::connection::Connection;
use flowcore::model::datatype::DataType;
use flowcore::model::flow_definition::FlowDefinition;
use flowcore::model::io::{IO, IOType};
use flowcore::model::process::Process::FlowProcess;
use flowcore::model::process::Process::FunctionProcess;
use flowcore::model::route::HasRoute;
use flowcore::model::route::Route;
use crate::compiler::compile::CompilerTables;
use crate::errors::*;
/// Populate the compiler `tables` with everything reachable from `flow`.
///
/// The flow hierarchy is walked recursively by `_gather_functions_and_connections`,
/// after which the gathered functions are sorted (`sort_functions`) and the
/// routes table is built (`create_routes_table`).
///
/// # Errors
///
/// Returns any error produced while walking the flow hierarchy.
pub fn gather_functions_and_connections(
    flow: &FlowDefinition,
    tables: &mut CompilerTables,
) -> Result<()> {
    info!("\n=== Compiler: Gathering Functions and Connections");

    _gather_functions_and_connections(flow, tables)?;

    // Post-processing only happens once the whole hierarchy has been gathered
    tables.sort_functions();
    tables.create_routes_table();

    let function_count = tables.functions.len();
    let connection_count = tables.connections.len();
    info!("Gathered {} functions and {} connections", function_count, connection_count);

    Ok(())
}
/// Recursive worker for `gather_functions_and_connections`.
///
/// For `flow` this appends, in order:
/// - a copy of all of the flow's connections to `tables.connections`
/// - the contents of each sub-process: sub-flows are descended into recursively,
///   functions are cloned, given an id equal to their position in
///   `tables.functions`, and pushed onto that table
/// - the flow's library references to `tables.libs`
/// - the flow's context references to `tables.context_functions`
///
/// # Errors
///
/// Propagates any error returned by a recursive call on a sub-flow.
fn _gather_functions_and_connections(
    flow: &FlowDefinition,
    tables: &mut CompilerTables,
) -> Result<()> {
    // Connections declared at this level of the hierarchy
    tables.connections.extend(flow.connections.iter().cloned());

    for entry in &flow.subprocesses {
        match entry.1 {
            // Nested flow: gather its contents before moving on to the next sub-process
            FlowProcess(ref sub_flow) => _gather_functions_and_connections(sub_flow, tables)?,
            FunctionProcess(ref function) => {
                // The id is the index the function will occupy in the functions table
                let next_id = tables.functions.len();
                let mut table_function = function.clone();
                table_function.set_id(next_id);
                tables.functions.push(table_function);
            }
        }
    }

    // References are recorded only after all sub-processes have been visited
    tables.libs.extend(flow.lib_references.iter().cloned());
    tables
        .context_functions
        .extend(flow.context_references.iter().cloned());

    Ok(())
}
/// Collapse the connections gathered in `tables.connections` into direct
/// connections that end at function inputs, storing the result in
/// `tables.collapsed_connections`.
///
/// Each connection is handled according to the type of IO it starts from:
///
/// - **Function IO** (`FunctionOutput` or `FunctionInput`): if the connection already
///   ends at a `FunctionInput` it is kept unchanged. Otherwise
///   `find_connection_destinations` is used to follow it to every function input it
///   eventually reaches, and one collapsed connection is produced per destination.
///   The source route is extended with the sub-route selected along the way and the
///   data types of both ends are checked for compatibility.
/// - **Flow IO** (`FlowInput` or `FlowOutput`): no collapsed connection is produced.
///   If the source IO carries an initializer, it is copied onto every function input
///   the connection reaches, using `tables.destination_routes` to find the
///   destination function and input index.
///   NOTE(review): this relies on `destination_routes` already being populated,
///   presumably by `create_routes_table` - confirm against the caller.
///
/// # Errors
///
/// Returns an error if:
/// - following a connection to its destinations fails
/// - the types of a collapsed connection's source and destination are incompatible
/// - no destination route or function can be found for an initialized flow IO
/// - setting the flow initializer on the destination function fails
pub fn collapse_connections(tables: &mut CompilerTables) -> Result<()> {
    info!("\n=== Compiler: Collapsing {} flow connections", tables.connections.len());

    let mut collapsed_connections: Vec<Connection> = Vec::new();

    for connection in &tables.connections {
        match connection.from_io().io_type() {
            IOType::FunctionOutput | IOType::FunctionInput => {
                debug!("Trying to create connection from function IO at '{}'",
                    connection.from_io().route());
                if connection.to_io().io_type() == &IOType::FunctionInput {
                    debug!("\tFound direct connection to function input at '{}'",
                        connection.to_io().route());
                    collapsed_connections.push(connection.clone());
                } else {
                    // Follow the connection through flow boundaries until it reaches
                    // one or more function inputs
                    for (source_subroute, destination_io) in find_connection_destinations(
                        Route::from(""),
                        connection.to_io(),
                        connection.level(),
                        &tables.connections,
                    )? {
                        let mut collapsed_connection = connection.clone();

                        // The source route gains whatever sub-route was selected while
                        // following the chain of connections
                        let from_route = connection
                            .from_io()
                            .route()
                            .clone()
                            .extend(&source_subroute)
                            .clone();
                        collapsed_connection
                            .from_io_mut()
                            .set_route(&from_route, &IOType::FunctionOutput);
                        *collapsed_connection.to_io_mut() = destination_io;

                        DataType::compatible_types(
                            collapsed_connection.from_io().datatypes(),
                            collapsed_connection.to_io().datatypes(),
                            &source_subroute,
                        )
                        .chain_err(|| format!("Incompatible types in collapsed connection from '{}' to '{}'",
                            collapsed_connection.from_io().route(), collapsed_connection.to_io().route()))?;

                        debug!("\tIndirect connection {}", collapsed_connection);
                        collapsed_connections.push(collapsed_connection);
                    }
                }
            },
            IOType::FlowInput | IOType::FlowOutput => {
                // Connections starting at a flow boundary are only of interest when they
                // carry an initializer that must be propagated to the function inputs
                if connection.from_io().get_initializer().is_some() {
                    let destinations = if connection.to_io().io_type() == &IOType::FunctionInput {
                        vec!((Route::default(), connection.to_io().clone()))
                    } else {
                        find_connection_destinations(
                            Route::from(""),
                            connection.to_io(),
                            connection.level(),
                            &tables.connections,
                        )?
                    };

                    for (_, destination_io) in destinations {
                        // Error messages are built lazily so nothing is formatted
                        // (or allocated) when the lookups succeed
                        let (destination_function_id, destination_input_index, _) =
                            tables.destination_routes.get(destination_io.route())
                                .ok_or_else(|| format!("Could not find a destination route matching '{}'", destination_io.route()))?;
                        let destination_function = tables.functions.get_mut(*destination_function_id)
                            .ok_or_else(|| format!("Could not find a function #{destination_function_id}"))?;
                        let flow_initializer = connection.from_io().get_initializer().clone();
                        destination_function
                            .set_flow_initializer(*destination_input_index, flow_initializer)?;
                    }
                }
            }
        }
    }

    info!("{} connections collapsed down to {}", tables.connections.len(), collapsed_connections.len());
    tables.collapsed_connections = collapsed_connections;

    Ok(())
}
fn find_connection_destinations(
prev_subroute: Route,
from_io: &IO,
from_level: usize,
connections: &[Connection],
) -> Result<Vec<(Route, IO)>> {
let mut destinations = vec![];
for next_connection in connections {
if let Some(subroute) = next_connection
.from_io()
.route()
.sub_route_of(from_io.route())
{
let next_level = match *next_connection.from_io().io_type() {
IOType::FlowOutput if from_level > 0 => from_level - 1,
IOType::FlowOutput if from_level == 0 => usize::MAX,
IOType::FlowInput => from_level + 1,
_ => from_level,
};
if next_connection.level() == next_level {
let accumulated_source_subroute = prev_subroute.clone().extend(&subroute).clone();
match *next_connection.to_io().io_type() {
IOType::FunctionInput => {
debug!("\t\tFound destination function input at '{}'",
next_connection.to_io().route());
destinations
.push((accumulated_source_subroute, next_connection.to_io().clone()));
},
IOType::FunctionOutput => error!("Error - destination of {:?} is a functions output!",
next_connection),
IOType::FlowInput => {
debug!("\t\tFollowing connection into sub-flow via '{}'", from_io.route());
let new_destinations = &mut find_connection_destinations(
accumulated_source_subroute,
next_connection.to_io(),
next_connection.level(),
connections,
)?;
destinations.append(new_destinations);
},
IOType::FlowOutput => {
debug!("\t\tFollowing connection out of flow via '{}'", from_io.route());
let new_destinations = &mut find_connection_destinations(
accumulated_source_subroute,
next_connection.to_io(),
next_connection.level(),
connections,
)?;
destinations.append(new_destinations);
}
}
}
}
}
if destinations.is_empty() {
info!("Connection from '{}' : did not find a destination Function Input", from_io.route());
} else {
for (sub_route, destination_io) in &destinations {
DataType::compatible_types(from_io.datatypes(), destination_io.datatypes(), sub_route)
.chain_err(|| format!("Failed to connect '{}{sub_route}' to '{}' due to incompatible types",
from_io.route(), destination_io.route()))?;
}
}
Ok(destinations)
}
#[cfg(test)]
mod test {
    use flowcore::model::connection::Connection;
    use flowcore::model::datatype::STRING_TYPE;
    use flowcore::model::io::{IO, IOType};
    use flowcore::model::route::HasRoute;
    use flowcore::model::route::Route;
    use crate::compiler::compile::CompilerTables;
    use super::collapse_connections;

    /// Build a connection between two String IOs at `from` and `to`, connected at
    /// `level`. The IO type of either end is only set when `Some` is supplied,
    /// otherwise it is left as created by `IO::new`.
    fn connection(
        from: &str,
        to: &str,
        level: usize,
        from_type: Option<IOType>,
        to_type: Option<IOType>,
    ) -> Connection {
        let mut connection = Connection::new(from, to);
        connection
            .connect(
                IO::new(vec!(STRING_TYPE.into()), from),
                IO::new(vec!(STRING_TYPE.into()), to),
                level,
            )
            .expect("Could not connect IOs");
        if let Some(io_type) = from_type {
            connection.from_io_mut().set_io_type(io_type);
        }
        if let Some(io_type) = to_type {
            connection.to_io_mut().set_io_type(io_type);
        }
        connection
    }

    /// Run `collapse_connections` over fresh tables holding only `connections`
    fn collapse(connections: Vec<Connection>) -> CompilerTables {
        let mut tables = CompilerTables::new();
        tables.connections = connections;
        collapse_connections(&mut tables).expect("Could not collapse connections");
        tables
    }

    /// Check the source and destination routes of a collapsed connection
    fn assert_routes(collapsed: &Connection, from: &str, to: &str) {
        assert_eq!(*collapsed.from_io().route(), Route::from(from));
        assert_eq!(*collapsed.to_io().route(), Route::from(to));
    }

    #[test]
    fn collapse_drops_a_useless_connections() {
        // Ends at a flow input that nothing else continues from
        let unused = connection("/f1/a", "/f2/a", 1, None, Some(IOType::FlowInput));

        let tables = collapse(vec![unused]);

        assert!(tables.collapsed_connections.is_empty());
    }

    #[test]
    fn no_collapse_of_a_loopback_connection() {
        let only_connection = connection(
            "/function1/out",
            "/function1/in",
            0,
            Some(IOType::FunctionOutput),
            Some(IOType::FunctionInput),
        );

        let tables = collapse(vec![only_connection]);

        assert_eq!(tables.collapsed_connections.len(), 1);
        assert_routes(&tables.collapsed_connections[0], "/function1/out", "/function1/in");
    }

    #[test]
    fn no_collapse_of_a_direct_connection() {
        let only_connection = connection(
            "/function1/out",
            "/function2/in",
            0,
            Some(IOType::FunctionOutput),
            Some(IOType::FunctionInput),
        );

        let tables = collapse(vec![only_connection]);

        assert_eq!(tables.collapsed_connections.len(), 1);
        assert_routes(&tables.collapsed_connections[0], "/function1/out", "/function2/in");
    }

    #[test]
    fn collapse_a_connection() {
        let left_side = connection(
            "/function1",
            "/flow2/a",
            0,
            Some(IOType::FunctionOutput),
            Some(IOType::FlowInput),
        );
        // Leads into a further flow input that has no onward connection
        let extra_one = connection(
            "/flow2/a",
            "/flow2/f4/a",
            1,
            Some(IOType::FlowInput),
            Some(IOType::FlowInput),
        );
        let right_side = connection(
            "/flow2/a",
            "/flow2/function3",
            1,
            Some(IOType::FlowInput),
            Some(IOType::FunctionInput),
        );

        let tables = collapse(vec![left_side, extra_one, right_side]);

        assert_eq!(tables.collapsed_connections.len(), 1);
        assert_routes(&tables.collapsed_connections[0], "/function1", "/flow2/function3");
    }

    #[test]
    fn collapse_two_connections_from_flow_boundary() {
        let left_side = connection(
            "/f1",
            "/f2/a",
            0,
            Some(IOType::FunctionOutput),
            Some(IOType::FlowInput),
        );
        let right_side_one = connection(
            "/f2/a",
            "/f2/value1",
            1,
            Some(IOType::FlowInput),
            Some(IOType::FunctionInput),
        );
        let right_side_two = connection(
            "/f2/a",
            "/f2/value2",
            1,
            Some(IOType::FlowInput),
            Some(IOType::FunctionInput),
        );

        let tables = collapse(vec![left_side, right_side_one, right_side_two]);

        assert_eq!(2, tables.collapsed_connections.len());
        assert_routes(&tables.collapsed_connections[0], "/f1", "/f2/value1");
        assert_routes(&tables.collapsed_connections[1], "/f1", "/f2/value2");
    }

    #[test]
    fn collapse_connection_into_sub_flow() {
        let first_level = connection(
            "/function1/out",
            "/flow1/a",
            0,
            Some(IOType::FunctionOutput),
            Some(IOType::FlowInput),
        );
        let second_level = connection(
            "/flow1/a",
            "/flow1/flow2/a",
            1,
            Some(IOType::FlowInput),
            Some(IOType::FlowInput),
        );
        let third_level = connection(
            "/flow1/flow2/a",
            "/flow1/flow2/func/in",
            2,
            Some(IOType::FlowInput),
            Some(IOType::FunctionInput),
        );

        let tables = collapse(vec![first_level, second_level, third_level]);

        assert_eq!(1, tables.collapsed_connections.len());
        assert_routes(
            &tables.collapsed_connections[0],
            "/function1/out",
            "/flow1/flow2/func/in",
        );
    }

    #[test]
    fn does_not_collapse_a_non_connection() {
        // Two unrelated connections whose IO types are left as created
        let one = connection("/f1/a", "/f2/a", 1, None, None);
        let other = connection("/f3/a", "/f4/a", 1, None, None);

        let tables = collapse(vec![one, other]);

        assert_eq!(tables.collapsed_connections.len(), 2);
    }
}