use std::collections::HashMap;
use super::node::{ChangelogKind, NodeKind, NodeRegistry};
/// Topics and state stores belonging to one sub-topology, as assembled by `group_nodes`.
#[derive(Debug, Clone, Default)]
pub(crate) struct GroupTopics {
    /// Decimal string of the group's numeric id (`"0"`, `"1"`, ...).
    pub id: String,
    /// Topics read by this group's source nodes that are neither global nor repartition topics.
    pub source_topics: Vec<String>,
    /// Repartition topics read by this group's source nodes.
    pub repartition_source_topics: Vec<String>,
    /// Repartition topics written by this group's sink nodes (other sink topics are not recorded).
    pub repartition_sink_topics: Vec<String>,
    /// One entry per store attributed to this group:
    /// `(store name, store's changelog_override, store's changelog_kind)`.
    pub changelog_stores: Vec<(String, Option<String>, ChangelogKind)>,
    /// Copartition member lists whose topics are all read by this group's source nodes.
    pub copartition_groups: Vec<Vec<String>>,
}
/// Disjoint-set forest over the integers `0..n`, using path halving in `find`.
struct QuickUnion {
    /// `links[i]` is the parent of `i`; an element that links to itself is a root.
    links: Vec<usize>,
}

impl QuickUnion {
    /// Creates `n` singleton sets, each element being its own root.
    fn new(n: usize) -> Self {
        let links = (0..n).collect::<Vec<usize>>();
        QuickUnion { links }
    }

    /// Returns the root of the set containing `x`, shortening the path as it walks.
    ///
    /// # Panics
    /// Panics if `x` (or any link) is out of range.
    fn find(&mut self, x: usize) -> usize {
        let mut node = x;
        loop {
            let up = self.links[node];
            if up == node {
                break node;
            }
            // Path halving: re-point `node` at its grandparent, then jump there.
            let grand = self.links[up];
            self.links[node] = grand;
            node = grand;
        }
    }

    /// Merges the sets containing `a` and `b`; `b`'s root becomes the root of the union.
    fn unite(&mut self, a: usize, b: usize) {
        let (root_a, root_b) = (self.find(a), self.find(b));
        if root_a == root_b {
            return;
        }
        self.links[root_a] = root_b;
    }
}
/// Partitions the nodes of `reg` into sub-topologies and collects the topics and
/// stores each one touches.
///
/// Two nodes end up in the same group when one is a predecessor of the other
/// (for `Processor` and `Sink` nodes) or when both are processors of the same
/// state store. Predecessor and processor names missing from `reg.index` are ignored.
/// Groups get dense numeric ids in the order their first node appears in `reg.nodes`.
///
/// Per group:
/// - source topics in `reg.global_source_topics` are skipped;
/// - source topics in `reg.repartition_topics` go to `repartition_source_topics`,
///   other source topics to `source_topics`;
/// - sink topics are recorded only when they are repartition topics;
/// - each store is attributed to the group of its first processor found in `reg.index`;
/// - each non-empty copartition member list is attached to the lowest-id group whose
///   source or repartition-source topics contain every member (lists no single group
///   covers are ignored).
///
/// Groups with neither source nor repartition-source topics are omitted from the
/// result, but their ids are still consumed, so returned ids may have gaps.
pub(crate) fn group_nodes(reg: &NodeRegistry) -> Vec<GroupTopics> {
    let n = reg.nodes.len();
    let mut uf = QuickUnion::new(n);

    // A processor or sink belongs with each of its known predecessors.
    for (i, node) in reg.nodes.iter().enumerate() {
        let preds = match &node.kind {
            NodeKind::Processor { predecessors } | NodeKind::Sink { predecessors, .. } => {
                predecessors
            }
            NodeKind::Source { .. } => continue,
        };
        for p in preds {
            if let Some(&j) = reg.index.get(p) {
                uf.unite(i, j);
            }
        }
    }

    // All known processors of one store are merged into a single group.
    for store in reg.stores.iter() {
        let mut known = store
            .processors
            .iter()
            .filter_map(|p| reg.index.get(p).copied());
        if let Some(first) = known.next() {
            for other in known {
                uf.unite(first, other);
            }
        }
    }

    // Hand out dense ids in node order; `groups[id]` is the group with that id.
    // A Vec (rather than a HashMap) keeps later passes deterministic.
    let mut root_to_id: HashMap<usize, usize> = HashMap::new();
    let mut groups: Vec<GroupTopics> = Vec::new();
    for (i, node) in reg.nodes.iter().enumerate() {
        let root = uf.find(i);
        let id = *root_to_id.entry(root).or_insert_with(|| {
            let id = groups.len();
            groups.push(GroupTopics {
                id: id.to_string(),
                ..Default::default()
            });
            id
        });
        let entry = &mut groups[id];
        match &node.kind {
            NodeKind::Source { topics } => {
                for t in topics {
                    // Global source topics are excluded from every group.
                    if reg.global_source_topics.contains(t) {
                        continue;
                    }
                    if reg.repartition_topics.contains(t) {
                        entry.repartition_source_topics.push(t.clone());
                    } else {
                        entry.source_topics.push(t.clone());
                    }
                }
            }
            NodeKind::Sink { topic, .. } => {
                if reg.repartition_topics.contains(topic) {
                    entry.repartition_sink_topics.push(topic.clone());
                }
            }
            NodeKind::Processor { .. } => {}
        }
    }

    // Attribute each store via the same "first known processor" rule used for the
    // union above, so a store whose first listed processor is unknown is not lost.
    for store in reg.stores.iter() {
        let Some(&first) = store.processors.iter().find_map(|p| reg.index.get(p)) else {
            continue;
        };
        if let Some(&id) = root_to_id.get(&uf.find(first)) {
            groups[id].changelog_stores.push((
                store.name.clone(),
                store.changelog_override.clone(),
                store.changelog_kind,
            ));
        }
    }

    // Attach each copartition list to the lowest-id group covering all its members.
    // Empty lists would match every group vacuously, so they are skipped.
    for members in reg.copartition_groups.iter() {
        if members.is_empty() {
            continue;
        }
        let covering = groups.iter_mut().find(|g| {
            members
                .iter()
                .all(|m| g.source_topics.contains(m) || g.repartition_source_topics.contains(m))
        });
        if let Some(g) = covering {
            g.copartition_groups.push(members.clone());
        }
    }

    groups
        .into_iter()
        .filter(|g| !g.source_topics.is_empty() || !g.repartition_source_topics.is_empty())
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::node::{ChangelogKind, NodeRegistry};
    use assert2::check;

    /// Ids of `groups`, in result order.
    fn ids(groups: &[GroupTopics]) -> Vec<&str> {
        groups.iter().map(|group| &group.id[..]).collect()
    }

    /// Owned copies of topic names, for comparing against `GroupTopics` fields.
    fn topics(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn single_source_sink_is_one_subtopology() {
        let mut registry = NodeRegistry::default();
        registry.add_source("src", vec!["in".into()]).unwrap();
        registry
            .add_sink("snk", "out".into(), vec!["src".into()])
            .unwrap();

        let groups = group_nodes(&registry);

        check!(ids(&groups) == vec!["0"]);
        check!(groups[0].source_topics == topics(&["in"]));
    }

    #[test]
    fn repartition_chain_is_two_subtopologies() {
        let mut registry = NodeRegistry::default();
        registry.repartition_topics.insert("rp".into());
        registry.add_source("src", vec!["in".into()]).unwrap();
        registry
            .add_sink("rsink", "rp".into(), vec!["src".into()])
            .unwrap();
        registry.add_source("rsrc", vec!["rp".into()]).unwrap();
        registry
            .add_sink("snk2", "out".into(), vec!["rsrc".into()])
            .unwrap();

        let groups = group_nodes(&registry);

        check!(ids(&groups) == vec!["0", "1"]);
        let (upstream, downstream) = (&groups[0], &groups[1]);
        check!(upstream.source_topics == topics(&["in"]));
        check!(upstream.repartition_sink_topics == topics(&["rp"]));
        check!(downstream.repartition_source_topics == topics(&["rp"]));
    }

    #[test]
    fn shared_state_store_unites_processors_into_one_group() {
        let mut registry = NodeRegistry::default();
        registry.add_source("s1", vec!["a".into()]).unwrap();
        registry.add_processor("p1", vec!["s1".into()]).unwrap();
        registry.add_source("s2", vec!["b".into()]).unwrap();
        registry.add_processor("p2", vec!["s2".into()]).unwrap();
        registry.add_store("store", vec!["p1".into(), "p2".into()], None);

        let groups = group_nodes(&registry);

        check!(ids(&groups) == vec!["0"]);
        let mut sources = groups[0].source_topics.clone();
        sources.sort_unstable();
        check!(sources == topics(&["a", "b"]));
        check!(groups[0].changelog_stores == vec![("store".to_string(), None, ChangelogKind::Kv)]);
    }

    #[test]
    fn source_less_group_is_dropped_but_consumes_an_index() {
        let mut registry = NodeRegistry::default();
        registry.add_processor("orphan_proc", vec![]).unwrap();
        registry
            .add_sink("orphan_sink", "x".into(), vec!["orphan_proc".into()])
            .unwrap();
        registry.add_source("src", vec!["in".into()]).unwrap();
        registry
            .add_sink("snk", "out".into(), vec!["src".into()])
            .unwrap();

        let groups = group_nodes(&registry);

        check!(ids(&groups) == vec!["1"]);
    }

    #[test]
    fn global_source_is_invisible_but_consumes_an_index() {
        let mut registry = NodeRegistry::default();
        registry.add_source("gsrc", vec!["global".into()]).unwrap();
        registry.add_processor("gproc", vec!["gsrc".into()]).unwrap();
        registry.add_global_source("global");
        registry.add_source("src", vec!["in".into()]).unwrap();
        registry
            .add_sink("snk", "out".into(), vec!["src".into()])
            .unwrap();

        let groups = group_nodes(&registry);

        check!(groups.len() == 1);
        check!(ids(&groups) == vec!["1"]);
        check!(groups[0].source_topics == topics(&["in"]));
    }
}