use std::collections::{HashMap, HashSet};
use super::builder::TopologyError;
/// Role of a [`Node`] together with the data specific to that role.
#[derive(Debug, Clone)]
pub(crate) enum NodeKind {
/// A node without predecessors; it only carries a list of topics.
Source {
/// Topics attached to this source.
/// NOTE(review): presumably the topics the source reads from — confirm with the builder.
topics: Vec<String>,
},
/// A node that names the nodes feeding into it.
Processor {
/// Names of the predecessor nodes. `NodeRegistry::add_processor` rejects the
/// node unless every one of them is already registered.
predecessors: Vec<String>,
},
/// A node with predecessors and exactly one topic.
Sink {
/// Topic attached to this sink.
/// NOTE(review): presumably the topic the sink writes to — confirm with the builder.
topic: String,
/// Names of the predecessor nodes. `NodeRegistry::add_sink` rejects the node
/// unless every one of them is already registered.
predecessors: Vec<String>,
},
}
/// A named entry in the [`NodeRegistry`].
#[derive(Debug, Clone)]
pub(crate) struct Node {
/// Name of the node; the registry uses it as the lookup key and refuses duplicates.
pub name: String,
/// Role of the node and the data that goes with that role.
pub kind: NodeKind,
}
/// Changelog flavour recorded on a [`StoreEntry`].
///
/// The registry picks the variant from the method used to register the store:
/// `add_store` records `Kv`, `add_window_store` records `AggWindow`, and
/// `add_join_window_store` records `JoinWindow`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ChangelogKind {
/// Recorded for stores registered through `NodeRegistry::add_store`.
Kv,
/// Recorded for stores registered through `NodeRegistry::add_window_store`.
/// `retention_ms` is passed through unchanged from the caller
/// (milliseconds, judging by the name — confirm).
AggWindow { retention_ms: i64 },
/// Recorded for stores registered through `NodeRegistry::add_join_window_store`.
/// `retention_ms` is passed through unchanged from the caller
/// (milliseconds, judging by the name — confirm).
JoinWindow { retention_ms: i64 },
}
/// A store registered with the [`NodeRegistry`], plus the processors connected to it.
#[derive(Debug, Clone)]
pub(crate) struct StoreEntry {
/// Name of the store; `NodeRegistry::connect_processor_store` looks entries up by it.
pub name: String,
/// Names of the processors connected to this store. Seeded at registration and
/// extended by `NodeRegistry::connect_processor_store`, which skips names already present.
pub processors: Vec<String>,
/// Optional value supplied by the caller at registration and stored unchanged.
/// NOTE(review): presumably an explicit changelog topic name replacing a derived one — confirm.
pub changelog_override: Option<String>,
/// Changelog flavour, fixed by which `add_*store` method created the entry.
pub changelog_kind: ChangelogKind,
}
/// Collects the nodes, stores and topic sets registered while a topology is assembled.
#[derive(Debug, Default)]
pub(crate) struct NodeRegistry {
/// Registered nodes, in the order they were added.
pub nodes: Vec<Node>,
/// Maps a node name to that node's position in `nodes`.
pub index: HashMap<String, usize>,
/// Registered stores, in the order they were added.
pub stores: Vec<StoreEntry>,
/// No method visible in this part of the file writes to this set.
/// NOTE(review): presumably filled in elsewhere with repartition topic names — confirm.
pub repartition_topics: HashSet<String>,
/// Topics recorded through `add_global_source`.
pub global_source_topics: HashSet<String>,
/// Topic groups recorded through `add_copartition_group`, one inner list per call.
pub copartition_groups: Vec<Vec<String>>,
}
impl NodeRegistry {
fn insert(&mut self, node: Node) -> Result<(), TopologyError> {
if self.index.contains_key(&node.name) {
return Err(TopologyError::DuplicateNode(node.name));
}
self.index.insert(node.name.clone(), self.nodes.len());
self.nodes.push(node);
Ok(())
}
fn require_predecessors_exist(
&self,
node: &str,
predecessors: &[String],
) -> Result<(), TopologyError> {
for p in predecessors {
if !self.index.contains_key(p) {
return Err(TopologyError::UnknownPredecessor {
node: node.to_string(),
predecessor: p.clone(),
});
}
}
Ok(())
}
pub fn add_source(&mut self, name: &str, topics: Vec<String>) -> Result<(), TopologyError> {
self.insert(Node {
name: name.to_string(),
kind: NodeKind::Source { topics },
})
}
pub fn add_processor(
&mut self,
name: &str,
predecessors: Vec<String>,
) -> Result<(), TopologyError> {
self.require_predecessors_exist(name, &predecessors)?;
self.insert(Node {
name: name.to_string(),
kind: NodeKind::Processor { predecessors },
})
}
pub fn add_sink(
&mut self,
name: &str,
topic: String,
predecessors: Vec<String>,
) -> Result<(), TopologyError> {
self.require_predecessors_exist(name, &predecessors)?;
self.insert(Node {
name: name.to_string(),
kind: NodeKind::Sink {
topic,
predecessors,
},
})
}
pub fn add_store(
&mut self,
name: &str,
processors: Vec<String>,
changelog_override: Option<String>,
) {
self.stores.push(StoreEntry {
name: name.to_string(),
processors,
changelog_override,
changelog_kind: ChangelogKind::Kv,
});
}
pub fn add_window_store(
&mut self,
name: &str,
processors: Vec<String>,
changelog_override: Option<String>,
retention_ms: i64,
) {
self.stores.push(StoreEntry {
name: name.to_string(),
processors,
changelog_override,
changelog_kind: ChangelogKind::AggWindow { retention_ms },
});
}
pub fn add_join_window_store(
&mut self,
name: &str,
processors: Vec<String>,
changelog_override: Option<String>,
retention_ms: i64,
) {
self.stores.push(StoreEntry {
name: name.to_string(),
processors,
changelog_override,
changelog_kind: ChangelogKind::JoinWindow { retention_ms },
});
}
pub fn add_copartition_group(&mut self, topics: Vec<String>) {
self.copartition_groups.push(topics);
}
pub fn add_global_source(&mut self, topic: &str) {
self.global_source_topics.insert(topic.to_string());
}
pub fn connect_processor_store(&mut self, processor: &str, store: &str) {
if let Some(e) = self.stores.iter_mut().find(|e| e.name == store)
&& !e.processors.iter().any(|p| p == processor)
{
e.processors.push(processor.to_string());
}
}
pub fn validate_predecessors(&self) -> Result<(), TopologyError> {
for node in &self.nodes {
let preds = match &node.kind {
NodeKind::Processor { predecessors } | NodeKind::Sink { predecessors, .. } => {
predecessors
}
NodeKind::Source { .. } => continue,
};
for p in preds {
if !self.index.contains_key(p) {
return Err(TopologyError::UnknownPredecessor {
node: node.name.clone(),
predecessor: p.clone(),
});
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    /// `nodes` lists entries in the order they were registered.
    #[test]
    fn nodes_preserve_insertion_order() {
        let mut registry = NodeRegistry::default();
        registry
            .add_source("src", vec![String::from("t")])
            .unwrap();
        registry
            .add_processor("p", vec![String::from("src")])
            .unwrap();
        registry
            .add_sink("snk", String::from("out"), vec![String::from("p")])
            .unwrap();

        let mut seen: Vec<&str> = Vec::with_capacity(registry.nodes.len());
        for node in &registry.nodes {
            seen.push(node.name.as_str());
        }
        check!(seen == vec!["src", "p", "snk"]);
    }

    /// A second node with an already-used name is refused, whatever its kind.
    #[test]
    fn duplicate_node_is_rejected() {
        let mut registry = NodeRegistry::default();
        registry.add_source("a", vec![String::from("t")]).unwrap();

        let second = registry.add_processor("a", Vec::new());
        check!(second.is_err());
    }

    /// `add_global_source` stores exactly the topic it was given.
    #[test]
    fn add_global_source_records_topic() {
        let mut registry = NodeRegistry::default();
        registry.add_global_source("global");

        let topics = &registry.global_source_topics;
        check!(topics.contains("global"));
        check!(topics.len() == 1);
    }
}