use std::any::Any;
/// Identifier of a node in a [`LogicalGraph`]; `LogicalGraph::add` hands out the node's index in `nodes` as its id.
pub(crate) type NodeId = usize;
/// Boxed, `Send`, call-once closure that receives mutable access to a [`LowerState`].
// NOTE(review): presumably invoked when a graph node is lowered into the topology — confirm against the caller.
// NOTE(review): the `dead_code` allow has no stated reason; presumably the alias is not used on every build path yet.
#[allow(dead_code)]
pub(crate) type LowerFn = Box<dyn FnOnce(&mut LowerState) + Send>;
/// Mutable state handed to a [`LowerFn`] closure.
///
/// NOTE(review): the field descriptions marked "presumably" are inferred from
/// names only; confirm them against the code that drives lowering.
// `dead_code` is silenced for the whole struct; presumably not every field is read yet.
#[allow(dead_code)]
pub(crate) struct LowerState {
    /// Topology value carried through lowering (type lives in `crate::topology`).
    pub topology: crate::topology::Topology,
    /// Application identifier string.
    pub app_id: String,
    /// String recorded per [`NodeId`]; presumably the handle name assigned to that node — confirm.
    pub handle_name: std::collections::HashMap<NodeId, String>,
    /// String recorded per [`NodeId`]; presumably the changelog a node reuses — confirm.
    pub reuse_changelog: std::collections::HashMap<NodeId, String>,
}
/// The kind of a [`GraphNode`], together with the data specific to that kind.
///
/// NOTE(review): the per-variant descriptions are inferred from variant and
/// field names; the code that interprets them is not in this file section.
// `dead_code` is silenced for the whole enum; presumably not every variant is built or matched yet.
#[allow(dead_code)]
pub(crate) enum GraphNodeKind {
    /// Presumably a stream source reading from the listed `topics` — confirm.
    StreamSource {
        topics: Vec<String>,
    },
    /// Presumably a processor without a state store; carries a `repartition_required` flag.
    StatelessProcessor {
        repartition_required: bool,
    },
    /// Presumably a sink writing to `topic` — confirm.
    StreamSink {
        topic: String,
    },
    /// Presumably a repartition step through `topic`; `partitions` is optional.
    Repartition {
        topic: String,
        partitions: Option<i32>,
    },
    /// Presumably an aggregation backed by the store `store_name`; `changelog` is a flag.
    Aggregate {
        store_name: String,
        changelog: bool,
    },
    /// Presumably a table source reading `topic` into `store_name`; carries a
    /// `reuse_source_for_changelog` flag.
    TableSource {
        topic: String,
        store_name: String,
        reuse_source_for_changelog: bool,
    },
    /// Presumably a table processor; its `store_name` is optional.
    TableProcessor {
        store_name: Option<String>,
    },
    /// Presumably a global source; names its topic, store, source and processor.
    GlobalSource {
        topic: String,
        store_name: String,
        source_name: String,
        processor_name: String,
    },
}
/// A single node stored in a [`LogicalGraph`].
// The struct-level allow already covers every field, so no per-field
// `dead_code` attributes are needed. Presumably some fields are not read yet.
#[allow(dead_code)]
pub(crate) struct GraphNode {
    /// This node's id; `LogicalGraph::add` sets it to the node's index in `nodes`.
    pub id: NodeId,
    /// Name supplied when the node was added.
    pub name: String,
    /// What kind of node this is, plus its kind-specific data.
    pub kind: GraphNodeKind,
    /// Ids of the nodes given as predecessors when this node was added.
    pub predecessors: Vec<NodeId>,
    /// Ids of nodes that were added later with this node as a predecessor.
    pub children: Vec<NodeId>,
    /// Starts as `false` in `LogicalGraph::add`.
    /// NOTE(review): presumably marks operations that may change the record key — confirm.
    pub key_changing_operation: bool,
    /// Starts as `false` in `LogicalGraph::add`.
    /// NOTE(review): presumably marks a node that merges several inputs — confirm.
    pub merge_node: bool,
    /// Optional lowering closure; `None` right after `LogicalGraph::add`.
    pub lower: Option<LowerFn>,
    /// Optional type-erased, `Send` payload; `None` right after `LogicalGraph::add`.
    pub aux: Option<Box<dyn Any + Send>>,
}
/// Container for [`GraphNode`]s; the derived `Default` yields an empty graph.
#[derive(Default)]
pub(crate) struct LogicalGraph {
    /// Node storage. `add` appends here and uses the index as the node's id.
    pub nodes: Vec<GraphNode>,
    /// Mapping from one [`NodeId`] to another.
    /// NOTE(review): nothing in view writes or reads this; presumably it redirects
    /// an alias id to the node it stands for — confirm.
    pub aliases: std::collections::HashMap<NodeId, NodeId>,
}
impl LogicalGraph {
    /// Appends a new node to the graph and returns its id.
    ///
    /// The id is the node's index in `nodes`. The new id is also pushed onto
    /// the `children` list of every node named in `predecessors` (once per
    /// occurrence, so a repeated predecessor records the child repeatedly).
    /// The node starts with no children, both flags `false`, and neither
    /// `lower` nor `aux` set.
    ///
    /// # Panics
    ///
    /// Panics if any entry of `predecessors` is not the id of a node already
    /// in the graph. Earlier predecessors in the list have already been
    /// updated by the time that happens.
    pub fn add(&mut self, name: String, kind: GraphNodeKind, predecessors: Vec<NodeId>) -> NodeId {
        // The next free slot in `nodes` becomes the new node's id.
        let new_id = self.nodes.len();

        // Link parents to the child before the child itself is stored.
        predecessors
            .iter()
            .for_each(|&parent| self.nodes[parent].children.push(new_id));

        let node = GraphNode {
            id: new_id,
            name,
            kind,
            predecessors,
            children: Vec::new(),
            key_changing_operation: false,
            merge_node: false,
            lower: None,
            aux: None,
        };
        self.nodes.push(node);
        new_id
    }
}