use std::collections::{HashMap, VecDeque};

use serde::{Deserialize, Serialize};

11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Topology {
14 pub nodes: Vec<TopologyNode>,
16 pub edges: Vec<TopologyEdge>,
18 pub sources: Vec<u32>,
20 pub sinks: Vec<u32>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct TopologyNode {
27 pub id: u32,
29 pub name: String,
31 pub node_type: NodeType,
33 pub operation_count: usize,
35 pub operation_summary: String,
37 pub metrics: NodeMetrics,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct TopologyEdge {
44 pub from: u32,
46 pub to: u32,
48 pub edge_type: EdgeType,
50 pub label: String,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56pub enum NodeType {
57 Source,
59 Stream,
61 Join,
63 Merge,
65 Timer,
67 Sink,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
73pub enum EdgeType {
74 EventType,
76 StreamOutput,
78 MergeInput,
80 JoinInput,
82}
83
84#[derive(Debug, Clone, Default, Serialize, Deserialize)]
86pub struct NodeMetrics {
87 pub events_received: u64,
89 pub events_emitted: u64,
91 pub events_filtered: u64,
93 pub processing_time_ns: u64,
95}
96
97impl Topology {
98 pub fn topological_order(&self) -> Option<Vec<u32>> {
103 let node_ids: Vec<u32> = self.nodes.iter().map(|n| n.id).collect();
104 let max_id = node_ids.iter().copied().max().unwrap_or(0) as usize;
105
106 let mut in_degree = vec![0u32; max_id + 1];
108 let mut adj: Vec<Vec<u32>> = vec![Vec::new(); max_id + 1];
109
110 for edge in &self.edges {
111 in_degree[edge.to as usize] += 1;
112 adj[edge.from as usize].push(edge.to);
113 }
114
115 let mut queue: VecDeque<u32> = VecDeque::new();
117 for &id in &node_ids {
118 if in_degree[id as usize] == 0 {
119 queue.push_back(id);
120 }
121 }
122
123 let mut order = Vec::with_capacity(node_ids.len());
124
125 while let Some(node) = queue.pop_front() {
126 order.push(node);
127 for &neighbor in &adj[node as usize] {
128 in_degree[neighbor as usize] -= 1;
129 if in_degree[neighbor as usize] == 0 {
130 queue.push_back(neighbor);
131 }
132 }
133 }
134
135 if order.len() == node_ids.len() {
136 Some(order)
137 } else {
138 None }
140 }
141
142 pub fn validate(&self) -> Result<(), String> {
147 if self.topological_order().is_none() {
148 return Err("Topology contains a cycle".to_string());
149 }
150 Ok(())
151 }
152
153 pub fn to_json(&self) -> Result<String, serde_json::Error> {
155 serde_json::to_string_pretty(self)
156 }
157
158 pub fn get_node(&self, id: u32) -> Option<&TopologyNode> {
160 self.nodes.iter().find(|n| n.id == id)
161 }
162
163 pub fn outgoing_edges(&self, node_id: u32) -> Vec<&TopologyEdge> {
165 self.edges.iter().filter(|e| e.from == node_id).collect()
166 }
167
168 pub fn incoming_edges(&self, node_id: u32) -> Vec<&TopologyEdge> {
170 self.edges.iter().filter(|e| e.to == node_id).collect()
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with the given identity and empty/zeroed remaining fields.
    fn make_node(id: u32, name: &str, node_type: NodeType) -> TopologyNode {
        TopologyNode {
            id,
            name: name.to_string(),
            node_type,
            operation_count: 0,
            operation_summary: String::new(),
            metrics: NodeMetrics::default(),
        }
    }

    /// Builds an unlabeled `EventType` edge between two node IDs.
    fn make_edge(from: u32, to: u32) -> TopologyEdge {
        TopologyEdge {
            from,
            to,
            edge_type: EdgeType::EventType,
            label: String::new(),
        }
    }

    /// Assembles a topology from `(id, name, type)` triples and `(from, to)`
    /// pairs so each test states only the shape it cares about.
    fn build(
        nodes: &[(u32, &str, NodeType)],
        edges: &[(u32, u32)],
        sources: &[u32],
        sinks: &[u32],
    ) -> Topology {
        Topology {
            nodes: nodes
                .iter()
                .map(|&(id, name, node_type)| make_node(id, name, node_type))
                .collect(),
            edges: edges
                .iter()
                .map(|&(from, to)| make_edge(from, to))
                .collect(),
            sources: sources.to_vec(),
            sinks: sinks.to_vec(),
        }
    }

    #[test]
    fn test_empty_topology() {
        let topo = build(&[], &[], &[], &[]);
        assert_eq!(topo.topological_order(), Some(vec![]));
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_linear_topology() {
        // A -> B -> C
        let topo = build(
            &[
                (0, "A", NodeType::Source),
                (1, "B", NodeType::Stream),
                (2, "C", NodeType::Sink),
            ],
            &[(0, 1), (1, 2)],
            &[0],
            &[2],
        );

        let order = topo.topological_order().unwrap();
        assert_eq!(order, vec![0, 1, 2]);
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_fanout_topology() {
        // Source -> Stream1, Source -> Stream2
        let topo = build(
            &[
                (0, "Source", NodeType::Source),
                (1, "Stream1", NodeType::Stream),
                (2, "Stream2", NodeType::Stream),
            ],
            &[(0, 1), (0, 2)],
            &[0],
            &[1, 2],
        );

        let order = topo.topological_order().unwrap();
        // The source must come first; the two branches may follow in either
        // order, but both must be present.
        assert_eq!(order.len(), 3);
        assert_eq!(order[0], 0);
        assert!(order.contains(&1));
        assert!(order.contains(&2));
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_cycle_detection() {
        // A -> B -> A
        let topo = build(
            &[(0, "A", NodeType::Stream), (1, "B", NodeType::Stream)],
            &[(0, 1), (1, 0)],
            &[],
            &[],
        );

        assert!(topo.topological_order().is_none());
        assert!(topo.validate().is_err());
    }

    #[test]
    fn test_json_serialization() {
        let topo = build(&[(0, "Source", NodeType::Source)], &[], &[0], &[]);

        let json = topo.to_json().unwrap();
        assert!(json.contains("Source"));
        let roundtrip: Topology = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.nodes.len(), 1);
    }

    #[test]
    fn test_edge_queries() {
        let topo = build(
            &[
                (0, "A", NodeType::Source),
                (1, "B", NodeType::Stream),
                (2, "C", NodeType::Sink),
            ],
            &[(0, 1), (1, 2)],
            &[0],
            &[2],
        );

        assert_eq!(topo.outgoing_edges(0).len(), 1);
        assert_eq!(topo.incoming_edges(1).len(), 1);
        assert_eq!(topo.outgoing_edges(2).len(), 0);
    }
}