use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};
use hydro_lang::ir::{HydroLeaf, HydroNode, traverse_dfir};
use hydro_lang::location::LocationId;
use syn::visit::Visit;
use super::rewrites::{NetworkType, get_network_type, relevant_inputs};
use crate::partition_syn_analysis::{AnalyzeClosure, StructOrTuple, StructOrTupleIndex};
/// Accumulator used by `input_analysis` while it walks the IR of one location.
struct InputMetadata {
/// Location under analysis; its root's raw ID is what `get_network_type` is queried with.
location: LocationId,
// `inputs`: statement IDs of nodes classified as `NetworkType::Recv` or `NetworkType::SendRecv`.
// `input_parents`: for each of those statement IDs, the ID of the node's first input.
inputs: BTreeSet<usize>, input_parents: BTreeMap<usize, usize>,
}
/// Records `node` as an input of the analyzed location when it receives from the network.
///
/// A node counts as an input when `get_network_type` classifies it as `Recv` or `SendRecv`
/// relative to the root of `metadata.location`. For such a node the current statement ID is
/// added to `metadata.inputs` and mapped to the ID of the node's first input in
/// `metadata.input_parents`.
///
/// # Panics
/// Panics if a receiving node has no input metadata or if its first input has no ID.
fn input_analysis_node(
    node: &mut HydroNode,
    next_stmt_id: &mut usize,
    metadata: &mut InputMetadata,
) {
    let network_type = get_network_type(node, metadata.location.root().raw_id());
    let receives_from_network = matches!(
        network_type,
        Some(NetworkType::Recv | NetworkType::SendRecv)
    );
    if !receives_from_network {
        return;
    }

    let stmt_id = *next_stmt_id;
    metadata.inputs.insert(stmt_id);
    let parent_id = node.input_metadata().first().unwrap().id.unwrap();
    metadata.input_parents.insert(stmt_id, parent_id);
}
/// Finds every network input of `location` in `ir`.
///
/// Returns `(inputs, input_parents)`: the statement IDs of the receiving nodes, and for each
/// of them the ID of the node feeding it.
fn input_analysis(
    ir: &mut [HydroLeaf],
    location: &LocationId,
) -> (BTreeSet<usize>, BTreeMap<usize, usize>) {
    let mut collected = InputMetadata {
        inputs: BTreeSet::new(),
        input_parents: BTreeMap::new(),
        location: location.clone(),
    };

    // Leaves are ignored; only nodes can be network inputs.
    traverse_dfir(
        ir,
        |_, _| {},
        |node, next_stmt_id| input_analysis_node(node, next_stmt_id, &mut collected),
    );

    let InputMetadata {
        inputs,
        input_parents,
        ..
    } = collected;
    (inputs, input_parents)
}
/// State shared across the iterations of the input dependency analysis for one location.
pub struct InputDependencyMetadata {
/// Location being analyzed; nodes rooted at any other location are skipped.
pub location: LocationId,
/// IDs of the nodes treated as inputs of `location` (as returned by `input_analysis`).
pub inputs: BTreeSet<usize>,
/// For each input ID, the ID of that input node's first parent.
pub input_parents: BTreeMap<usize, usize>,
/// While `true`, a `Chain` node accepts the intersection of whichever parent dependencies
/// are already known; once `false`, every tainted parent must have a recorded dependency.
pub optimistic_phase: bool,
/// Node ID -> IDs of the inputs that taint (reach) that node.
pub input_taint: BTreeMap<usize, BTreeSet<usize>>,
/// Node ID -> input ID -> dependency of the node's output on that input.
pub input_dependencies: BTreeMap<usize, BTreeMap<usize, StructOrTuple>>,
/// Cached closure analysis results for `Map` / `FilterMap` nodes, keyed by node ID.
pub syn_analysis: BTreeMap<usize, StructOrTuple>,
}
/// Hashes only the analysis results (`input_taint` then `input_dependencies`).
///
/// The fixpoint loop compares successive hashes to detect that an iteration changed nothing,
/// so fields that never change between iterations are deliberately left out.
impl Hash for InputDependencyMetadata {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // A tuple hashes its elements in order, and a reference hashes as its referent.
        (&self.input_taint, &self.input_dependencies).hash(state);
    }
}
/// Unions the dependencies found at `tuple1index` in `tuple1` with those at `tuple2index`
/// in `tuple2`.
///
/// Returns `None` when either tuple is absent, when either index has no dependencies, or
/// when `StructOrTuple::union` itself yields `None`.
fn union(
    tuple1: Option<&StructOrTuple>,
    tuple1index: &StructOrTupleIndex,
    tuple2: Option<&StructOrTuple>,
    tuple2index: &StructOrTupleIndex,
) -> Option<StructOrTuple> {
    // Both tuples must be present before either one is inspected.
    let (first, second) = (tuple1?, tuple2?);
    let first_child = first.get_dependencies(tuple1index)?;
    let second_child = second.get_dependencies(tuple2index)?;
    StructOrTuple::union(&first_child, &second_child)
}
/// Updates `metadata.input_taint` and `metadata.input_dependencies` for a single node.
///
/// Nodes whose root location differs from `metadata.location` are ignored. For every other
/// node the function:
/// 1. determines the parent IDs (the mapped sink input for a `CycleSource`, the shared inner
///    node for a `Tee`, otherwise whatever `relevant_inputs` returns);
/// 2. gathers, per input, which parent positions are tainted by it and what each parent's
///    recorded dependency on it is;
/// 3. derives this node's own per-input dependencies with a rule chosen by the node type.
///
/// Whenever no dependency can be derived for an input, that input's entry is removed from
/// this node's dependency map.
///
/// # Panics
/// Panics if a `CycleSource` is missing from `cycle_source_to_sink_input`, if a `Tee`'s inner
/// node has no ID, if a node has an unexpected number of parents, or if the node is a
/// `Placeholder` or `Counter`.
fn input_dependency_analysis_node(
node: &mut HydroNode,
next_stmt_id: &mut usize,
metadata: &mut InputDependencyMetadata,
cycle_source_to_sink_input: &HashMap<usize, usize>,
) {
// Only nodes located at the analyzed location take part in the analysis.
if metadata.location != *node.metadata().location_kind.root() {
return;
}
println!("Analyzing node {} {:?}", next_stmt_id, node.print_root());
// Cycle sources and tees do not get their parents from `relevant_inputs`.
let parent_ids = match node {
HydroNode::CycleSource { .. } => {
vec![*cycle_source_to_sink_input.get(next_stmt_id).unwrap()]
}
HydroNode::Tee { inner, .. } => {
vec![inner.0.borrow().metadata().id.unwrap()]
}
_ => relevant_inputs(node.input_metadata(), &metadata.location),
};
let InputDependencyMetadata {
inputs,
optimistic_phase,
input_taint,
input_dependencies,
syn_analysis,
..
} = metadata;
// `parent_input_dependencies`: input ID -> parent position -> that parent's dependency on the input.
// `parent_taints`: input ID -> positions of the parents tainted by that input.
let mut parent_input_dependencies: BTreeMap<usize, BTreeMap<usize, StructOrTuple>> =
BTreeMap::new(); let mut parent_taints: BTreeMap<usize, BTreeSet<usize>> = BTreeMap::new(); for (index, parent_id) in parent_ids.iter().enumerate() {
// The parent is itself an input: it taints this node and is recorded as completely dependent.
if inputs.contains(parent_id) {
input_taint
.entry(*next_stmt_id)
.or_default()
.insert(*parent_id);
parent_taints.entry(*parent_id).or_default().insert(index);
parent_input_dependencies
.entry(*parent_id)
.or_default()
.insert(index, StructOrTuple::new_completely_dependent());
// Otherwise inherit the taints (and any dependencies) already recorded for the parent.
} else if let Some(existing_parent_taints) = input_taint.get(parent_id).cloned() {
for input in &existing_parent_taints {
parent_taints.entry(*input).or_default().insert(index);
}
input_taint
.entry(*next_stmt_id)
.or_default()
.extend(existing_parent_taints);
if let Some(parent_dependencies) = input_dependencies.get(parent_id) {
for (input_id, parent_dependencies_on_input) in parent_dependencies {
parent_input_dependencies
.entry(*input_id)
.or_default()
.insert(index, parent_dependencies_on_input.clone());
}
}
}
// For Difference / AntiJoin only the first parent (position 0) is considered.
match node {
HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
break;
}
_ => {}
}
}
println!("Parents of node {}: {:?}", next_stmt_id, parent_ids);
println!(
"Parent input dependencies for node {}: {:?}",
next_stmt_id, parent_input_dependencies
);
// Entries are created for this node even when nothing taints it.
let input_taint_entry = input_taint.entry(*next_stmt_id).or_default();
let input_dependencies_entry = input_dependencies.entry(*next_stmt_id).or_default();
match &node {
// These node types copy the dependency of parent 0 unchanged.
HydroNode::CycleSource { .. }
| HydroNode::Tee { .. }
| HydroNode::Persist { .. }
| HydroNode::Unpersist { .. }
| HydroNode::Delta { .. }
| HydroNode::ResolveFutures { .. }
| HydroNode::ResolveFuturesOrdered { .. }
| HydroNode::DeferTick { .. }
| HydroNode::Unique { .. }
| HydroNode::Sort { .. }
| HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } | HydroNode::Filter { .. } | HydroNode::Inspect { .. }
| HydroNode::Network { .. }
| HydroNode::ExternalInput { .. } => {
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
if let Some(parent_dependency) = parent_dependencies_on_input.get(&0) {
input_dependencies_entry.insert(*input_id, parent_dependency.clone());
continue;
}
}
input_dependencies_entry.remove(input_id);
}
}
// Chain: keep the intersection of the tainted parents' dependencies. Outside the
// optimistic phase, every tainted parent must also have a recorded dependency.
HydroNode::Chain { .. } => {
assert_eq!(parent_ids.len(), 2, "Node {:?} has the wrong number of parents.", node);
for (input_id, parent_positions) in parent_taints {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(&input_id) {
let num_tainted_parents = parent_positions.len();
let parent_dependency_tuples = parent_positions.into_iter().filter_map(|pos| parent_dependencies_on_input.get(&pos)).cloned().collect::<Vec<StructOrTuple>>();
if let Some(intersection) = StructOrTuple::intersect_tuples(&parent_dependency_tuples) {
if parent_dependency_tuples.len() == num_tainted_parents || *optimistic_phase {
input_dependencies_entry.insert(input_id, intersection);
continue;
}
}
}
input_dependencies_entry.remove(&input_id);
}
}
// Cross product / cross singleton: parent 0's dependency is stored under index "0" and
// parent 1's under index "1" of the new dependency.
HydroNode::CrossProduct { .. }
| HydroNode::CrossSingleton { .. } => {
assert_eq!(parent_ids.len(), 2, "Node {:?} has the wrong number of parents.", node);
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
let mut new_dependency = StructOrTuple::default();
if let Some(parent1_dependency) = parent_dependencies_on_input.get(&0) {
new_dependency.set_dependencies(&vec!["0".to_string()], parent1_dependency, &vec![]);
}
if let Some(parent2_dependency) = parent_dependencies_on_input.get(&1) {
new_dependency.set_dependencies(&vec!["1".to_string()], parent2_dependency, &vec![]);
}
if !new_dependency.is_empty() {
input_dependencies_entry.insert(*input_id, new_dependency);
continue;
}
}
input_dependencies_entry.remove(input_id);
}
}
// Join: index "0" receives the union of index "0" of both parents; indices "1.0" and
// "1.1" receive index "1" of parent 0 and parent 1 respectively.
HydroNode::Join { .. } => {
assert_eq!(parent_ids.len(), 2, "Node {:?} has the wrong number of parents.", node);
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
let mut new_dependency = StructOrTuple::default();
if let Some(union) = union(parent_dependencies_on_input.get(&0), &vec!["0".to_string()], parent_dependencies_on_input.get(&1), &vec!["0".to_string()]) {
new_dependency.set_dependencies(&vec!["0".to_string()], &union, &vec![]);
}
if let Some(parent1_dependency) = parent_dependencies_on_input.get(&0) {
new_dependency.set_dependencies(&vec!["1".to_string(),"0".to_string()], parent1_dependency, &vec!["1".to_string()]);
}
if let Some(parent2_dependency) = parent_dependencies_on_input.get(&1) {
new_dependency.set_dependencies(&vec!["1".to_string(),"1".to_string()], parent2_dependency, &vec!["1".to_string()]);
}
if !new_dependency.is_empty() {
input_dependencies_entry.insert(*input_id, new_dependency);
continue;
}
}
input_dependencies_entry.remove(input_id);
}
}
// Enumerate: the parent's whole dependency is stored under index "1".
HydroNode::Enumerate { .. } => {
assert_eq!(parent_ids.len(), 1, "Node {:?} has the wrong number of parents.", node);
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
if let Some(parent_dependency) = parent_dependencies_on_input.get(&0) {
let mut new_dependency = StructOrTuple::default();
new_dependency.set_dependencies(&vec!["1".to_string()], parent_dependency, &vec![]);
input_dependencies_entry.insert(*input_id, new_dependency);
continue;
}
}
input_dependencies_entry.remove(input_id);
}
}
// Map / FilterMap: analyze the closure once per node (cached in `syn_analysis`), then
// project the parent's dependency through the closure's output dependencies.
HydroNode::Map { f, .. }
| HydroNode::FilterMap { f, .. } => {
assert_eq!(parent_ids.len(), 1, "Node {:?} has the wrong number of parents.", node);
let syn_analysis_results = syn_analysis.entry(*next_stmt_id).or_insert_with(|| {
let mut analyzer = AnalyzeClosure::default();
analyzer.visit_expr(&f.0);
let keep_topmost_none_fields = matches!(node, HydroNode::FilterMap { .. });
analyzer.output_dependencies.remove_none_fields(keep_topmost_none_fields).unwrap_or_default()
});
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
if let Some(parent_dependency) = parent_dependencies_on_input.get(&0) {
if let Some(projected_dependencies) = StructOrTuple::project_parent(parent_dependency, syn_analysis_results) {
println!("Node {:?} input {:?} has projected dependencies: {:?}", next_stmt_id, input_id, projected_dependencies);
input_dependencies_entry.insert(*input_id, projected_dependencies);
continue;
}
}
}
input_dependencies_entry.remove(input_id);
}
}
// Keyed reduce / fold: only index "0" of the parent carries over, to index "0".
HydroNode::ReduceKeyed { .. }
| HydroNode::FoldKeyed { .. } => {
assert_eq!(parent_ids.len(), 1, "Node {:?} has the wrong number of parents.", node);
for input_id in input_taint_entry.iter() {
if let Some(parent_dependencies_on_input) = parent_input_dependencies.get(input_id) {
if let Some(parent_dependency) = parent_dependencies_on_input.get(&0) {
let mut new_dependency = StructOrTuple::default();
new_dependency.set_dependencies(&vec!["0".to_string()], parent_dependency, &vec!["0".to_string()]);
input_dependencies_entry.insert(*input_id, new_dependency);
continue;
}
}
input_dependencies_entry.remove(input_id);
}
}
// No dependency is tracked through these node types.
HydroNode::Reduce { .. }
| HydroNode::Fold { .. }
| HydroNode::Scan { .. }
| HydroNode::FlatMap { .. }
| HydroNode::Source { .. } => {
input_dependencies_entry.clear();
}
HydroNode::Placeholder
| HydroNode::Counter { .. } => {
panic!("Unexpected node type {:?} in input dependency analysis.", node);
}
}
println!(
"Input dependencies for node {}: {:?}",
next_stmt_id, input_dependencies_entry
);
}
/// Runs the input dependency analysis on `ir` for `location` until it stabilizes.
///
/// The IR is traversed repeatedly. After each traversal the taint and dependency maps are
/// hashed; two consecutive identical hashes count as a fixpoint. The first fixpoint ends the
/// optimistic phase and starts the pessimistic one; the next fixpoint ends the analysis.
fn input_dependency_analysis(
    ir: &mut [HydroLeaf],
    location: &LocationId,
    cycle_source_to_sink_input: &HashMap<usize, usize>,
) -> InputDependencyMetadata {
    let (inputs, input_parents) = input_analysis(ir, location);
    let mut metadata = InputDependencyMetadata {
        location: location.clone(),
        inputs,
        input_parents,
        optimistic_phase: true,
        input_taint: BTreeMap::new(),
        input_dependencies: BTreeMap::new(),
        syn_analysis: BTreeMap::new(),
    };

    println!("\nBegin input dependency analysis");

    let mut last_hash: Option<u64> = None;
    let mut iteration = 0;
    loop {
        println!("Input dependency analysis iteration {}", iteration);
        traverse_dfir(
            ir,
            |_, _| {},
            |node, next_stmt_id| {
                input_dependency_analysis_node(
                    node,
                    next_stmt_id,
                    &mut metadata,
                    cycle_source_to_sink_input,
                )
            },
        );

        let state_hash = {
            let mut hasher = DefaultHasher::new();
            metadata.hash(&mut hasher);
            hasher.finish()
        };

        let fixpoint_reached = last_hash == Some(state_hash);
        match (fixpoint_reached, metadata.optimistic_phase) {
            (true, true) => {
                println!("Optimistic phase reached fixpoint, starting pessimistic phase");
                metadata.optimistic_phase = false;
            }
            (true, false) => break,
            (false, _) => {}
        }

        last_hash = Some(state_hash);
        iteration += 1;
    }

    metadata
}
/// Collects, for node `id`, every tainting input together with the node's dependency on that
/// input at `index`.
///
/// The two returned vectors are parallel and follow the iteration order of the taint set.
/// Returns `None` as soon as one tainting input has no dependency at `index`. A node with no
/// taint entry (or an empty one) yields two empty vectors.
fn get_inputs_and_dependencies(
    input_taint: &BTreeMap<usize, BTreeSet<usize>>,
    input_dependencies: &BTreeMap<usize, BTreeMap<usize, StructOrTuple>>,
    id: usize,
    index: &StructOrTupleIndex,
) -> Option<(Vec<usize>, Vec<StructOrTuple>)> {
    let Some(taints) = input_taint.get(&id) else {
        return Some((Vec::new(), Vec::new()));
    };

    // Kept as an `Option` so that an untainted node still succeeds when it has no entry.
    let dependencies_of_node = input_dependencies.get(&id);

    let mut ordered_input = Vec::new();
    let mut ordered_dependencies = Vec::new();
    for &input in taints {
        let dependency = dependencies_of_node?
            .get(&input)?
            .get_dependencies(index)?;
        ordered_input.push(input);
        ordered_dependencies.push(dependency);
    }
    Some((ordered_input, ordered_dependencies))
}
fn partitioning_constraint_analysis_node(
node: &mut HydroNode,
next_stmt_id: &mut usize,
dependency_metadata: &InputDependencyMetadata,
possible_partitionings: &mut BTreeMap<usize, BTreeSet<BTreeMap<usize, StructOrTupleIndex>>>,
) {
let InputDependencyMetadata {
location,
input_taint,
input_dependencies,
..
} = dependency_metadata;
if input_taint.contains_key(next_stmt_id) {
let parent_ids = relevant_inputs(node.input_metadata(), location);
if parent_ids
.iter()
.map(|id| input_taint.get(id))
.any(|taint| taint.is_none())
{
return;
}
let mut ordered_inputs = vec![];
let mut ordered_dependencies = vec![];
match node {
HydroNode::Difference { .. } => {
if let Some((parent0_inputs, parent0_dependencies)) = get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[0],
&vec![],
) {
if let Some((parent1_inputs, parent1_dependencies)) =
get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[1],
&vec![],
)
{
ordered_inputs.extend(parent0_inputs);
ordered_dependencies.extend(parent0_dependencies);
ordered_inputs.extend(parent1_inputs);
ordered_dependencies.extend(parent1_dependencies);
}
}
}
HydroNode::AntiJoin { .. } => {
if let Some((parent0_inputs, parent0_dependencies)) = get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[0],
&vec!["0".to_string()],
) {
if let Some((parent1_inputs, parent1_dependencies)) =
get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[1],
&vec![],
)
{
ordered_inputs.extend(parent0_inputs);
ordered_dependencies.extend(parent0_dependencies);
ordered_inputs.extend(parent1_inputs);
ordered_dependencies.extend(parent1_dependencies);
}
}
}
HydroNode::Join { .. } => {
if let Some((parent0_inputs, parent0_dependencies)) = get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[0],
&vec!["0".to_string()],
) {
if let Some((parent1_inputs, parent1_dependencies)) =
get_inputs_and_dependencies(
input_taint,
input_dependencies,
parent_ids[1],
&vec!["0".to_string()],
)
{
ordered_inputs.extend(parent0_inputs);
ordered_dependencies.extend(parent0_dependencies);
ordered_inputs.extend(parent1_inputs);
ordered_dependencies.extend(parent1_dependencies);
}
}
}
HydroNode::ReduceKeyed { .. } | HydroNode::FoldKeyed { .. } => {
if let Some((inputs, dependencies)) = get_inputs_and_dependencies(
input_taint,
input_dependencies,
*next_stmt_id,
&vec![],
) {
ordered_inputs.extend(inputs);
ordered_dependencies.extend(dependencies);
}
}
HydroNode::Reduce { .. }
| HydroNode::Fold { .. }
| HydroNode::Enumerate { .. }
| HydroNode::CrossProduct { .. }
| HydroNode::CrossSingleton { .. } => {} _ => {
return;
}
}
let intersection =
StructOrTuple::intersect_dependencies_with_matching_fields(&ordered_dependencies);
let mut possible_partitionings_for_node = BTreeSet::new();
for possible_set in intersection {
let mut possible_set_mapped_to_input = BTreeMap::new();
for (pos, dependency) in possible_set.iter().enumerate() {
let input = ordered_inputs.get(pos).unwrap();
possible_set_mapped_to_input.insert(*input, dependency.clone());
}
possible_partitionings_for_node.insert(possible_set_mapped_to_input);
}
possible_partitionings.insert(*next_stmt_id, possible_partitionings_for_node);
}
}
/// Determines how the inputs of `location` can be partitioned.
///
/// Runs the input dependency analysis, collects the partitioning options every constraining
/// operator allows, and combines them operator by operator into global partitionings that
/// satisfy all of them.
///
/// Returns `None` when some operator admits no partitioning or when no combination satisfies
/// every operator. Otherwise returns the global partitionings (an empty list when no operator
/// constrains partitioning) together with the input-to-parent map from the dependency
/// analysis.
///
/// # Panics
/// Panics if one of the first operator's options names no input at all.
#[expect(clippy::type_complexity, reason = "internal optimization code")]
pub fn partitioning_analysis(
    ir: &mut [HydroLeaf],
    location: &LocationId,
    cycle_source_to_sink_input: &HashMap<usize, usize>,
) -> Option<(
    Vec<BTreeMap<usize, StructOrTupleIndex>>,
    BTreeMap<usize, usize>,
)> {
    /// Narrows `existing` by `constraint`. Inputs present in both are intersected, inputs
    /// present in only one are carried over. `None` means an intersection was impossible.
    fn apply_constraint(
        existing: &BTreeMap<usize, StructOrTupleIndex>,
        constraint: &BTreeMap<usize, StructOrTupleIndex>,
    ) -> Option<BTreeMap<usize, StructOrTupleIndex>> {
        let mut merged = BTreeMap::new();
        for (input, tuple_index) in existing {
            let narrowed = match constraint.get(input) {
                Some(constraint_index) => {
                    StructOrTuple::index_intersection(tuple_index, constraint_index)?
                }
                None => tuple_index.clone(),
            };
            merged.insert(*input, narrowed);
        }
        for (input, constraint_index) in constraint {
            merged
                .entry(*input)
                .or_insert_with(|| constraint_index.clone());
        }
        Some(merged)
    }

    let dependency_metadata = input_dependency_analysis(ir, location, cycle_source_to_sink_input);

    let mut constraints_by_op = BTreeMap::new();
    println!("\nBegin partitioning constraint analysis");
    traverse_dfir(
        ir,
        |_, _| {},
        |node, next_op_id| {
            partitioning_constraint_analysis_node(
                node,
                next_op_id,
                &dependency_metadata,
                &mut constraints_by_op,
            )
        },
    );
    for (op_id, partitioning_options) in &constraints_by_op {
        println!(
            "Partitioning options for op {}: {:?}",
            op_id, partitioning_options
        );
    }

    println!("\nBegin partitioning analysis");
    // Every blocking operator is reported before giving up.
    let unpartitionable_ops: Vec<&usize> = constraints_by_op
        .iter()
        .filter_map(|(op_id, options)| options.is_empty().then_some(op_id))
        .collect();
    for op_id in &unpartitionable_ops {
        println!("No partitioning possible due to op_id {}", op_id);
    }
    if !unpartitionable_ops.is_empty() {
        return None;
    }

    // The first operator's options seed the set of global partitionings.
    let Some((first_op_id, first_options)) = constraints_by_op.pop_first() else {
        println!("No restrictions on partitioning");
        return Some((Vec::new(), dependency_metadata.input_parents));
    };
    let mut global_partitionings = Vec::new();
    for partitioning in first_options {
        assert!(
            !partitioning.is_empty(),
            "Op {} has partitioning constraints yet no specific input fields for those constraints",
            first_op_id
        );
        global_partitionings.push(partitioning);
    }

    // Each remaining operator narrows the candidates: every (option, candidate) pair that can
    // be reconciled survives into the next round.
    for (_op_id, partitioning_options) in constraints_by_op {
        let mut narrowed = Vec::new();
        for constraint in &partitioning_options {
            narrowed.extend(
                global_partitionings
                    .iter()
                    .filter_map(|existing| apply_constraint(existing, constraint)),
            );
        }
        global_partitionings = narrowed;
    }

    println!(
        "Found {} possible global partitionings",
        global_partitionings.len()
    );
    for (partitioning_index, partitioning) in global_partitionings.iter().enumerate() {
        println!("Partitioning {}: {:?}", partitioning_index, partitioning);
    }

    if global_partitionings.is_empty() {
        None
    } else {
        Some((global_partitionings, dependency_metadata.input_parents))
    }
}
/// Turns the result of `partitioning_analysis` into a map from input-parent node ID to the
/// index that node should be partitioned on.
///
/// Only the first global partitioning is used. A parent whose input is not mentioned in it
/// (or every parent, when there is no partitioning at all) gets an empty index. Returns
/// `None` when `analysis_results` is `None`.
#[expect(clippy::type_complexity, reason = "internal optimization code")]
pub fn nodes_to_partition(
    analysis_results: Option<(
        Vec<BTreeMap<usize, StructOrTupleIndex>>,
        BTreeMap<usize, usize>,
    )>,
) -> Option<HashMap<usize, StructOrTupleIndex>> {
    let (possible_partitionings, input_parents) = analysis_results?;
    let chosen_partitioning = possible_partitionings.first();

    let parents_to_index = input_parents
        .into_iter()
        .map(|(input, parent)| {
            let partition_index = chosen_partitioning
                .and_then(|partitioning| partitioning.get(&input))
                .cloned()
                .unwrap_or_default();
            (parent, partition_index)
        })
        .collect();
    Some(parents_to_index)
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet, HashMap};
use hydro_lang::deploy::HydroDeploy;
use hydro_lang::ir::deep_clone;
use hydro_lang::location::LocationId;
use hydro_lang::rewrites::persist_pullup::persist_pullup;
use hydro_lang::{Bounded, FlowBuilder, Location, NoOrder, Stream};
use stageleft::q;
use crate::partition_node_analysis::{
InputDependencyMetadata, input_dependency_analysis, partitioning_analysis,
};
use crate::partition_syn_analysis::{StructOrTuple, StructOrTupleIndex};
use crate::repair::{cycle_source_to_sink_input, inject_id, inject_location};
/// Builds the flow in `builder`, runs `input_dependency_analysis` for `location`, and asserts
/// that the computed taint and dependency maps equal the expected ones.
fn test_input(
    builder: FlowBuilder<'_>,
    location: LocationId,
    expected_taint: BTreeMap<usize, BTreeSet<usize>>,
    expected_dependencies: BTreeMap<usize, BTreeMap<usize, StructOrTuple>>,
) {
    // Filled in by the last optimization pass and reused by the analysis below.
    let mut cycle_map = HashMap::new();
    let deployed = builder
        .optimize_with(persist_pullup)
        .optimize_with(inject_id)
        .optimize_with(|ir| {
            cycle_map = cycle_source_to_sink_input(ir);
            inject_location(ir, &cycle_map);
        })
        .into_deploy::<HydroDeploy>();

    let mut ir = deep_clone(deployed.ir());
    let analysis = input_dependency_analysis(&mut ir, &location, &cycle_map);
    let actual_taint = analysis.input_taint;
    let actual_dependencies = analysis.input_dependencies;

    println!("Actual taint: {:?}", actual_taint);
    println!("Actual dependencies: {:?}", actual_dependencies);

    assert_eq!(actual_taint, expected_taint);
    assert_eq!(actual_dependencies, expected_dependencies);
}
/// Builds the flow in `builder`, runs `partitioning_analysis` for `location`, and checks the
/// resulting partitionings: `None` is expected to match `None`, otherwise the list of
/// partitionings must equal the expected list.
fn test_input_partitionable(
    builder: FlowBuilder<'_>,
    location: LocationId,
    expected_partitionings: Option<Vec<BTreeMap<usize, StructOrTupleIndex>>>,
) {
    // Filled in by the last optimization pass and reused by the analysis below.
    let mut cycle_map = HashMap::new();
    let deployed = builder
        .optimize_with(persist_pullup)
        .optimize_with(inject_id)
        .optimize_with(|ir| {
            cycle_map = cycle_source_to_sink_input(ir);
            inject_location(ir, &cycle_map);
        })
        .into_deploy::<HydroDeploy>();

    let mut ir = deep_clone(deployed.ir());
    let partitioning = partitioning_analysis(&mut ir, &location, &cycle_map);

    match expected_partitionings {
        None => assert!(partitioning.is_none()),
        Some(expected) => assert_eq!(partitioning.unwrap().0, expected),
    }
}
// Taint and dependency tracking through a single `map` on data broadcast from cluster1 to
// cluster2, analyzed at cluster2.
// NOTE(review): IDs 2/3/4 presumably are the network receive, its implicit map, and the user
// `map` — confirm against the IDs assigned by `inject_id`.
#[test]
fn test_map() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.map(q!(|(a, b)| (b, a + 2)))
.for_each(q!(|(b, a2)| {
println!("b: {}, a+2: {}", b, a2);
}));
// Node 2 has an (empty) taint entry; nodes 3 and 4 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
// Only output index "0" of the map is expected to carry a dependency.
let mut map_expected_dependencies = StructOrTuple::default();
map_expected_dependencies.add_dependency(
&vec!["0".to_string()],
vec!["1".to_string(), "1".to_string()],
);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies)])),
(4, BTreeMap::from([(2, map_expected_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// A flow with only a `map` after the network input: the analysis is expected to succeed with
// an empty list of partitionings, which `partitioning_analysis` returns when no operator
// imposes a constraint.
#[test]
fn test_map_partitionable() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.map(q!(|(a, b)| (b, a + 2)))
.for_each(q!(|(b, a2)| {
println!("b: {}, a+2: {}", b, a2);
}));
let expected_partitionings = Some(Vec::new());
test_input_partitionable(builder, cluster2.id(), expected_partitionings);
}
// Dependency tracking through two consecutive `map`s over a nested tuple, analyzed at
// cluster2. The second map is expected to keep a single dependency (on its output index "1").
#[test]
fn test_map_complex() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
cluster1
.source_iter(q!([(1, (2, (3, 4)))]))
.broadcast_bincode_anonymous(&cluster2)
.map(q!(|(a, b)| (b.1, a, b.0 - a)))
.map(q!(|(b1, _a, b0a)| (b0a, b1.0)))
.for_each(q!(|(b0a, b10)| {
println!("b.0 - a: {}, b.1.0: {}", b0a, b10);
}));
// Node 2 has an (empty) taint entry; nodes 3 to 5 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), (5, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
// Expected for the first map: output indices "0" and "1" each map to an input index.
let mut map1_expected_dependencies = StructOrTuple::default();
map1_expected_dependencies.add_dependency(
&vec!["0".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
map1_expected_dependencies.add_dependency(
&vec!["1".to_string()],
vec!["1".to_string(), "0".to_string()],
);
// Expected for the second map: only output index "1" maps to an input index.
let mut map2_expected_dependencies = StructOrTuple::default();
map2_expected_dependencies.add_dependency(
&vec!["1".to_string()],
vec![
"1".to_string(),
"1".to_string(),
"1".to_string(),
"0".to_string(),
],
);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies)])),
(4, BTreeMap::from([(2, map1_expected_dependencies)])),
(5, BTreeMap::from([(2, map2_expected_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// Dependency tracking through a `filter_map` whose closure returns either `Some((b, a + 2))`
// or `None`. The expected taint and dependencies are the same as in `test_map`.
#[test]
fn test_filter_map() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.filter_map(q!(|(a, b)| { if a > 1 { Some((b, a + 2)) } else { None } }))
.for_each(q!(|(b, a2)| {
println!("b: {}, a+2: {}", b, a2);
}));
// Node 2 has an (empty) taint entry; nodes 3 and 4 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
// Only output index "0" of the filter_map is expected to carry a dependency.
let mut map_expected_dependencies = StructOrTuple::default();
map_expected_dependencies.add_dependency(
&vec!["0".to_string()],
vec!["1".to_string(), "1".to_string()],
);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies)])),
(4, BTreeMap::from([(2, map_expected_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// A `filter_map` whose first output field is `None` in one branch and `Some(b)` in another.
// Node 4 is still expected to be tainted by input 2, but to end up with no recorded
// dependencies.
#[test]
fn test_filter_map_remove_none() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.filter_map(q!(|(a, b)| {
if a > 1 {
Some((None, a + 2))
} else if a < 1 {
Some((Some(b), a + 2))
} else {
None
}
}))
.for_each(q!(|(none, a2)| {
println!("None: {:?}, a+2: {}", none, a2);
}));
// Node 2 has an (empty) taint entry; nodes 3 and 4 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies)])),
(4, BTreeMap::new()),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// Dependency tracking through `tick_batch` / `delta` / `all_ticks`. Node 4 is expected to
// carry exactly the same dependency as node 3.
#[test]
fn test_delta() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
unsafe {
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.tick_batch(&cluster2.tick())
.delta()
.all_ticks()
.for_each(q!(|(a, b)| {
println!("a: {}, b: {}", a, b);
}));
}
// Node 2 has an (empty) taint entry; nodes 3 and 4 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies.clone())])),
(4, BTreeMap::from([(2, implicit_map_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// Same flow as `test_delta`: the analysis is expected to succeed with an empty list of
// partitionings, which `partitioning_analysis` returns when no operator imposes a constraint.
#[test]
fn test_delta_partitionable() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
unsafe {
cluster1
.source_iter(q!([(1, 2)]))
.broadcast_bincode_anonymous(&cluster2)
.tick_batch(&cluster2.tick())
.delta()
.all_ticks()
.for_each(q!(|(a, b)| {
println!("a: {}, b: {}", a, b);
}));
}
let expected_partitionings = Some(Vec::new()); test_input_partitionable(builder, cluster2.id(), expected_partitionings);
}
// Two differently mapped copies of the same input are chained. The chain node (ID 8) is
// expected to keep a single dependency, on output index "0.1".
#[test]
fn test_chain() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
let input = cluster1
.source_iter(q!([(1, (2, 3))]))
.broadcast_bincode_anonymous(&cluster2);
let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2)));
let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
let tick = cluster2.tick();
unsafe {
stream2
.tick_batch(&tick)
.chain(stream1.tick_batch(&tick))
.all_ticks()
.for_each(q!(|((x, b1), y)| {
println!("x: {}, b.1: {}, y: {}", x, b1, y);
}));
}
// Node 2 has an (empty) taint entry; nodes 3 to 8 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), (5, BTreeSet::from([2])), (6, BTreeSet::from([2])), (7, BTreeSet::from([2])), (8, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
let mut stream1_map_dependencies = StructOrTuple::default();
stream1_map_dependencies.add_dependency(
&vec!["0".to_string()],
vec!["1".to_string(), "1".to_string()],
);
let mut stream2_map_dependencies = StructOrTuple::default();
stream2_map_dependencies.add_dependency(
&vec!["0".to_string(), "0".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
stream2_map_dependencies.add_dependency(
&vec!["0".to_string(), "1".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
// Expected result of chaining the two streams.
let mut chain_dependencies = StructOrTuple::default();
chain_dependencies.add_dependency(
&vec!["0".to_string(), "1".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies.clone())])),
(4, BTreeMap::from([(2, implicit_map_dependencies.clone())])),
(5, BTreeMap::from([(2, stream2_map_dependencies)])),
(6, BTreeMap::from([(2, implicit_map_dependencies)])),
(7, BTreeMap::from([(2, stream1_map_dependencies)])),
(8, BTreeMap::from([(2, chain_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
// Same flow as `test_chain`: the analysis is expected to succeed with an empty list of
// partitionings, which `partitioning_analysis` returns when no operator imposes a constraint.
#[test]
fn test_chain_partitionable() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
let input = cluster1
.source_iter(q!([(1, (2, 3))]))
.broadcast_bincode_anonymous(&cluster2);
let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2)));
let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
let tick = cluster2.tick();
unsafe {
stream2
.tick_batch(&tick)
.chain(stream1.tick_batch(&tick))
.all_ticks()
.for_each(q!(|((x, b1), y)| {
println!("x: {}, b.1: {}, y: {}", x, b1, y);
}));
}
let expected_partitionings = Some(Vec::new()); test_input_partitionable(builder, cluster2.id(), expected_partitionings);
}
// Two differently mapped copies of the same input are combined with `cross_product`. The
// expected dependencies of the cross product (ID 8) are those of `stream2` prefixed with "0"
// and those of `stream1` prefixed with "1".
#[test]
fn test_cross_product() {
let builder = FlowBuilder::new();
let cluster1 = builder.cluster::<()>();
let cluster2 = builder.cluster::<()>();
let input = cluster1
.source_iter(q!([(1, (2, 3))]))
.broadcast_bincode_anonymous(&cluster2);
let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2)));
let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
let tick = cluster2.tick();
unsafe {
stream2
.tick_batch(&tick)
.cross_product(stream1.tick_batch(&tick))
.all_ticks()
.for_each(q!(|(((b1, b1_again), a3), (b, a2))| {
println!("((({}, {}), {}), ({:?}, {}))", b1, b1_again, a3, b, a2);
}));
}
// Node 2 has an (empty) taint entry; nodes 3 to 8 are tainted by input 2.
let expected_taint = BTreeMap::from([
(2, BTreeSet::from([])), (3, BTreeSet::from([2])),
(4, BTreeSet::from([2])), (5, BTreeSet::from([2])), (6, BTreeSet::from([2])), (7, BTreeSet::from([2])), (8, BTreeSet::from([2])), ]);
let mut implicit_map_dependencies = StructOrTuple::default();
implicit_map_dependencies.add_dependency(&vec![], vec!["1".to_string()]);
let mut stream1_map_dependencies = StructOrTuple::default();
stream1_map_dependencies.add_dependency(
&vec!["0".to_string()],
vec!["1".to_string(), "1".to_string()],
);
let mut stream2_map_dependencies = StructOrTuple::default();
stream2_map_dependencies.add_dependency(
&vec!["0".to_string(), "0".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
stream2_map_dependencies.add_dependency(
&vec!["0".to_string(), "1".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
// Expected result of the cross product of the two streams.
let mut cross_product_dependencies = StructOrTuple::default();
cross_product_dependencies.add_dependency(
&vec!["0".to_string(), "0".to_string(), "0".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
cross_product_dependencies.add_dependency(
&vec!["0".to_string(), "0".to_string(), "1".to_string()],
vec!["1".to_string(), "1".to_string(), "1".to_string()],
);
cross_product_dependencies.add_dependency(
&vec!["1".to_string(), "0".to_string()],
vec!["1".to_string(), "1".to_string()],
);
let expected_dependencies = BTreeMap::from([
(2, BTreeMap::new()),
(3, BTreeMap::from([(2, implicit_map_dependencies.clone())])),
(4, BTreeMap::from([(2, implicit_map_dependencies.clone())])),
(5, BTreeMap::from([(2, stream2_map_dependencies)])),
(6, BTreeMap::from([(2, implicit_map_dependencies)])),
(7, BTreeMap::from([(2, stream1_map_dependencies)])),
(8, BTreeMap::from([(2, cross_product_dependencies)])),
]);
test_input(
builder,
cluster2.id(),
expected_taint,
expected_dependencies,
);
}
/// Builds the same cross-product flow as `test_cross_product` and passes `None` as the
/// expected partitionings to `test_input_partitionable`.
///
/// NOTE(review): `None` presumably means "no valid partitioning" — confirm against
/// `test_input_partitionable`.
#[test]
fn test_cross_product_partitionable() {
    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, (2, 3))]))
        .broadcast_bincode_anonymous(&cluster2);
    let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2)));
    let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
    let batch_tick = cluster2.tick();
    unsafe {
        stream2
            .tick_batch(&batch_tick)
            .cross_product(stream1.tick_batch(&batch_tick))
            .all_ticks()
            .for_each(q!(|(((b1, b1_again), a3), (b, a2))| {
                println!("((({}, {}), {}), ({:?}, {}))", b1, b1_again, a3, b, a2);
            }));
    }

    test_input_partitionable(builder, cluster2.id(), None);
}
/// Joins two differently-mapped copies of one broadcast input on `cluster2` and hands
/// the expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_join() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, (2, 3))]))
        .broadcast_bincode_anonymous(&cluster2);
    let stream1 = input.clone().map(q!(|(a, b)| (b, a)));
    let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
    let tick = cluster2.tick();
    unsafe {
        stream2
            .tick_batch(&tick)
            .join(stream1.tick_batch(&tick))
            .all_ticks()
            .for_each(q!(|((b1, b1_again), (a3, a))| {
                println!("(({}, {}), {}, {})", b1, b1_again, a3, a);
            }));
    }

    // Entry 2 has an empty taint set; entries 3 through 8 are each tainted by 2.
    let mut expected_taint = BTreeMap::from([(2, BTreeSet::new())]);
    expected_taint.extend((3..=8).map(|id| (id, BTreeSet::from([2]))));

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // stream1 swaps the two fields of the payload.
    let mut after_stream1_map = StructOrTuple::default();
    after_stream1_map.add_dependency(&path(&["0"]), path(&["1", "1"]));
    after_stream1_map.add_dependency(&path(&["1"]), path(&["1", "0"]));

    // Both halves of stream2's first field come from the same input field.
    let mut after_stream2_map = StructOrTuple::default();
    for field in ["0", "1"] {
        after_stream2_map.add_dependency(&path(&["0", field]), path(&["1", "1", "1"]));
    }

    // NOTE(review): field "0"."0" is registered twice with different sources; whether the
    // second call replaces or augments the first depends on `add_dependency` — the call
    // order below is kept exactly as before.
    let mut after_join = StructOrTuple::default();
    after_join.add_dependency(&path(&["0"]), path(&["1", "1"]));
    after_join.add_dependency(&path(&["0", "0"]), path(&["1", "1", "0"]));
    after_join.add_dependency(&path(&["0", "0"]), path(&["1", "1", "1"]));
    after_join.add_dependency(&path(&["0", "1"]), path(&["1", "1", "1"]));
    after_join.add_dependency(&path(&["1", "1"]), path(&["1", "0"]));

    // Every non-input entry records its dependencies relative to entry 2.
    let mut expected_dependencies = BTreeMap::from([(2, BTreeMap::new())]);
    for (stmt, deps) in [
        (3, whole_payload.clone()),
        (4, whole_payload.clone()),
        (5, after_stream2_map),
        (6, whole_payload),
        (7, after_stream1_map),
        (8, after_join),
    ] {
        expected_dependencies.insert(stmt, BTreeMap::from([(2, deps)]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same join flow as `test_join` and passes two candidate partitionings of
/// entry 2 (paths `1.1.0` and `1.1.1`) to `test_input_partitionable`.
#[test]
fn test_join_partitionable() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, (2, 3))]))
        .broadcast_bincode_anonymous(&cluster2);
    let stream1 = input.clone().map(q!(|(a, b)| (b, a)));
    let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3)));
    let batch_tick = cluster2.tick();
    unsafe {
        stream2
            .tick_batch(&batch_tick)
            .join(stream1.tick_batch(&batch_tick))
            .all_ticks()
            .for_each(q!(|((b1, b1_again), (a3, a))| {
                println!("(({}, {}), {}, {})", b1, b1_again, a3, a);
            }));
    }

    // Each candidate maps entry 2 to the field path it would be partitioned on.
    let candidates = vec![
        BTreeMap::from([(2, path(&["1", "1", "0"]))]),
        BTreeMap::from([(2, path(&["1", "1", "1"]))]),
    ];
    test_input_partitionable(builder, cluster2.id(), Some(candidates));
}
/// Runs `enumerate` over a broadcast input on `cluster2` and hands the expected taint
/// and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_enumerate() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .assume_ordering()
            .enumerate()
            .for_each(q!(|(i, (a, b))| {
                println!("i: {}, a: {}, b: {}", i, a, b);
            }));
    }

    // Entry 2 has an empty taint set; entries 3 and 4 are tainted by 2.
    let mut expected_taint = BTreeMap::from([(2, BTreeSet::new())]);
    expected_taint.extend((3..=4).map(|id| (id, BTreeSet::from([2]))));

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // After `enumerate`, only field "1" of the output is tied to the input.
    let mut after_enumerate = StructOrTuple::default();
    after_enumerate.add_dependency(&path(&["1"]), path(&["1"]));

    let mut expected_dependencies = BTreeMap::from([(2, BTreeMap::new())]);
    for (stmt, deps) in [(3, whole_payload), (4, after_enumerate)] {
        expected_dependencies.insert(stmt, BTreeMap::from([(2, deps)]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same `enumerate` flow as `test_enumerate` and passes `None` as the
/// expected partitionings to `test_input_partitionable`.
///
/// NOTE(review): `None` presumably means "no valid partitioning" — confirm against
/// `test_input_partitionable`.
#[test]
fn test_enumerate_partitionable() {
    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .assume_ordering()
            .enumerate()
            .for_each(q!(|(i, (a, b))| {
                println!("i: {}, a: {}, b: {}", i, a, b);
            }));
    }

    test_input_partitionable(builder, cluster2.id(), None);
}
/// Runs `reduce_keyed_commutative` over a batched broadcast input on `cluster2` and
/// hands the expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_reduce_keyed() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .tick_batch(&cluster2.tick())
            .reduce_keyed_commutative(q!(|acc, b| *acc += b))
            .all_ticks()
            .for_each(q!(|(a, b_sum)| {
                println!("a: {}, b_sum: {}", a, b_sum);
            }));
    }

    // Entry 2 has an empty taint set; entries 3 and 4 are tainted by 2.
    let mut expected_taint = BTreeMap::from([(2, BTreeSet::new())]);
    expected_taint.extend((3..=4).map(|id| (id, BTreeSet::from([2]))));

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // Only output field "0" keeps a dependency on the input after the keyed reduce.
    let mut after_reduce_keyed = StructOrTuple::default();
    after_reduce_keyed.add_dependency(&path(&["0"]), path(&["1", "0"]));

    let mut expected_dependencies = BTreeMap::from([(2, BTreeMap::new())]);
    for (stmt, deps) in [(3, whole_payload), (4, after_reduce_keyed)] {
        expected_dependencies.insert(stmt, BTreeMap::from([(2, deps)]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same keyed-reduce flow as `test_reduce_keyed` and passes a single
/// expected partitioning (entry 2 on path `1.0`) to `test_input_partitionable`.
#[test]
fn test_reduce_keyed_partitionable() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .tick_batch(&cluster2.tick())
            .reduce_keyed_commutative(q!(|acc, b| *acc += b))
            .all_ticks()
            .for_each(q!(|(a, b_sum)| {
                println!("a: {}, b_sum: {}", a, b_sum);
            }));
    }

    let on_key = BTreeMap::from([(2, path(&["1", "0"]))]);
    test_input_partitionable(builder, cluster2.id(), Some(vec![on_key]));
}
/// Runs `reduce_commutative` over a batched broadcast input on `cluster2` and hands the
/// expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_reduce() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .tick_batch(&cluster2.tick())
            .reduce_commutative(q!(|(acc_a, acc_b), (a, b)| {
                *acc_a += a;
                *acc_b += b;
            }))
            .all_ticks()
            .for_each(q!(|(a_sum, b_sum)| {
                println!("a_sum: {}, b_sum: {}", a_sum, b_sum);
            }));
    }

    // Entry 2 has an empty taint set; entries 3 and 4 are tainted by 2.
    let mut expected_taint = BTreeMap::from([(2, BTreeSet::new())]);
    expected_taint.extend((3..=4).map(|id| (id, BTreeSet::from([2]))));

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // Entry 4 is tainted by 2 but is expected to carry no field-level dependencies.
    let mut expected_dependencies = BTreeMap::new();
    expected_dependencies.insert(2, BTreeMap::new());
    expected_dependencies.insert(3, BTreeMap::from([(2, whole_payload)]));
    expected_dependencies.insert(4, BTreeMap::new());

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same reduce flow as `test_reduce` and passes `None` as the expected
/// partitionings to `test_input_partitionable`.
///
/// NOTE(review): `None` presumably means "no valid partitioning" — confirm against
/// `test_input_partitionable`.
#[test]
fn test_reduce_partitionable() {
    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    unsafe {
        cluster1
            .source_iter(q!([(1, 2)]))
            .broadcast_bincode_anonymous(&cluster2)
            .tick_batch(&cluster2.tick())
            .reduce_commutative(q!(|(acc_a, acc_b), (a, b)| {
                *acc_a += a;
                *acc_b += b;
            }))
            .all_ticks()
            .for_each(q!(|(a_sum, b_sum)| {
                println!("a_sum: {}, b_sum: {}", a_sum, b_sum);
            }));
    }

    test_input_partitionable(builder, cluster2.id(), None);
}
/// Feeds a broadcast input into a tick cycle on `cluster2` (filter + map on the previous
/// tick's output, chained with the new batch) and hands the expected taint and
/// dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `6` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_cycle() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let cluster2_tick = cluster2.tick();
    let (complete_cycle, cycle) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let prev_tick_input = cycle
        .clone()
        .filter(q!(|(a, _b)| *a > 2))
        .map(q!(|(a, b)| (a, b + 2)));
    unsafe {
        complete_cycle
            .complete_next_tick(prev_tick_input.chain(input.tick_batch(&cluster2_tick)));
    }
    cycle.all_ticks().for_each(q!(|(a, b)| {
        println!("a: {}, b: {}", a, b);
    }));

    // Entry 6 has an empty taint set; 0..=3 and 7..=10 are each tainted by 6.
    let mut expected_taint = BTreeMap::from([(6, BTreeSet::new())]);
    expected_taint.extend(
        (0..=3)
            .chain(7..=10)
            .map(|id| (id, BTreeSet::from([6]))),
    );

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // Inside the cycle only field "0" keeps a dependency on the input.
    let mut inside_cycle = StructOrTuple::default();
    inside_cycle.add_dependency(&path(&["0"]), path(&["1", "0"]));

    let mut expected_dependencies = BTreeMap::from([
        (6, BTreeMap::new()),
        (7, BTreeMap::from([(6, whole_payload)])),
    ]);
    for stmt in (0..=3).chain(8..=10) {
        expected_dependencies.insert(stmt, BTreeMap::from([(6, inside_cycle.clone())]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same cycle flow as `test_cycle` and passes `Some` of an empty list as the
/// expected partitionings to `test_input_partitionable`.
///
/// NOTE(review): how an empty list differs from `None` is decided by
/// `test_input_partitionable`, which is not visible here — confirm.
#[test]
fn test_cycle_partitionable() {
    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let cluster2_tick = cluster2.tick();
    let (complete_cycle, cycle) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let prev_tick_input = cycle
        .clone()
        .filter(q!(|(a, _b)| *a > 2))
        .map(q!(|(a, b)| (a, b + 2)));
    unsafe {
        complete_cycle
            .complete_next_tick(prev_tick_input.chain(input.tick_batch(&cluster2_tick)));
    }
    cycle.all_ticks().for_each(q!(|(a, b)| {
        println!("a: {}, b: {}", a, b);
    }));

    test_input_partitionable(builder, cluster2.id(), Some(Vec::new()));
}
/// Wires two tick cycles on `cluster2` (one joined with the broadcast input, both fed
/// from the same chained stream) and hands the expected taint and dependency tables to
/// `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `3` being the
/// network input received on `cluster2` — confirm against `test_input`.
#[test]
fn test_nested_cycle() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let cluster2_tick = cluster2.tick();
    let (complete_cycle1, cycle1) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let (complete_cycle2, cycle2) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let chained = unsafe {
        cycle1
            .join(input.tick_batch(&cluster2_tick))
            .map(q!(|(_, (b1,b2))| (b1,b2)))
            .chain(cycle2)
    };
    complete_cycle1.complete_next_tick(chained.clone());
    let cycle2_out = chained.map(q!(|(_a, b)| (b, b)));
    complete_cycle2.complete_next_tick(cycle2_out.clone());
    cycle2_out.all_ticks().for_each(q!(|(b, _)| {
        println!("b: {}", b);
    }));

    // Entry 3 has an empty taint set; 0 and 4..=15 are each tainted by 3.
    let mut expected_taint = BTreeMap::from([(3, BTreeSet::new())]);
    expected_taint.extend(
        std::iter::once(0)
            .chain(4..=15)
            .map(|id| (id, BTreeSet::from([3]))),
    );

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // Field "0" is registered twice; the call order is kept exactly as before.
    let mut after_join = StructOrTuple::default();
    after_join.add_dependency(&path(&["0"]), path(&["1", "0"]));
    after_join.add_dependency(&path(&["0"]), path(&["1", "1"]));
    after_join.add_dependency(&path(&["1", "0"]), path(&["1", "1"]));
    after_join.add_dependency(&path(&["1", "1"]), path(&["1", "1"]));

    // Everywhere else both output fields trace back to input path 1.1.
    let mut both_fields = StructOrTuple::default();
    for field in ["0", "1"] {
        both_fields.add_dependency(&path(&[field]), path(&["1", "1"]));
    }

    let mut expected_dependencies = BTreeMap::from([
        (3, BTreeMap::new()),
        (4, BTreeMap::from([(3, whole_payload)])),
        (5, BTreeMap::from([(3, after_join)])),
    ]);
    for stmt in std::iter::once(0).chain(6..=15) {
        expected_dependencies.insert(stmt, BTreeMap::from([(3, both_fields.clone())]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same two-cycle flow as `test_nested_cycle` and passes a single expected
/// partitioning (entry 3 on path `1.0`) to `test_input_partitionable`.
#[test]
fn test_nested_cycle_partitionable() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let cluster2_tick = cluster2.tick();
    let (complete_cycle1, cycle1) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let (complete_cycle2, cycle2) =
        cluster2_tick.cycle::<Stream<(usize, usize), _, Bounded, NoOrder>>();
    let chained = unsafe {
        cycle1
            .join(input.tick_batch(&cluster2_tick))
            .map(q!(|(_, (b1,b2))| (b1,b2)))
            .chain(cycle2)
    };
    complete_cycle1.complete_next_tick(chained.clone());
    let cycle2_out = chained.map(q!(|(_a, b)| (b, b)));
    complete_cycle2.complete_next_tick(cycle2_out.clone());
    cycle2_out.all_ticks().for_each(q!(|(b, _)| {
        println!("b: {}", b);
    }));

    let on_first_field = BTreeMap::from([(3, path(&["1", "0"]))]);
    test_input_partitionable(builder, cluster2.id(), Some(vec![on_first_field]));
}
/// Chains a local `source_iter` stream on `cluster2` with a mapped broadcast input and
/// hands the expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `3` being the
/// network input and `0` the local source on `cluster2` — confirm against `test_input`.
#[test]
fn test_source_iter() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let tick = cluster2.tick();
    let stream1 = input.map(q!(|(a, b)| (b, a + 2)));
    let stream2 = cluster2.source_iter(q!([(3, 4)]));
    unsafe {
        stream2
            .tick_batch(&tick)
            .chain(stream1.tick_batch(&tick))
            .all_ticks()
            .for_each(q!(|_| {
                println!("No dependencies");
            }));
    }

    // Entries 0 and 3 have empty taint sets; 4 through 6 are each tainted by 3.
    let mut expected_taint = BTreeMap::from([(0, BTreeSet::new()), (3, BTreeSet::new())]);
    expected_taint.extend((4..=6).map(|id| (id, BTreeSet::from([3]))));

    // The whole value maps to field "1" of the input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    let mut after_map = StructOrTuple::default();
    after_map.add_dependency(&path(&["0"]), path(&["1", "1"]));

    let mut expected_dependencies =
        BTreeMap::from([(0, BTreeMap::new()), (3, BTreeMap::new())]);
    for (stmt, deps) in [(4, whole_payload), (5, after_map.clone()), (6, after_map)] {
        expected_dependencies.insert(stmt, BTreeMap::from([(3, deps)]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same flow as `test_source_iter` and passes `Some` of an empty list as the
/// expected partitionings to `test_input_partitionable`.
///
/// NOTE(review): how an empty list differs from `None` is decided by
/// `test_input_partitionable`, which is not visible here — confirm.
#[test]
fn test_source_iter_partitionable() {
    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let batch_tick = cluster2.tick();
    let stream1 = input.map(q!(|(a, b)| (b, a + 2)));
    let stream2 = cluster2.source_iter(q!([(3, 4)]));
    unsafe {
        stream2
            .tick_batch(&batch_tick)
            .chain(stream1.tick_batch(&batch_tick))
            .all_ticks()
            .for_each(q!(|_| {
                println!("No dependencies");
            }));
    }

    test_input_partitionable(builder, cluster2.id(), Some(Vec::new()));
}
/// Sends two separate broadcast inputs to `cluster2`, maps each, then both chains and
/// joins them, and hands the expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` and `8`
/// being the two network inputs received on `cluster2` — confirm against `test_input`.
#[test]
fn test_multiple_inputs() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input1 = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let input2 = cluster1
        .source_iter(q!([(3, 4)]))
        .broadcast_bincode_anonymous(&cluster2);
    let tick = cluster2.tick();
    unsafe {
        let stream1 = input1.map(q!(|(a, b)| (a * 2, b))).tick_batch(&tick);
        let stream2 = input2.map(q!(|(a, b)| (-a, b))).tick_batch(&tick);
        stream2
            .clone()
            .chain(stream1.clone())
            .all_ticks()
            .for_each(q!(|_| {
                println!("Dependent on both input1.b and input2.b");
            }));
        stream2
            .join(stream1)
            .all_ticks()
            .for_each(q!(|(_, (b1, b2))| {
                println!("b from input 1: {}, b from input 2: {}", b1, b2);
            }));
    }

    // The whole value maps to field "1" of its input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // After either map, output field "1" traces back to input path 1.1.
    let mut after_map = StructOrTuple::default();
    after_map.add_dependency(&path(&["1"]), path(&["1", "1"]));

    // Join output: field 1.0 is attributed to entry 2 and field 1.1 to entry 8.
    let mut join_from_2 = StructOrTuple::default();
    join_from_2.add_dependency(&path(&["1", "0"]), path(&["1", "1"]));
    let mut join_from_8 = StructOrTuple::default();
    join_from_8.add_dependency(&path(&["1", "1"]), path(&["1", "1"]));

    let mut expected_taint = BTreeMap::new();
    let mut expected_dependencies = BTreeMap::new();

    // Each row: (input entry, entry holding the whole payload, entries after the map).
    for (input, whole_stmt, mapped_stmts) in [(2, 3, [4, 5, 14]), (8, 9, [10, 11, 15])] {
        expected_taint.insert(input, BTreeSet::new());
        expected_dependencies.insert(input, BTreeMap::new());

        expected_taint.insert(whole_stmt, BTreeSet::from([input]));
        expected_dependencies.insert(
            whole_stmt,
            BTreeMap::from([(input, whole_payload.clone())]),
        );

        for stmt in mapped_stmts {
            expected_taint.insert(stmt, BTreeSet::from([input]));
            expected_dependencies.insert(stmt, BTreeMap::from([(input, after_map.clone())]));
        }
    }

    // Entries 12 and 16 are tainted by both inputs.
    for stmt in [12, 16] {
        expected_taint.insert(stmt, BTreeSet::from([2, 8]));
    }
    expected_dependencies.insert(
        12,
        BTreeMap::from([(2, after_map.clone()), (8, after_map)]),
    );
    expected_dependencies.insert(16, BTreeMap::from([(2, join_from_2), (8, join_from_8)]));

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Sends two broadcast inputs to `cluster2`, re-keys each on its second field, chains
/// and joins them, and passes a single expected partitioning (entries 2 and 8, both on
/// path `1.1`) to `test_input_partitionable`.
#[test]
fn test_multiple_inputs_partitionable() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input1 = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let input2 = cluster1
        .source_iter(q!([(3, 4)]))
        .broadcast_bincode_anonymous(&cluster2);
    let tick = cluster2.tick();
    unsafe {
        let stream1 = input1.map(q!(|(a, b)| (b, a * 2))).tick_batch(&tick);
        let stream2 = input2.map(q!(|(a, b)| (b, -a))).tick_batch(&tick);
        stream2
            .clone()
            .chain(stream1.clone())
            .all_ticks()
            .for_each(q!(|_| {
                println!("Dependent on both input1.b and input2.b");
            }));
        stream2
            .join(stream1)
            .all_ticks()
            .for_each(q!(|(_, (a1, a2))| {
                println!("a*2 from input 1: {}, -a from input 2: {}", a1, a2);
            }));
    }

    // Both inputs are expected to be partitioned on the same field path.
    let second_field = path(&["1", "1"]);
    let expected_partitionings = Some(vec![BTreeMap::from([
        (2, second_field.clone()),
        (8, second_field),
    ])]);
    test_input_partitionable(builder, cluster2.id(), expected_partitionings);
}
/// Filters one batched broadcast input against another with `filter_not_in` on
/// `cluster2` and hands the expected taint and dependency tables to `test_input`.
///
/// NOTE(review): the integer keys are presumably IR statement ids, with `2` and `6`
/// being the two network inputs received on `cluster2` — confirm against `test_input`.
#[test]
fn test_difference() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input1 = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let input2 = cluster1
        .source_iter(q!([(3, 4)]))
        .broadcast_bincode_anonymous(&cluster2);
    let tick = cluster2.tick();
    let remaining = unsafe {
        input1
            .tick_batch(&tick)
            .filter_not_in(input2.tick_batch(&tick))
    };
    remaining.all_ticks().for_each(q!(|(a, b)| {
        println!("a: {}, b: {}", a, b);
    }));

    // The whole value maps to field "1" of its input.
    let mut whole_payload = StructOrTuple::default();
    whole_payload.add_dependency(&path(&[]), path(&["1"]));

    // Entry 8 is expected to be tainted by entry 2 only, not by entry 6.
    let mut expected_taint = BTreeMap::from([(2, BTreeSet::new()), (6, BTreeSet::new())]);
    let mut expected_dependencies =
        BTreeMap::from([(2, BTreeMap::new()), (6, BTreeMap::new())]);
    for (stmt, input) in [(3, 2), (7, 6), (8, 2)] {
        expected_taint.insert(stmt, BTreeSet::from([input]));
        expected_dependencies.insert(stmt, BTreeMap::from([(input, whole_payload.clone())]));
    }

    test_input(
        builder,
        cluster2.id(),
        expected_taint,
        expected_dependencies,
    );
}
/// Builds the same `filter_not_in` flow as `test_difference` and passes a single
/// expected partitioning (entries 2 and 6, both on path `1`) to
/// `test_input_partitionable`.
#[test]
fn test_difference_partitionable() {
    /// Turns a list of field names into an owned index path.
    fn path(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    let builder = FlowBuilder::new();
    let cluster1 = builder.cluster::<()>();
    let cluster2 = builder.cluster::<()>();
    let input1 = cluster1
        .source_iter(q!([(1, 2)]))
        .broadcast_bincode_anonymous(&cluster2);
    let input2 = cluster1
        .source_iter(q!([(3, 4)]))
        .broadcast_bincode_anonymous(&cluster2);
    let tick = cluster2.tick();
    let remaining = unsafe {
        input1
            .tick_batch(&tick)
            .filter_not_in(input2.tick_batch(&tick))
    };
    remaining.all_ticks().for_each(q!(|(a, b)| {
        println!("a: {}, b: {}", a, b);
    }));

    // Both inputs are expected to be partitioned on their whole payload.
    let whole_payload = path(&["1"]);
    let expected_partitionings = Some(vec![BTreeMap::from([
        (2, whole_payload.clone()),
        (6, whole_payload),
    ])]);
    test_input_partitionable(builder, cluster2.id(), expected_partitionings);
}
}