1use crate::{State, edge::Edge};
13use indexmap::IndexMap;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
/// Errors describing why a graph's topology is invalid.
///
/// Within this file the variants are produced by `TopologyValidator`; variants
/// that are not constructed here are presumably raised by other parts of the
/// crate (e.g. the graph builder) — verify against their call sites.
#[derive(Debug, thiserror::Error)]
pub enum TopologyError {
    /// A node with the given name has already been registered.
    /// Not constructed in this file.
    #[error("node '{name}' already exists")]
    DuplicateNode { name: String },

    /// The node name was rejected; `reason` carries the explanation.
    /// Not constructed in this file.
    #[error("node '{name}' is invalid: {reason}")]
    InvalidNodeName { name: String, reason: String },

    /// No entry point was supplied to the validator.
    #[error("no entry point set")]
    NoEntryPoint,

    /// An edge endpoint (the source of any edge, or the target of a fixed
    /// edge) names a node that is not registered.
    #[error("edge references non-existent node '{name}'")]
    NodeNotFound { name: String },

    /// A branch of a conditional edge maps to a node that is not registered.
    #[error(
        "conditional edge from '{from}' branch '{branch}' targets non-existent node '{target}'"
    )]
    EdgeTargetNotFound {
        /// Source node of the conditional edge.
        from: String,
        /// Branch key in the edge's path map.
        branch: String,
        /// The missing target node.
        target: String,
    },

    /// The node is neither the source nor the target of any edge.
    #[error("node '{name}' has no incoming or outgoing edges (isolated)")]
    IsolatedNode { name: String },

    /// The node cannot be reached by following edges from the entry point.
    #[error("node '{name}' is unreachable from entry point")]
    UnreachableNode { name: String },

    /// A multi-node cycle was found in which no conditional edge leaves the
    /// cycle; `cycle` lists the nodes of that strongly connected component.
    #[error("potential infinite loop detected, path: {cycle:?}")]
    PotentialInfiniteLoop { cycle: Vec<String> },

    /// A field index does not address any field of the state.
    /// Not constructed in this file.
    #[error(
        "field index {index} in {context} is out of range (state has {field_count} fields: {field_names:?})"
    )]
    InvalidFieldReference {
        /// The offending index.
        index: usize,
        /// Number of fields the state has.
        field_count: usize,
        /// Names of the state's fields, reported for diagnostics.
        field_names: &'static [&'static str],
        /// Where the reference occurred.
        context: String,
    },
}
63
/// Working state for a Tarjan strongly-connected-components traversal over
/// node names.
struct TarjanSCC {
    /// Discovery index that will be assigned to the next newly visited node.
    index: usize,
    /// Nodes that have been visited but not yet emitted as part of a component.
    stack: Vec<String>,
    /// Discovery index of every visited node; presence means "already visited".
    indices: HashMap<String, usize>,
    /// Per node, the smallest discovery index found so far via its successors
    /// that are still on `stack`.
    lowlink: HashMap<String, usize>,
    /// Set view of `stack`, used for membership checks.
    onstack: HashSet<String>,
    /// Components emitted so far, in the order they were completed.
    sccs: Vec<Vec<String>>,
}
75
76impl TarjanSCC {
77 fn new() -> Self {
78 Self {
79 index: 0,
80 stack: Vec::new(),
81 indices: HashMap::new(),
82 lowlink: HashMap::new(),
83 onstack: HashSet::new(),
84 sccs: Vec::new(),
85 }
86 }
87
88 fn find_sccs<S: State>(
90 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
91 edges: &[Edge<S>],
92 ) -> Vec<Vec<String>> {
93 let mut tarjan = Self::new();
94 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
95
96 for node_name in nodes.keys() {
98 adjacency.entry(node_name.clone()).or_default();
99 }
100
101 for edge in edges {
102 match edge {
103 Edge::Fixed { from, to } => {
104 if from != crate::edge::START && to != crate::edge::END {
105 adjacency.entry(from.clone()).or_default().push(to.clone());
106 }
107 }
108 Edge::Conditional { from, path_map, .. } => {
109 if from != crate::edge::START {
110 let targets = adjacency.entry(from.clone()).or_default();
111 for target in path_map.iter().map(|(_, v)| v) {
112 if target != crate::edge::END {
113 targets.push(target.clone());
114 }
115 }
116 }
117 }
118 }
119 }
120
121 for node_name in nodes.keys() {
123 if !tarjan.indices.contains_key(node_name) {
124 tarjan.visit(node_name, &adjacency);
125 }
126 }
127
128 tarjan.sccs
129 }
130
131 fn visit(&mut self, node: &str, adjacency: &HashMap<String, Vec<String>>) {
132 self.indices.insert(node.to_string(), self.index);
133 self.lowlink.insert(node.to_string(), self.index);
134 self.index += 1;
135 self.stack.push(node.to_string());
136 self.onstack.insert(node.to_string());
137
138 if let Some(neighbors) = adjacency.get(node) {
139 for neighbor in neighbors {
140 if !self.indices.contains_key(neighbor) {
141 self.visit(neighbor, adjacency);
142 let low = self.lowlink.get(node).copied().unwrap_or(0);
143 let neighbor_low = self.lowlink.get(neighbor).copied().unwrap_or(0);
144 self.lowlink.insert(node.to_string(), low.min(neighbor_low));
145 } else if self.onstack.contains(neighbor) {
146 let low = self.lowlink.get(node).copied().unwrap_or(0);
147 let neighbor_idx = self.indices.get(neighbor).copied().unwrap_or(0);
148 self.lowlink.insert(node.to_string(), low.min(neighbor_idx));
149 }
150 }
151 }
152
153 if self.lowlink.get(node) == self.indices.get(node) {
154 let mut scc = Vec::new();
155 loop {
156 let w = self.stack.pop().expect("stack should not be empty");
157 self.onstack.remove(&w);
158 scc.push(w.clone());
159 if w == node {
160 break;
161 }
162 }
163 self.sccs.push(scc);
164 }
165 }
166}
167
/// Stateless namespace for the graph topology checks; all functionality is
/// exposed through associated functions.
pub(super) struct TopologyValidator;
173
174impl TopologyValidator {
175 pub(super) fn validate<S: State>(
181 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
182 edges: &[Edge<S>],
183 entry_point: Option<&str>,
184 ) -> Result<(), TopologyError> {
185 Self::check_entry_point(entry_point)?;
186 Self::check_edge_targets(nodes, edges)?;
187 Self::check_reachability(nodes, edges, entry_point)?;
188 Self::check_isolated_nodes(nodes, edges)?;
189 Self::check_infinite_loops(nodes, edges)?;
190
191 Ok(())
192 }
193
194 const fn check_entry_point(entry_point: Option<&str>) -> Result<(), TopologyError> {
196 if entry_point.is_none() {
197 return Err(TopologyError::NoEntryPoint);
198 }
199 Ok(())
200 }
201
202 fn check_edge_targets<S: State>(
204 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
205 edges: &[Edge<S>],
206 ) -> Result<(), TopologyError> {
207 for edge in edges {
208 match edge {
209 Edge::Fixed { from, to } => {
210 if from != crate::edge::START && !nodes.contains_key(from) {
211 return Err(TopologyError::NodeNotFound { name: from.clone() });
212 }
213 if to != crate::edge::END && !nodes.contains_key(to) {
214 return Err(TopologyError::NodeNotFound { name: to.clone() });
215 }
216 }
217 Edge::Conditional { from, path_map, .. } => {
218 if from != crate::edge::START && !nodes.contains_key(from) {
219 return Err(TopologyError::NodeNotFound { name: from.clone() });
220 }
221 for (branch, target) in path_map.iter() {
222 if target != crate::edge::END && !nodes.contains_key(target) {
223 return Err(TopologyError::EdgeTargetNotFound {
224 from: from.clone(),
225 branch: branch.clone(),
226 target: target.clone(),
227 });
228 }
229 }
230 }
231 }
232 }
233 Ok(())
234 }
235
236 fn check_reachability<S: State>(
238 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
239 edges: &[Edge<S>],
240 entry_point: Option<&str>,
241 ) -> Result<(), TopologyError> {
242 let entry = entry_point.expect("entry point should exist");
243
244 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
246 for node_name in nodes.keys() {
247 adjacency.entry(node_name.clone()).or_default();
248 }
249
250 for edge in edges {
251 match edge {
252 Edge::Fixed { from, to } => {
253 if from == crate::edge::START {
254 adjacency.entry(to.clone()).or_default();
255 } else if to != crate::edge::END {
256 adjacency.entry(from.clone()).or_default().push(to.clone());
257 }
258 }
259 Edge::Conditional { from, path_map, .. } => {
260 if from == crate::edge::START {
261 for target in path_map.iter().map(|(_, v)| v) {
262 adjacency.entry(target.clone()).or_default();
263 }
264 } else {
265 let targets = adjacency.entry(from.clone()).or_default();
266 for target in path_map.iter().map(|(_, v)| v) {
267 if target != crate::edge::END {
268 targets.push(target.clone());
269 }
270 }
271 }
272 }
273 }
274 }
275
276 let mut visited: HashSet<String> = HashSet::new();
278 let mut queue: VecDeque<String> = VecDeque::new();
279 queue.push_back(entry.to_string());
280 visited.insert(entry.to_string());
281
282 while let Some(current) = queue.pop_front() {
283 if let Some(neighbors) = adjacency.get(¤t) {
284 for neighbor in neighbors {
285 if visited.insert(neighbor.clone()) {
286 queue.push_back(neighbor.clone());
287 }
288 }
289 }
290 }
291
292 for node_name in nodes.keys() {
294 if !visited.contains(node_name) {
295 return Err(TopologyError::UnreachableNode {
296 name: node_name.clone(),
297 });
298 }
299 }
300
301 Ok(())
302 }
303
304 fn check_isolated_nodes<S: State>(
306 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
307 edges: &[Edge<S>],
308 ) -> Result<(), TopologyError> {
309 let mut has_incoming: HashSet<String> = HashSet::new();
310 let mut has_outgoing: HashSet<String> = HashSet::new();
311
312 for edge in edges {
313 match edge {
314 Edge::Fixed { from, to } => {
315 if from != crate::edge::START {
316 has_outgoing.insert(from.clone());
317 }
318 if to != crate::edge::END {
319 has_incoming.insert(to.clone());
320 }
321 }
322 Edge::Conditional { from, path_map, .. } => {
323 if from != crate::edge::START {
324 has_outgoing.insert(from.clone());
325 }
326 for target in path_map.iter().map(|(_, v)| v) {
327 if target != crate::edge::END {
328 has_incoming.insert(target.clone());
329 }
330 }
331 }
332 }
333 }
334
335 for node_name in nodes.keys() {
336 if !has_incoming.contains(node_name) && !has_outgoing.contains(node_name) {
337 return Err(TopologyError::IsolatedNode {
338 name: node_name.clone(),
339 });
340 }
341 }
342
343 Ok(())
344 }
345
346 fn check_infinite_loops<S: State>(
354 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
355 edges: &[Edge<S>],
356 ) -> Result<(), TopologyError> {
357 let sccs = TarjanSCC::find_sccs(nodes, edges);
358
359 for scc in sccs {
360 if scc.len() <= 1 {
361 continue;
362 }
363
364 let scc_set: HashSet<&str> = scc.iter().map(String::as_str).collect();
365
366 let has_exit = scc.iter().any(|node| {
369 edges.iter().any(|edge| {
370 matches!(edge, Edge::Conditional { from, path_map, .. }
371 if from == node && path_map.iter().any(|(_, target)| {
372 target == crate::edge::END || !scc_set.contains(target.as_str())
373 }))
374 })
375 });
376
377 if !has_exit {
378 return Err(TopologyError::PotentialInfiniteLoop { cycle: scc });
379 }
380 }
381
382 Ok(())
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{node::IntoNode, node::NodeFnUpdate};

    /// State with no data; it exists only to instantiate the generic graph
    /// types used by the traversal.
    #[derive(Clone, Debug, Default)]
    struct StateDummy;

    /// Update type paired with [`StateDummy`]; carries no data.
    #[derive(Clone, Debug, Default)]
    struct StateDummyUpdate;

    impl crate::State for StateDummy {
        type Update = StateDummyUpdate;
        type FieldVersions = crate::state::FieldVersions;

        fn apply(&mut self, _update: Self::Update) -> crate::FieldsChanged {
            crate::FieldsChanged(0)
        }

        fn reset_ephemeral(&mut self) {}
    }

    /// Builds a node named `name` whose body returns an empty update.
    fn mock_node(name: &str) -> Arc<dyn crate::Node<StateDummy>> {
        NodeFnUpdate(|_s: &StateDummy| async move { Ok(StateDummyUpdate) }).into_node(name)
    }

    /// Builds a node map containing one mock node per name, in the given order.
    fn graph_of(names: &[&str]) -> IndexMap<String, Arc<dyn crate::Node<StateDummy>>> {
        let mut nodes: IndexMap<String, Arc<dyn crate::Node<StateDummy>>> = IndexMap::new();
        for name in names {
            nodes.insert((*name).to_string(), mock_node(name));
        }
        nodes
    }

    /// Shorthand for a fixed edge between two named nodes.
    fn fixed(from: &str, to: &str) -> Edge<StateDummy> {
        Edge::Fixed {
            from: from.to_string(),
            to: to.to_string(),
        }
    }

    /// Two nodes joined by a single one-way edge form two separate components.
    #[test]
    fn test_tarjan_scc_simple() {
        let nodes = graph_of(&["a", "b"]);
        let edges = vec![fixed("a", "b")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 2);
    }

    /// Two nodes pointing at each other collapse into one two-node component.
    #[test]
    fn test_tarjan_scc_cycle() {
        let nodes = graph_of(&["a", "b"]);
        let edges = vec![fixed("a", "b"), fixed("b", "a")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 1);
        assert_eq!(sccs[0].len(), 2);
    }
}
449
450