1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//! Logical DSL graph: one `GraphNode` here per JVM `GraphNode`, auto-named at
//! build time, then optimized and lowered to the Processor-API `Topology`.
use std::any::Any;
/// Identifier of a node in the logical graph. `LogicalGraph::add` hands ids out
/// sequentially as the current length of `LogicalGraph::nodes`, so an id is
/// also the node's index in that vector.
pub(crate) type NodeId = usize;
/// A boxed lowering thunk attached at op-call time. Lowering calls it to attach
/// the typed processor to the Processor-API builder. Erased here because each op
/// has different K/V types.
///
/// Because the thunk is `FnOnce`, it can be invoked at most once; it is handed
/// the shared `LowerState` by mutable reference. The `Send` bound allows the
/// boxed thunk to be moved to another thread.
// NOTE(review): `dead_code` is silenced here; presumably the alias is not used
// by every build configuration yet. Confirm and drop the attribute if it is.
#[allow(dead_code)]
pub(crate) type LowerFn = Box<dyn FnOnce(&mut LowerState) + Send>;
/// Threaded through lowering: the Processor-API `Topology` under construction +
/// the app id + the Processor-API node NAME each logical node lowered to.
///
/// Every `LowerFn` thunk receives a `&mut LowerState`.
#[allow(dead_code)]
pub(crate) struct LowerState {
/// The Processor-API topology being built up during lowering.
pub topology: crate::topology::Topology,
/// The application id.
/// NOTE(review): presumably the `<app>` prefix of default changelog topic
/// names (`<app>-<store>-changelog`) — confirm against the lowering thunks.
pub app_id: String,
/// Logical node id → name of the Processor-API node that logical node
/// lowered to.
pub handle_name: std::collections::HashMap<NodeId, String>,
/// `REUSE_KTABLE_SOURCE_TOPICS`: node id → the source topic its store should
/// reuse as its changelog. A `TableSource` thunk consults this at lowering
/// time; if present it registers its store with that changelog topic
/// (via `add_state_store_with_changelog`) instead of the default
/// `<app>-<store>-changelog`. Empty unless the optimizer ran.
pub reuse_changelog: std::collections::HashMap<NodeId, String>,
}
/// The kind of a logical graph node, together with its kind-specific data.
///
/// NOTE(review): except for `GlobalSource`, the per-variant summaries below are
/// inferred from variant and field names only — no code visible in this file
/// interprets them. Confirm against the optimizer passes and lowering.
#[allow(dead_code)]
pub(crate) enum GraphNodeKind {
/// A stream source; `topics` holds the topic names attached to it.
StreamSource {
topics: Vec<String>,
},
/// A processor carrying no store name. `repartition_required` is a flag
/// whose consumer is not visible here (presumably the repartition logic).
StatelessProcessor {
repartition_required: bool,
},
/// A stream sink associated with a single `topic`.
StreamSink {
topic: String,
},
/// A repartition step through `topic`. `partitions` is an optional
/// partition count (`None` presumably means "not specified" — confirm).
Repartition {
topic: String,
partitions: Option<i32>,
},
/// An aggregation backed by the store `store_name`. `changelog` presumably
/// toggles changelogging for that store — confirm.
Aggregate {
store_name: String,
changelog: bool,
},
/// A table source over `topic`, materialized in `store_name`.
/// `reuse_source_for_changelog` presumably marks the node as eligible for
/// the `REUSE_KTABLE_SOURCE_TOPICS` optimization — confirm.
TableSource {
topic: String,
store_name: String,
reuse_source_for_changelog: bool,
},
/// A table processor; `store_name` is optional, so the node may have no
/// named store.
TableProcessor {
store_name: Option<String>,
},
/// A `GlobalKTable` source. Invisible in the wire (no subtopology, no
/// changelog); its lowering thunk calls `Topology::add_global_store`, which
/// registers a source + update-processor (consuming a node-group index) and a
/// separate global KV store factory.
GlobalSource {
topic: String,
store_name: String,
source_name: String,
processor_name: String,
},
}
/// One node of the logical DSL graph.
///
/// Edges are recorded on both endpoints as `NodeId`s: `predecessors` on the
/// downstream node and `children` on the upstream node.
// The struct-level `allow` already covers every field, so no per-field
// `dead_code` attributes are needed.
#[allow(dead_code)]
pub(crate) struct GraphNode {
    /// This node's id; `LogicalGraph::add` sets it to the node's index in
    /// `LogicalGraph::nodes`.
    pub id: NodeId,
    /// The node's name.
    pub name: String,
    /// What kind of node this is, plus its kind-specific data.
    pub kind: GraphNodeKind,
    /// Ids of the upstream nodes feeding this one.
    pub predecessors: Vec<NodeId>,
    /// Ids of the downstream nodes; `LogicalGraph::add` appends to this when a
    /// later node lists this one as a predecessor.
    pub children: Vec<NodeId>,
    /// Initialized to `false` by `LogicalGraph::add`.
    /// NOTE(review): presumably set for operations that may change the record
    /// key — confirm against the op builders.
    pub key_changing_operation: bool,
    /// Initialized to `false` by `LogicalGraph::add`.
    /// NOTE(review): presumably marks merge nodes — confirm.
    pub merge_node: bool,
    /// Typed lowering thunk (`None` for nodes lowered structurally).
    pub lower: Option<LowerFn>,
    /// Type-erased payload that some passes/lowering inspect (e.g. the source
    /// `Consumed`).
    pub aux: Option<Box<dyn Any + Send>>,
}
/// The logical DSL graph: a flat arena of `GraphNode`s plus the alias table
/// installed by the optimizer. `Default` yields an empty graph (no nodes, no
/// aliases).
#[derive(Default)]
pub(crate) struct LogicalGraph {
/// All nodes, in insertion order. `add` assigns each node the id equal to
/// its index here.
pub nodes: Vec<GraphNode>,
/// Optimizer-installed redundant→keeper redirects. When `aliases[&b] == a`,
/// the lowering driver does not run node `b`'s thunk; instead it points
/// `handle_name[&b]` at whatever `a` lowered to. The `MERGE_REPARTITION_TOPICS`
/// pass uses this to collapse two repartition nodes (off the same key-changing
/// source) onto a single shared repartition topic. The keeper always has the
/// lower id, so id-order lowering visits it first.
pub aliases: std::collections::HashMap<NodeId, NodeId>,
}
impl LogicalGraph {
    /// Appends a new node and wires it to its upstream nodes.
    ///
    /// The new node receives the next sequential id (the current node count).
    /// That id is pushed onto the `children` list of every node named in
    /// `predecessors` before the node itself is stored. The node starts with
    /// no children, both flags cleared, and neither a lowering thunk nor an
    /// aux payload.
    ///
    /// Returns the id of the newly added node.
    ///
    /// # Panics
    ///
    /// Panics (index out of bounds) if any entry of `predecessors` is not the
    /// id of a node already in the graph.
    pub fn add(&mut self, name: String, kind: GraphNodeKind, predecessors: Vec<NodeId>) -> NodeId {
        let new_id: NodeId = self.nodes.len();

        // Register the back-edges first: `predecessors` is moved into the node
        // below, and linking before the push keeps a self-referencing id invalid.
        predecessors
            .iter()
            .for_each(|&parent| self.nodes[parent].children.push(new_id));

        let node = GraphNode {
            id: new_id,
            name,
            kind,
            predecessors,
            children: Vec::new(),
            key_changing_operation: false,
            merge_node: false,
            lower: None,
            aux: None,
        };
        self.nodes.push(node);

        new_id
    }
}