Skip to main content

varpulis_runtime/engine/
topology.rs

1//! DAG topology representation for Varpulis stream processing
2//!
3//! Provides an introspectable graph of inter-stream relationships for
4//! visualization, metrics collection, and execution ordering. Intra-stream
5//! operations remain as linear `Vec<RuntimeOp>` to preserve the hot path.
6
7use std::collections::VecDeque;
8
9use serde::{Deserialize, Serialize};
10
/// The stream processing topology — a DAG of nodes and edges.
///
/// `nodes` and `edges` define the graph that [`Topology::topological_order`]
/// and [`Topology::validate`] operate on; edges refer to nodes by their `id`.
/// The whole structure is serde-serializable (see [`Topology::to_json`]).
///
/// `sources` and `sinks` are stored as plain ID lists. No method visible in
/// this file derives them from `edges` or checks them against `edges`, so the
/// code that builds a `Topology` is responsible for keeping them consistent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Topology {
    /// All nodes in the topology.
    pub nodes: Vec<TopologyNode>,
    /// All edges (data flow connections) between nodes, expressed as pairs of
    /// node IDs.
    pub edges: Vec<TopologyEdge>,
    /// Node IDs that are pure sources (no incoming edges).
    pub sources: Vec<u32>,
    /// Node IDs that are pure sinks (no outgoing edges).
    pub sinks: Vec<u32>,
}
23
/// A single node in the topology.
///
/// A node is identified by `id`; [`TopologyEdge::from`] and
/// [`TopologyEdge::to`] refer to nodes through that value, and
/// [`Topology::get_node`] looks nodes up by it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyNode {
    /// Unique node identifier. `Topology::get_node` returns the first node
    /// whose `id` matches, so duplicates would shadow one another.
    pub id: u32,
    /// Human-readable name (stream name or connector name).
    pub name: String,
    /// Type of this node.
    pub node_type: NodeType,
    /// Number of operations in this stream's pipeline.
    pub operation_count: usize,
    /// Summary of operations (e.g., "Filter → Window → Aggregate → Emit").
    pub operation_summary: String,
    /// Runtime metrics for this node. `NodeMetrics::default()` yields
    /// all-zero counters.
    pub metrics: NodeMetrics,
}
40
/// Edge connecting two topology nodes.
///
/// The edge is directed: data flows from the node whose ID is `from` to the
/// node whose ID is `to`. Both fields hold [`TopologyNode::id`] values, not
/// positions in `Topology::nodes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyEdge {
    /// Source node ID (the upstream end of the edge).
    pub from: u32,
    /// Target node ID (the downstream end of the edge).
    pub to: u32,
    /// Type of this edge.
    pub edge_type: EdgeType,
    /// Label for the edge (e.g., event type name).
    pub label: String,
}
53
/// Classification of topology nodes.
///
/// A small `Copy` enum stored in [`TopologyNode::node_type`]; it is
/// comparable with `==` and serializes through serde with the variant names
/// below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeType {
    /// External event source (connector or event type).
    Source,
    /// Stream processing pipeline.
    Stream,
    /// Join of multiple streams.
    Join,
    /// Merge of multiple event sources.
    Merge,
    /// Periodic timer.
    Timer,
    /// External sink connector.
    Sink,
}
70
/// Classification of topology edges.
///
/// A small `Copy` enum stored in [`TopologyEdge::edge_type`]; it is
/// comparable with `==` and serializes through serde with the variant names
/// below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EdgeType {
    /// Event type routing (EventRouter fanout).
    EventType,
    /// Stream output feeding into another stream.
    StreamOutput,
    /// Input to a merge node.
    MergeInput,
    /// Input to a join node.
    JoinInput,
}
83
/// Runtime metrics for a topology node.
///
/// All fields are plain `u64` counters. The derived `Default` starts every
/// counter at zero; nothing in this file updates them, so they are
/// presumably filled in by the runtime that owns the topology.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeMetrics {
    /// Total events received by this node.
    pub events_received: u64,
    /// Total events emitted by this node.
    pub events_emitted: u64,
    /// Total events filtered out by this node.
    pub events_filtered: u64,
    /// Total processing time in nanoseconds.
    pub processing_time_ns: u64,
}
96
97impl Topology {
98    /// Compute a topological ordering of nodes using Kahn's algorithm.
99    ///
100    /// Returns node IDs in execution order (sources first, sinks last).
101    /// Returns `None` if the graph contains a cycle.
102    pub fn topological_order(&self) -> Option<Vec<u32>> {
103        let node_ids: Vec<u32> = self.nodes.iter().map(|n| n.id).collect();
104        let max_id = node_ids.iter().copied().max().unwrap_or(0) as usize;
105
106        // Build in-degree map and adjacency list
107        let mut in_degree = vec![0u32; max_id + 1];
108        let mut adj: Vec<Vec<u32>> = vec![Vec::new(); max_id + 1];
109
110        for edge in &self.edges {
111            in_degree[edge.to as usize] += 1;
112            adj[edge.from as usize].push(edge.to);
113        }
114
115        // Initialize queue with zero in-degree nodes
116        let mut queue: VecDeque<u32> = VecDeque::new();
117        for &id in &node_ids {
118            if in_degree[id as usize] == 0 {
119                queue.push_back(id);
120            }
121        }
122
123        let mut order = Vec::with_capacity(node_ids.len());
124
125        while let Some(node) = queue.pop_front() {
126            order.push(node);
127            for &neighbor in &adj[node as usize] {
128                in_degree[neighbor as usize] -= 1;
129                if in_degree[neighbor as usize] == 0 {
130                    queue.push_back(neighbor);
131                }
132            }
133        }
134
135        if order.len() == node_ids.len() {
136            Some(order)
137        } else {
138            None // Cycle detected
139        }
140    }
141
142    /// Validate the topology (check for cycles).
143    ///
144    /// Returns `Ok(())` if the topology is a valid DAG, or `Err` with a description
145    /// of the problem.
146    pub fn validate(&self) -> Result<(), String> {
147        if self.topological_order().is_none() {
148            return Err("Topology contains a cycle".to_string());
149        }
150        Ok(())
151    }
152
153    /// Serialize the topology to JSON for UI consumption.
154    pub fn to_json(&self) -> Result<String, serde_json::Error> {
155        serde_json::to_string_pretty(self)
156    }
157
158    /// Get a node by ID.
159    pub fn get_node(&self, id: u32) -> Option<&TopologyNode> {
160        self.nodes.iter().find(|n| n.id == id)
161    }
162
163    /// Get all edges originating from a node.
164    pub fn outgoing_edges(&self, node_id: u32) -> Vec<&TopologyEdge> {
165        self.edges.iter().filter(|e| e.from == node_id).collect()
166    }
167
168    /// Get all edges targeting a node.
169    pub fn incoming_edges(&self, node_id: u32) -> Vec<&TopologyEdge> {
170        self.edges.iter().filter(|e| e.to == node_id).collect()
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a node with the given identity, an empty pipeline and zeroed metrics.
    fn make_node(id: u32, name: &str, node_type: NodeType) -> TopologyNode {
        TopologyNode {
            id,
            name: name.to_string(),
            node_type,
            operation_count: 0,
            operation_summary: String::new(),
            metrics: NodeMetrics::default(),
        }
    }

    /// Build an unlabeled `EventType` edge between two node IDs.
    fn make_edge(from: u32, to: u32) -> TopologyEdge {
        TopologyEdge {
            from,
            to,
            edge_type: EdgeType::EventType,
            label: String::new(),
        }
    }

    /// A → B → C, shared by the tests that only need a simple chain.
    fn linear_topology() -> Topology {
        Topology {
            nodes: vec![
                make_node(0, "A", NodeType::Source),
                make_node(1, "B", NodeType::Stream),
                make_node(2, "C", NodeType::Sink),
            ],
            edges: vec![make_edge(0, 1), make_edge(1, 2)],
            sources: vec![0],
            sinks: vec![2],
        }
    }

    /// Position of `id` within a topological order; panics if it is missing.
    fn position_of(order: &[u32], id: u32) -> usize {
        order
            .iter()
            .position(|&candidate| candidate == id)
            .expect("node missing from topological order")
    }

    #[test]
    fn test_empty_topology() {
        let topo = Topology {
            nodes: vec![],
            edges: vec![],
            sources: vec![],
            sinks: vec![],
        };
        assert_eq!(topo.topological_order(), Some(vec![]));
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_linear_topology() {
        let topo = linear_topology();

        let order = topo.topological_order().unwrap();
        assert_eq!(order, vec![0, 1, 2]);
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_fanout_topology() {
        // A → B, A → C
        let topo = Topology {
            nodes: vec![
                make_node(0, "Source", NodeType::Source),
                make_node(1, "Stream1", NodeType::Stream),
                make_node(2, "Stream2", NodeType::Stream),
            ],
            edges: vec![make_edge(0, 1), make_edge(0, 2)],
            sources: vec![0],
            sinks: vec![1, 2],
        };

        let order = topo.topological_order().unwrap();
        // Every node appears exactly once and the source precedes both streams.
        assert_eq!(order.len(), 3);
        assert_eq!(order[0], 0);
        assert!(position_of(&order, 0) < position_of(&order, 1));
        assert!(position_of(&order, 0) < position_of(&order, 2));
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_diamond_topology() {
        // A → B, A → C, B → D, C → D: D must wait for both of its inputs.
        let topo = Topology {
            nodes: vec![
                make_node(0, "A", NodeType::Source),
                make_node(1, "B", NodeType::Stream),
                make_node(2, "C", NodeType::Stream),
                make_node(3, "D", NodeType::Join),
            ],
            edges: vec![
                make_edge(0, 1),
                make_edge(0, 2),
                make_edge(1, 3),
                make_edge(2, 3),
            ],
            sources: vec![0],
            sinks: vec![3],
        };

        let order = topo.topological_order().unwrap();
        assert_eq!(order.len(), 4);
        for edge in &topo.edges {
            assert!(position_of(&order, edge.from) < position_of(&order, edge.to));
        }
        assert!(topo.validate().is_ok());
    }

    #[test]
    fn test_cycle_detection() {
        // A → B → A (cycle)
        let topo = Topology {
            nodes: vec![
                make_node(0, "A", NodeType::Stream),
                make_node(1, "B", NodeType::Stream),
            ],
            edges: vec![make_edge(0, 1), make_edge(1, 0)],
            sources: vec![],
            sinks: vec![],
        };

        assert!(topo.topological_order().is_none());
        assert!(topo.validate().is_err());
    }

    #[test]
    fn test_json_serialization() {
        let topo = Topology {
            nodes: vec![make_node(0, "Source", NodeType::Source)],
            edges: vec![],
            sources: vec![0],
            sinks: vec![],
        };

        let json = topo.to_json().unwrap();
        assert!(json.contains("Source"));
        let roundtrip: Topology = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.nodes.len(), 1);
        assert_eq!(roundtrip.nodes[0].name, "Source");
        assert_eq!(roundtrip.nodes[0].node_type, NodeType::Source);
        assert_eq!(roundtrip.sources, vec![0]);
    }

    #[test]
    fn test_get_node() {
        let topo = linear_topology();

        let node = topo.get_node(1).expect("node 1 exists");
        assert_eq!(node.name, "B");
        assert_eq!(node.node_type, NodeType::Stream);
        assert!(topo.get_node(42).is_none());
    }

    #[test]
    fn test_edge_queries() {
        let topo = linear_topology();

        let out_of_a = topo.outgoing_edges(0);
        assert_eq!(out_of_a.len(), 1);
        assert_eq!(out_of_a[0].to, 1);

        let into_b = topo.incoming_edges(1);
        assert_eq!(into_b.len(), 1);
        assert_eq!(into_b[0].from, 0);

        assert_eq!(topo.outgoing_edges(2).len(), 0);
        assert!(topo.incoming_edges(0).is_empty());
    }
}