1use crate::{State, edge::Edge};
13use indexmap::IndexMap;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
17#[derive(Debug, thiserror::Error)]
22pub enum TopologyError {
23 #[error("node '{name}' already exists")]
24 DuplicateNode { name: String },
25
26 #[error("node '{name}' is invalid: {reason}")]
27 InvalidNodeName { name: String, reason: String },
28
29 #[error("no entry point set")]
30 NoEntryPoint,
31
32 #[error("edge references non-existent node '{name}'")]
33 NodeNotFound { name: String },
34
35 #[error(
36 "conditional edge from '{from}' branch '{branch}' targets non-existent node '{target}'"
37 )]
38 EdgeTargetNotFound {
39 from: String,
40 branch: String,
41 target: String,
42 },
43
44 #[error("node '{name}' has no incoming or outgoing edges (isolated)")]
45 IsolatedNode { name: String },
46
47 #[error("node '{name}' is unreachable from entry point")]
48 UnreachableNode { name: String },
49
50 #[error("potential infinite loop detected, path: {cycle:?}")]
51 PotentialInfiniteLoop { cycle: Vec<String> },
52
53 #[error(
54 "field index {index} in {context} is out of range (state has {field_count} fields: {field_names:?})"
55 )]
56 InvalidFieldReference {
57 index: usize,
58 field_count: usize,
59 field_names: &'static [&'static str],
60 context: String,
61 },
62}
63
64struct TarjanSCC {
68 index: usize,
69 stack: Vec<String>,
70 indices: HashMap<String, usize>,
71 lowlink: HashMap<String, usize>,
72 onstack: HashSet<String>,
73 sccs: Vec<Vec<String>>,
74}
75
76impl TarjanSCC {
77 fn new() -> Self {
78 Self {
79 index: 0,
80 stack: Vec::new(),
81 indices: HashMap::new(),
82 lowlink: HashMap::new(),
83 onstack: HashSet::new(),
84 sccs: Vec::new(),
85 }
86 }
87
88 fn find_sccs<S: State>(
90 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
91 edges: &[Edge<S>],
92 ) -> Vec<Vec<String>> {
93 let mut tarjan = Self::new();
94 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
95
96 for node_name in nodes.keys() {
98 adjacency.entry(node_name.clone()).or_default();
99 }
100
101 for edge in edges {
102 match edge {
103 Edge::Fixed { from, to } => {
104 if from != crate::edge::START && to != crate::edge::END {
105 adjacency.entry(from.clone()).or_default().push(to.clone());
106 }
107 }
108 Edge::Conditional { from, path_map, .. } => {
109 if from != crate::edge::START {
110 let targets = adjacency.entry(from.clone()).or_default();
111 for target in path_map.iter().map(|(_, v)| v) {
112 if target != crate::edge::END {
113 targets.push(target.clone());
114 }
115 }
116 }
117 }
118 }
119 }
120
121 for node_name in nodes.keys() {
123 if !tarjan.indices.contains_key(node_name) {
124 tarjan.visit(node_name, &adjacency);
125 }
126 }
127
128 tarjan.sccs
129 }
130
131 fn visit(&mut self, node: &str, adjacency: &HashMap<String, Vec<String>>) {
132 self.indices.insert(node.to_string(), self.index);
133 self.lowlink.insert(node.to_string(), self.index);
134 self.index += 1;
135 self.stack.push(node.to_string());
136 self.onstack.insert(node.to_string());
137
138 if let Some(neighbors) = adjacency.get(node) {
139 for neighbor in neighbors {
140 if !self.indices.contains_key(neighbor) {
141 self.visit(neighbor, adjacency);
142 let low = self.lowlink.get(node).copied().unwrap_or(0);
143 let neighbor_low = self.lowlink.get(neighbor).copied().unwrap_or(0);
144 self.lowlink.insert(node.to_string(), low.min(neighbor_low));
145 } else if self.onstack.contains(neighbor) {
146 let low = self.lowlink.get(node).copied().unwrap_or(0);
147 let neighbor_idx = self.indices.get(neighbor).copied().unwrap_or(0);
148 self.lowlink.insert(node.to_string(), low.min(neighbor_idx));
149 }
150 }
151 }
152
153 if self.lowlink.get(node) == self.indices.get(node) {
154 let mut scc = Vec::new();
155 loop {
156 let w = self.stack.pop().expect("stack should not be empty");
157 self.onstack.remove(&w);
158 scc.push(w.clone());
159 if w == node {
160 break;
161 }
162 }
163 self.sccs.push(scc);
164 }
165 }
166}
167
168pub(super) struct TopologyValidator;
173
174impl TopologyValidator {
175 pub(super) fn validate<S: State>(
181 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
182 edges: &[Edge<S>],
183 entry_point: Option<&str>,
184 builder_metadata: &IndexMap<String, crate::graph::builder::NodeMetadata>,
185 ) -> Result<(), TopologyError> {
186 Self::check_entry_point(entry_point)?;
187 Self::check_edge_targets(nodes, edges)?;
188 Self::check_reachability(nodes, edges, entry_point, builder_metadata)?;
189 Self::check_isolated_nodes(nodes, edges)?;
190 Self::check_infinite_loops(nodes, edges)?;
191
192 Ok(())
193 }
194
195 const fn check_entry_point(entry_point: Option<&str>) -> Result<(), TopologyError> {
197 if entry_point.is_none() {
198 return Err(TopologyError::NoEntryPoint);
199 }
200 Ok(())
201 }
202
203 fn check_edge_targets<S: State>(
205 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
206 edges: &[Edge<S>],
207 ) -> Result<(), TopologyError> {
208 for edge in edges {
209 match edge {
210 Edge::Fixed { from, to } => {
211 if from != crate::edge::START && !nodes.contains_key(from) {
212 return Err(TopologyError::NodeNotFound { name: from.clone() });
213 }
214 if to != crate::edge::END && !nodes.contains_key(to) {
215 return Err(TopologyError::NodeNotFound { name: to.clone() });
216 }
217 }
218 Edge::Conditional { from, path_map, .. } => {
219 if from != crate::edge::START && !nodes.contains_key(from) {
220 return Err(TopologyError::NodeNotFound { name: from.clone() });
221 }
222 for (branch, target) in path_map.iter() {
223 if target != crate::edge::END && !nodes.contains_key(target) {
224 return Err(TopologyError::EdgeTargetNotFound {
225 from: from.clone(),
226 branch: branch.clone(),
227 target: target.clone(),
228 });
229 }
230 }
231 }
232 }
233 }
234 Ok(())
235 }
236
237 fn check_reachability<S: State>(
239 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
240 edges: &[Edge<S>],
241 entry_point: Option<&str>,
242 builder_metadata: &IndexMap<String, crate::graph::builder::NodeMetadata>,
243 ) -> Result<(), TopologyError> {
244 let entry = entry_point.expect("entry point should exist");
245
246 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
248 for node_name in nodes.keys() {
249 adjacency.entry(node_name.clone()).or_default();
250 }
251
252 for edge in edges {
253 match edge {
254 Edge::Fixed { from, to } => {
255 if from == crate::edge::START {
256 adjacency.entry(to.clone()).or_default();
257 } else if to != crate::edge::END {
258 adjacency.entry(from.clone()).or_default().push(to.clone());
259 }
260 }
261 Edge::Conditional { from, path_map, .. } => {
262 if from == crate::edge::START {
263 for target in path_map.iter().map(|(_, v)| v) {
264 adjacency.entry(target.clone()).or_default();
265 }
266 } else {
267 let targets = adjacency.entry(from.clone()).or_default();
268 for target in path_map.iter().map(|(_, v)| v) {
269 if target != crate::edge::END {
270 targets.push(target.clone());
271 }
272 }
273 }
274 }
275 }
276 }
277
278 for (node_name, meta) in builder_metadata {
280 if let Some(ref fallback) = meta.fallback_node {
281 adjacency
282 .entry(node_name.clone())
283 .or_default()
284 .push(fallback.clone());
285 }
286 }
287
288 let mut visited: HashSet<String> = HashSet::new();
290 let mut queue: VecDeque<String> = VecDeque::new();
291 queue.push_back(entry.to_string());
292 visited.insert(entry.to_string());
293
294 while let Some(current) = queue.pop_front() {
295 if let Some(neighbors) = adjacency.get(¤t) {
296 for neighbor in neighbors {
297 if visited.insert(neighbor.clone()) {
298 queue.push_back(neighbor.clone());
299 }
300 }
301 }
302 }
303
304 for node_name in nodes.keys() {
306 if !visited.contains(node_name) {
307 return Err(TopologyError::UnreachableNode {
308 name: node_name.clone(),
309 });
310 }
311 }
312
313 Ok(())
314 }
315
316 fn check_isolated_nodes<S: State>(
318 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
319 edges: &[Edge<S>],
320 ) -> Result<(), TopologyError> {
321 let mut has_incoming: HashSet<String> = HashSet::new();
322 let mut has_outgoing: HashSet<String> = HashSet::new();
323
324 for edge in edges {
325 match edge {
326 Edge::Fixed { from, to } => {
327 if from != crate::edge::START {
328 has_outgoing.insert(from.clone());
329 }
330 if to != crate::edge::END {
331 has_incoming.insert(to.clone());
332 }
333 }
334 Edge::Conditional { from, path_map, .. } => {
335 if from != crate::edge::START {
336 has_outgoing.insert(from.clone());
337 }
338 for target in path_map.iter().map(|(_, v)| v) {
339 if target != crate::edge::END {
340 has_incoming.insert(target.clone());
341 }
342 }
343 }
344 }
345 }
346
347 for node_name in nodes.keys() {
348 if !has_incoming.contains(node_name) && !has_outgoing.contains(node_name) {
349 return Err(TopologyError::IsolatedNode {
350 name: node_name.clone(),
351 });
352 }
353 }
354
355 Ok(())
356 }
357
358 fn check_infinite_loops<S: State>(
366 nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
367 edges: &[Edge<S>],
368 ) -> Result<(), TopologyError> {
369 let sccs = TarjanSCC::find_sccs(nodes, edges);
370
371 for scc in sccs {
372 if scc.len() <= 1 {
373 continue;
374 }
375
376 let scc_set: HashSet<&str> = scc.iter().map(String::as_str).collect();
377
378 let has_exit = scc.iter().any(|node| {
381 edges.iter().any(|edge| {
382 matches!(edge, Edge::Conditional { from, path_map, .. }
383 if from == node && path_map.iter().any(|(_, target)| {
384 target == crate::edge::END || !scc_set.contains(target.as_str())
385 }))
386 })
387 });
388
389 if !has_exit {
390 return Err(TopologyError::PotentialInfiniteLoop { cycle: scc });
391 }
392 }
393
394 Ok(())
395 }
396}
397
398#[cfg(test)]
399mod tests {
400 use super::*;
401 use crate::{node::IntoNode, node::NodeFnUpdate};
402
403 #[test]
404 fn test_tarjan_scc_simple() {
405 let mut nodes: IndexMap<String, Arc<dyn crate::Node<StateDummy>>> = IndexMap::new();
406 nodes.insert("a".to_string(), mock_node("a"));
407 nodes.insert("b".to_string(), mock_node("b"));
408
409 let edges = vec![Edge::Fixed {
410 from: "a".to_string(),
411 to: "b".to_string(),
412 }];
413
414 let sccs = TarjanSCC::find_sccs(&nodes, &edges);
415 assert_eq!(sccs.len(), 2);
416 }
417
418 #[test]
419 fn test_tarjan_scc_cycle() {
420 let mut nodes: IndexMap<String, Arc<dyn crate::Node<StateDummy>>> = IndexMap::new();
421 nodes.insert("a".to_string(), mock_node("a"));
422 nodes.insert("b".to_string(), mock_node("b"));
423
424 let edges = vec![
425 Edge::Fixed {
426 from: "a".to_string(),
427 to: "b".to_string(),
428 },
429 Edge::Fixed {
430 from: "b".to_string(),
431 to: "a".to_string(),
432 },
433 ];
434
435 let sccs = TarjanSCC::find_sccs(&nodes, &edges);
436 assert_eq!(sccs.len(), 1);
437 assert_eq!(sccs[0].len(), 2);
438 }
439
440 fn mock_node(name: &str) -> Arc<dyn crate::Node<StateDummy>> {
441 NodeFnUpdate(|_s: &StateDummy| async move { Ok(StateDummyUpdate) }).into_node(name)
442 }
443
444 #[derive(Clone, Debug, Default)]
445 struct StateDummy;
446
447 impl crate::State for StateDummy {
448 type Update = StateDummyUpdate;
449 type FieldVersions = crate::state::FieldVersions;
450
451 fn apply(&mut self, _update: Self::Update) -> crate::FieldsChanged {
452 crate::FieldsChanged(0)
453 }
454
455 fn reset_ephemeral(&mut self) {}
456 }
457
458 #[derive(Clone, Debug, Default)]
459 struct StateDummyUpdate;
460}
461
462