use serde_json::json;
use crate::model::descriptor::{
DataFlowDescriptor, InputDescriptor, LinkDescriptor, OperatorDescriptor, OutputDescriptor,
SinkDescriptor, SourceDescriptor,
};
use std::{
fs::File,
io::{BufReader, Read},
};
const BASE_PATH: &str = "src/model/descriptor/tests/";
#[test]
fn test_flatten_descriptor() {
let _ = env_logger::try_init();
let path = format!(
"{}/{}/{}",
env!("CARGO_MANIFEST_DIR"),
BASE_PATH,
"data-flow.yml"
);
let file = File::open(path).expect("Could not open file");
let mut buf_reader = BufReader::new(file);
let mut yaml = String::new();
buf_reader
.read_to_string(&mut yaml)
.expect("Could not read file contents");
let descriptor = DataFlowDescriptor::from_yaml(&yaml).expect("Unexpected error");
let flatten = async_std::task::block_on(async { descriptor.flatten().await })
.expect("Unexpected error while calling `flatten`");
let expected_sources = vec![
SourceDescriptor {
id: "source-1".into(),
outputs: vec!["source-out".into()],
uri: Some("file://source.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
SourceDescriptor {
id: "source-2".into(),
outputs: vec!["source-out".into()],
uri: Some("file://source.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
SourceDescriptor {
id: "source-composite".into(),
outputs: vec![
"source-composite-out-1".into(),
"source-composite-out-2".into(),
],
uri: Some("file://source-composite.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
];
expected_sources.iter().for_each(|expected_source| {
assert!(
flatten.sources.contains(expected_source),
"Source missing or incorrect: \n\n (expected) {:?} \n\n {:?}",
expected_source,
flatten
.sources
.iter()
.find(|source| source.id == expected_source.id)
)
});
assert_eq!(expected_sources.len(), flatten.sources.len());
let expected_operators = vec![
OperatorDescriptor {
id: "operator-1".into(),
inputs: vec!["operator-in".into()],
outputs: vec!["operator-out".into()],
uri: Some("file://operator.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
OperatorDescriptor {
id: "operator-2".into(),
inputs: vec!["operator-in".into()],
outputs: vec!["operator-out".into()],
uri: Some("file://operator.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
OperatorDescriptor {
id: "operator-composite/sub-operator-1".into(),
inputs: vec!["sub-operator-1-in-1".into(), "sub-operator-1-in-2".into()],
outputs: vec!["sub-operator-1-out".into()],
uri: Some("file://sub-operator-1.so".into()),
configuration: Some(
json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }),
),
},
OperatorDescriptor {
id: "operator-composite/sub-operator-composite/sub-sub-operator-1".into(),
inputs: vec!["sub-sub-operator-1-in".into()],
outputs: vec!["sub-sub-operator-1-out".into()],
uri: Some("file://sub-sub-operator-1.so".into()),
configuration: Some(
json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner", "baz": "leaf" }),
),
},
OperatorDescriptor {
id: "operator-composite/sub-operator-composite/sub-sub-operator-2".into(),
inputs: vec!["sub-sub-operator-2-in".into()],
outputs: vec!["sub-sub-operator-2-out".into()],
uri: Some("file://sub-sub-operator-2.so".into()),
configuration: Some(
json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner" }),
),
},
OperatorDescriptor {
id: "operator-composite/sub-operator-2".into(),
inputs: vec!["sub-operator-2-in".into()],
outputs: vec!["sub-operator-2-out-1".into(), "sub-operator-2-out-2".into()],
uri: Some("file://sub-operator-2.so".into()),
configuration: Some(
json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }),
),
},
];
expected_operators.iter().for_each(|expected_operator| {
assert!(
flatten.operators.contains(expected_operator),
"Operator missing or incorrect: \n\n (expected) {:?} \n\n (found) {:?} \n\n(operators): {:?}\n\n",
expected_operator,
flatten
.operators
.iter()
.find(|operator| operator.id == expected_operator.id),
flatten.operators
)
});
assert_eq!(expected_operators.len(), flatten.operators.len());
let expected_sinks = vec![
SinkDescriptor {
id: "sink-1".into(),
inputs: vec!["sink-in".into()],
uri: Some("file://sink.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
SinkDescriptor {
id: "sink-2".into(),
inputs: vec!["sink-in".into()],
uri: Some("file://sink.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
SinkDescriptor {
id: "sink-composite".into(),
inputs: vec!["sink-composite-in-1".into(), "sink-composite-in-2".into()],
uri: Some("file://sink-composite.so".into()),
configuration: Some(json!({ "foo": "global-outer" })),
},
];
expected_sinks.iter().for_each(|expected_sink| {
assert!(
flatten.sinks.contains(expected_sink),
"Sink missing or incorrect: \n\n (expected) {:?} \n\n {:?}",
expected_sink,
flatten.sinks
)
});
assert_eq!(expected_sinks.len(), flatten.sinks.len());
let expected_links = vec![
LinkDescriptor::new(
OutputDescriptor::new("source-1", "source-out"),
InputDescriptor::new("operator-1", "operator-in"),
),
LinkDescriptor::new(
OutputDescriptor::new("operator-1", "operator-out"),
InputDescriptor::new("sink-1", "sink-in"),
),
LinkDescriptor::new(
OutputDescriptor::new("source-2", "source-out"),
InputDescriptor::new("operator-2", "operator-in"),
),
LinkDescriptor::new(
OutputDescriptor::new("operator-2", "operator-out"),
InputDescriptor::new("sink-2", "sink-in"),
),
LinkDescriptor::new(
OutputDescriptor::new("source-composite", "source-composite-out-1"),
InputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-in-1"),
),
LinkDescriptor::new(
OutputDescriptor::new("source-composite", "source-composite-out-2"),
InputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-in-2"),
),
LinkDescriptor::new(
OutputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-out-1"),
InputDescriptor::new("sink-composite", "sink-composite-in-1"),
),
LinkDescriptor::new(
OutputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-out-2"),
InputDescriptor::new("sink-composite", "sink-composite-in-2"),
),
LinkDescriptor::new(
OutputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-out"),
InputDescriptor::new(
"operator-composite/sub-operator-composite/sub-sub-operator-1",
"sub-sub-operator-1-in",
),
),
LinkDescriptor::new(
OutputDescriptor::new(
"operator-composite/sub-operator-composite/sub-sub-operator-2",
"sub-sub-operator-2-out",
),
InputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-in"),
),
LinkDescriptor::new(
OutputDescriptor::new(
"operator-composite/sub-operator-composite/sub-sub-operator-1",
"sub-sub-operator-1-out",
),
InputDescriptor::new(
"operator-composite/sub-operator-composite/sub-sub-operator-2",
"sub-sub-operator-2-in",
),
),
];
expected_links.iter().for_each(|expected_link| {
assert!(
flatten.links.contains(expected_link),
"Link missing or incorrect: \n\n (expected) {:?} \n\n {:?}",
expected_link,
flatten.links
)
});
assert_eq!(expected_links.len(), flatten.links.len());
}
#[test]
fn test_detect_recursion() {
let _ = env_logger::try_init();
let path = format!(
"{}/{}/{}",
env!("CARGO_MANIFEST_DIR"),
BASE_PATH,
"data-flow-recursion.yml"
);
let file = File::open(path).expect("Could not open file");
let mut buf_reader = BufReader::new(file);
let mut yaml = String::new();
buf_reader
.read_to_string(&mut yaml)
.expect("Could not read file contents");
let descriptor = DataFlowDescriptor::from_yaml(&yaml).expect("Unexpected error");
let res_flatten = async_std::task::block_on(async { descriptor.flatten().await });
assert!(res_flatten.is_err());
}
#[test]
fn test_duplicate_composite_at_same_level_not_detected_as_recursion() {
let _ = env_logger::try_init();
let path = format!(
"{}/{}/{}",
env!("CARGO_MANIFEST_DIR"),
BASE_PATH,
"data-flow-recursion-duplicate-composite.yml"
);
let file = File::open(path).expect("Could not open file");
let mut buf_reader = BufReader::new(file);
let mut yaml = String::new();
buf_reader
.read_to_string(&mut yaml)
.expect("Could not read file contents");
let descriptor = DataFlowDescriptor::from_yaml(&yaml).expect("Unexpected error");
assert!(async_std::task::block_on(async { descriptor.flatten().await }).is_ok());
}