use std::collections::BTreeMap;
use crate::dsl::graph::{GraphNodeKind, LogicalGraph, NodeId};
/// Aliases sibling repartition nodes that hang off the same predecessor.
///
/// Every `GraphNodeKind::Repartition` node with exactly one predecessor is
/// grouped under that predecessor's id. Within each group the smallest node id
/// is kept as the survivor, and every other id in the group is recorded in
/// `graph.aliases` as pointing at the survivor. Groups with a single member
/// produce no alias. Nodes themselves are neither removed nor rewritten here;
/// only `graph.aliases` is modified.
pub(crate) fn merge_repartition_topics(graph: &mut LogicalGraph) {
    // predecessor id -> ids of the repartition nodes fed solely by it.
    let mut siblings_by_parent: BTreeMap<NodeId, Vec<NodeId>> = BTreeMap::new();

    for candidate in graph.nodes.iter() {
        if !matches!(candidate.kind, GraphNodeKind::Repartition { .. }) {
            continue;
        }
        // Repartition nodes with zero or several predecessors are left alone.
        if let [parent] = candidate.predecessors.as_slice() {
            siblings_by_parent
                .entry(*parent)
                .or_default()
                .push(candidate.id);
        }
    }

    for mut siblings in siblings_by_parent.into_values() {
        // Sorting makes the survivor the lowest id regardless of node order.
        siblings.sort_unstable();
        if let Some((&survivor, duplicates)) = siblings.split_first() {
            // `duplicates` is empty for a lone repartition, so nothing is aliased.
            for &duplicate in duplicates {
                graph.aliases.insert(duplicate, survivor);
            }
        }
    }
}
/// Sets `reuse_source_for_changelog` to `true` on every
/// `GraphNodeKind::TableSource` node in `graph.nodes`.
///
/// Nodes of any other kind, and all other fields of table-source nodes, are
/// left untouched.
pub(crate) fn reuse_ktable_source_topics(graph: &mut LogicalGraph) {
    // Pick out a mutable handle to the flag of each table-source node.
    let changelog_flags = graph
        .nodes
        .iter_mut()
        .filter_map(|entry| match &mut entry.kind {
            GraphNodeKind::TableSource {
                reuse_source_for_changelog,
                ..
            } => Some(reuse_source_for_changelog),
            _ => None,
        });

    for flag in changelog_flags {
        *flag = true;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::graph::GraphNode;

    /// Builds a default graph and appends `nodes` to it in the given order.
    fn graph_with(nodes: Vec<GraphNode>) -> LogicalGraph {
        let mut graph = LogicalGraph::default();
        for node in nodes {
            graph.nodes.push(node);
        }
        graph
    }

    /// A key-changing stateless processor with no predecessors, used as the
    /// shared upstream node of the repartition fixtures.
    fn select_key_node(id: NodeId) -> GraphNode {
        GraphNode {
            id,
            name: "select-key".into(),
            kind: GraphNodeKind::StatelessProcessor {
                repartition_required: false,
            },
            predecessors: vec![],
            children: vec![],
            key_changing_operation: true,
            merge_node: false,
            lower: None,
            aux: None,
        }
    }

    /// A repartition node `id` whose only predecessor is `pred`.
    fn repartition_node(id: NodeId, pred: NodeId) -> GraphNode {
        let name = format!("rp-{id}");
        let topic = format!("topic-{id}");
        GraphNode {
            id,
            name,
            kind: GraphNodeKind::Repartition {
                topic,
                partitions: None,
            },
            predecessors: vec![pred],
            children: Vec::new(),
            key_changing_operation: false,
            merge_node: false,
            lower: None,
            aux: None,
        }
    }

    /// A table-source node reading `topic` into `store`, with changelog reuse
    /// initially switched off.
    fn table_source_node(id: NodeId, topic: &str, store: &str) -> GraphNode {
        let name = format!("src-{id}");
        GraphNode {
            id,
            name,
            kind: GraphNodeKind::TableSource {
                topic: topic.into(),
                store_name: store.into(),
                reuse_source_for_changelog: false,
            },
            predecessors: vec![],
            children: vec![],
            key_changing_operation: false,
            merge_node: false,
            lower: None,
            aux: None,
        }
    }

    #[test]
    fn two_repartitions_off_one_source_alias_to_the_lower_id() {
        let mut g = graph_with(vec![
            select_key_node(0),
            repartition_node(1, 0),
            repartition_node(2, 0),
        ]);

        merge_repartition_topics(&mut g);

        // The higher id points at the lower one; the survivor has no alias.
        assert_eq!(g.aliases.get(&2), Some(&1));
        assert_eq!(g.aliases.get(&1), None);
    }

    #[test]
    fn lone_repartition_is_not_aliased() {
        let mut g = graph_with(vec![select_key_node(0), repartition_node(1, 0)]);

        merge_repartition_topics(&mut g);

        assert!(g.aliases.is_empty());
    }

    #[test]
    fn repartitions_off_distinct_sources_do_not_merge() {
        // Predecessor ids 100 and 200 differ, so the two nodes are not siblings.
        let mut g = graph_with(vec![repartition_node(0, 100), repartition_node(1, 200)]);

        merge_repartition_topics(&mut g);

        assert!(g.aliases.is_empty());
    }

    #[test]
    fn reuse_pass_flags_every_table_source() {
        let mut g = graph_with(vec![
            table_source_node(0, "in", "store"),
            repartition_node(1, 0),
        ]);

        reuse_ktable_source_topics(&mut g);

        let GraphNodeKind::TableSource {
            reuse_source_for_changelog,
            topic,
            ..
        } = &g.nodes[0].kind
        else {
            panic!("node 0 should be a TableSource");
        };
        assert!(reuse_source_for_changelog);
        assert_eq!(topic, "in");

        // The non-table node keeps its kind.
        assert!(matches!(g.nodes[1].kind, GraphNodeKind::Repartition { .. }));
    }
}