Skip to main content

juncture_core/graph/
topology.rs

1//! Topology validation for `StateGraph`
2//!
3//! Provides comprehensive validation of graph structure including:
4//! - Entry point verification
5//! - Node existence checks for all edges
6//! - Conditional edge path map validation
7//! - Reachability analysis via BFS
8//! - Isolated node detection
9//! - Unreachable node detection
10//! - SCC-based infinite loop detection (Tarjan's algorithm)
11
12use crate::{State, edge::Edge};
13use indexmap::IndexMap;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
/// Topology validation errors
///
/// These errors indicate structural problems with a graph that would
/// prevent correct execution.
///
/// The display text of every variant comes from its `#[error(...)]`
/// attribute; the named fields are interpolated into that text.
#[derive(Debug, thiserror::Error)]
pub enum TopologyError {
    /// A node with the given name is already present.
    ///
    /// NOTE(review): not constructed by the validator in this module;
    /// presumably raised while nodes are being registered -- confirm.
    #[error("node '{name}' already exists")]
    DuplicateNode { name: String },

    /// A node name was rejected; `reason` carries the explanation.
    ///
    /// NOTE(review): not constructed by the validator in this module;
    /// the naming rules live elsewhere -- confirm.
    #[error("node '{name}' is invalid: {reason}")]
    InvalidNodeName { name: String, reason: String },

    /// The graph was validated without an entry point.
    #[error("no entry point set")]
    NoEntryPoint,

    /// An edge endpoint (other than the `START`/`END` markers) names a node
    /// that is not part of the graph.
    #[error("edge references non-existent node '{name}'")]
    NodeNotFound { name: String },

    /// A branch of a conditional edge maps to a node that is not part of
    /// the graph.
    #[error(
        "conditional edge from '{from}' branch '{branch}' targets non-existent node '{target}'"
    )]
    EdgeTargetNotFound {
        /// Source node of the conditional edge.
        from: String,
        /// Key of the offending entry in the edge's path map.
        branch: String,
        /// The missing node the branch points at.
        target: String,
    },

    /// The node is neither the source nor the target of any edge.
    #[error("node '{name}' has no incoming or outgoing edges (isolated)")]
    IsolatedNode { name: String },

    /// The node cannot be reached by following edges from the entry point.
    #[error("node '{name}' is unreachable from entry point")]
    UnreachableNode { name: String },

    /// A cycle without a conditional way out was found; `cycle` lists the
    /// nodes of the offending strongly connected component.
    #[error("potential infinite loop detected, path: {cycle:?}")]
    PotentialInfiniteLoop { cycle: Vec<String> },

    /// A state field was referenced by an index outside the state's fields.
    ///
    /// NOTE(review): not constructed by the validator in this module;
    /// presumably raised where field indices are resolved -- confirm.
    #[error(
        "field index {index} in {context} is out of range (state has {field_count} fields: {field_names:?})"
    )]
    InvalidFieldReference {
        /// The out-of-range index.
        index: usize,
        /// Number of fields the state has.
        field_count: usize,
        /// Names of the state's fields, shown in the message for orientation.
        field_names: &'static [&'static str],
        /// Where the bad reference was found (free-form text).
        context: String,
    },
}
63
/// Strongly connected components finder using `Tarjan`'s algorithm
///
/// Used to detect cycles in the graph that could cause infinite loops.
///
/// The fields hold the traversal state that the recursive `visit` calls
/// share and mutate.
struct TarjanSCC {
    /// Next discovery index to assign; incremented once per visited node.
    index: usize,
    /// Visited nodes that have not yet been emitted as part of a component.
    stack: Vec<String>,
    /// Discovery index of every node visited so far.
    indices: HashMap<String, usize>,
    /// Per node, the smallest discovery index found so far among the node
    /// itself and the nodes examined from it.
    lowlink: HashMap<String, usize>,
    /// Set view of `stack`, used for membership tests.
    onstack: HashSet<String>,
    /// Components found so far, in the order they were completed.
    sccs: Vec<Vec<String>>,
}
75
76impl TarjanSCC {
77    fn new() -> Self {
78        Self {
79            index: 0,
80            stack: Vec::new(),
81            indices: HashMap::new(),
82            lowlink: HashMap::new(),
83            onstack: HashSet::new(),
84            sccs: Vec::new(),
85        }
86    }
87
88    /// Find all strongly connected components in the graph
89    fn find_sccs<S: State>(
90        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
91        edges: &[Edge<S>],
92    ) -> Vec<Vec<String>> {
93        let mut tarjan = Self::new();
94        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
95
96        // Build adjacency list
97        for node_name in nodes.keys() {
98            adjacency.entry(node_name.clone()).or_default();
99        }
100
101        for edge in edges {
102            match edge {
103                Edge::Fixed { from, to } => {
104                    if from != crate::edge::START && to != crate::edge::END {
105                        adjacency.entry(from.clone()).or_default().push(to.clone());
106                    }
107                }
108                Edge::Conditional { from, path_map, .. } => {
109                    if from != crate::edge::START {
110                        let targets = adjacency.entry(from.clone()).or_default();
111                        for target in path_map.iter().map(|(_, v)| v) {
112                            if target != crate::edge::END {
113                                targets.push(target.clone());
114                            }
115                        }
116                    }
117                }
118            }
119        }
120
121        // Run Tarjan's algorithm
122        for node_name in nodes.keys() {
123            if !tarjan.indices.contains_key(node_name) {
124                tarjan.visit(node_name, &adjacency);
125            }
126        }
127
128        tarjan.sccs
129    }
130
131    fn visit(&mut self, node: &str, adjacency: &HashMap<String, Vec<String>>) {
132        self.indices.insert(node.to_string(), self.index);
133        self.lowlink.insert(node.to_string(), self.index);
134        self.index += 1;
135        self.stack.push(node.to_string());
136        self.onstack.insert(node.to_string());
137
138        if let Some(neighbors) = adjacency.get(node) {
139            for neighbor in neighbors {
140                if !self.indices.contains_key(neighbor) {
141                    self.visit(neighbor, adjacency);
142                    let low = self.lowlink.get(node).copied().unwrap_or(0);
143                    let neighbor_low = self.lowlink.get(neighbor).copied().unwrap_or(0);
144                    self.lowlink.insert(node.to_string(), low.min(neighbor_low));
145                } else if self.onstack.contains(neighbor) {
146                    let low = self.lowlink.get(node).copied().unwrap_or(0);
147                    let neighbor_idx = self.indices.get(neighbor).copied().unwrap_or(0);
148                    self.lowlink.insert(node.to_string(), low.min(neighbor_idx));
149                }
150            }
151        }
152
153        if self.lowlink.get(node) == self.indices.get(node) {
154            let mut scc = Vec::new();
155            loop {
156                let w = self.stack.pop().expect("stack should not be empty");
157                self.onstack.remove(&w);
158                scc.push(w.clone());
159                if w == node {
160                    break;
161                }
162            }
163            self.sccs.push(scc);
164        }
165    }
166}
167
/// Validates the topology of a `StateGraph`
///
/// Performs comprehensive checks to ensure the graph structure is valid
/// and can be executed correctly.
///
/// This is a field-less type: it only groups the associated validation
/// functions, and all inputs are passed to those functions as arguments.
pub(super) struct TopologyValidator;
173
174impl TopologyValidator {
175    /// Validate the complete graph topology
176    ///
177    /// # Errors
178    ///
179    /// Returns [`TopologyError`] if any validation check fails.
180    pub(super) fn validate<S: State>(
181        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
182        edges: &[Edge<S>],
183        entry_point: Option<&str>,
184        builder_metadata: &IndexMap<String, crate::graph::builder::NodeMetadata>,
185    ) -> Result<(), TopologyError> {
186        Self::check_entry_point(entry_point)?;
187        Self::check_edge_targets(nodes, edges)?;
188        Self::check_reachability(nodes, edges, entry_point, builder_metadata)?;
189        Self::check_isolated_nodes(nodes, edges)?;
190        Self::check_infinite_loops(nodes, edges)?;
191
192        Ok(())
193    }
194
195    /// Check that entry point is set
196    const fn check_entry_point(entry_point: Option<&str>) -> Result<(), TopologyError> {
197        if entry_point.is_none() {
198            return Err(TopologyError::NoEntryPoint);
199        }
200        Ok(())
201    }
202
203    /// Check that all edge references point to existing nodes
204    fn check_edge_targets<S: State>(
205        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
206        edges: &[Edge<S>],
207    ) -> Result<(), TopologyError> {
208        for edge in edges {
209            match edge {
210                Edge::Fixed { from, to } => {
211                    if from != crate::edge::START && !nodes.contains_key(from) {
212                        return Err(TopologyError::NodeNotFound { name: from.clone() });
213                    }
214                    if to != crate::edge::END && !nodes.contains_key(to) {
215                        return Err(TopologyError::NodeNotFound { name: to.clone() });
216                    }
217                }
218                Edge::Conditional { from, path_map, .. } => {
219                    if from != crate::edge::START && !nodes.contains_key(from) {
220                        return Err(TopologyError::NodeNotFound { name: from.clone() });
221                    }
222                    for (branch, target) in path_map.iter() {
223                        if target != crate::edge::END && !nodes.contains_key(target) {
224                            return Err(TopologyError::EdgeTargetNotFound {
225                                from: from.clone(),
226                                branch: branch.clone(),
227                                target: target.clone(),
228                            });
229                        }
230                    }
231                }
232            }
233        }
234        Ok(())
235    }
236
237    /// Check that all nodes are reachable from the entry point
238    fn check_reachability<S: State>(
239        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
240        edges: &[Edge<S>],
241        entry_point: Option<&str>,
242        builder_metadata: &IndexMap<String, crate::graph::builder::NodeMetadata>,
243    ) -> Result<(), TopologyError> {
244        let entry = entry_point.expect("entry point should exist");
245
246        // Build adjacency list for BFS
247        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
248        for node_name in nodes.keys() {
249            adjacency.entry(node_name.clone()).or_default();
250        }
251
252        for edge in edges {
253            match edge {
254                Edge::Fixed { from, to } => {
255                    if from == crate::edge::START {
256                        adjacency.entry(to.clone()).or_default();
257                    } else if to != crate::edge::END {
258                        adjacency.entry(from.clone()).or_default().push(to.clone());
259                    }
260                }
261                Edge::Conditional { from, path_map, .. } => {
262                    if from == crate::edge::START {
263                        for target in path_map.iter().map(|(_, v)| v) {
264                            adjacency.entry(target.clone()).or_default();
265                        }
266                    } else {
267                        let targets = adjacency.entry(from.clone()).or_default();
268                        for target in path_map.iter().map(|(_, v)| v) {
269                            if target != crate::edge::END {
270                                targets.push(target.clone());
271                            }
272                        }
273                    }
274                }
275            }
276        }
277
278        // Add fallback edges to adjacency list
279        for (node_name, meta) in builder_metadata {
280            if let Some(ref fallback) = meta.fallback_node {
281                adjacency
282                    .entry(node_name.clone())
283                    .or_default()
284                    .push(fallback.clone());
285            }
286        }
287
288        // BFS from entry point
289        let mut visited: HashSet<String> = HashSet::new();
290        let mut queue: VecDeque<String> = VecDeque::new();
291        queue.push_back(entry.to_string());
292        visited.insert(entry.to_string());
293
294        while let Some(current) = queue.pop_front() {
295            if let Some(neighbors) = adjacency.get(&current) {
296                for neighbor in neighbors {
297                    if visited.insert(neighbor.clone()) {
298                        queue.push_back(neighbor.clone());
299                    }
300                }
301            }
302        }
303
304        // Check for unreachable nodes
305        for node_name in nodes.keys() {
306            if !visited.contains(node_name) {
307                return Err(TopologyError::UnreachableNode {
308                    name: node_name.clone(),
309                });
310            }
311        }
312
313        Ok(())
314    }
315
316    /// Check for isolated nodes (no incoming or outgoing edges)
317    fn check_isolated_nodes<S: State>(
318        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
319        edges: &[Edge<S>],
320    ) -> Result<(), TopologyError> {
321        let mut has_incoming: HashSet<String> = HashSet::new();
322        let mut has_outgoing: HashSet<String> = HashSet::new();
323
324        for edge in edges {
325            match edge {
326                Edge::Fixed { from, to } => {
327                    if from != crate::edge::START {
328                        has_outgoing.insert(from.clone());
329                    }
330                    if to != crate::edge::END {
331                        has_incoming.insert(to.clone());
332                    }
333                }
334                Edge::Conditional { from, path_map, .. } => {
335                    if from != crate::edge::START {
336                        has_outgoing.insert(from.clone());
337                    }
338                    for target in path_map.iter().map(|(_, v)| v) {
339                        if target != crate::edge::END {
340                            has_incoming.insert(target.clone());
341                        }
342                    }
343                }
344            }
345        }
346
347        for node_name in nodes.keys() {
348            if !has_incoming.contains(node_name) && !has_outgoing.contains(node_name) {
349                return Err(TopologyError::IsolatedNode {
350                    name: node_name.clone(),
351                });
352            }
353        }
354
355        Ok(())
356    }
357
358    /// Check for potential infinite loops using SCC analysis
359    ///
360    /// Cycles are allowed if at least one node in the SCC has a conditional
361    /// edge that can exit the cycle -- either by routing to END or to a node
362    /// outside the SCC.  This covers intentional agent loops (e.g. `ReAct`
363    /// agent -> tools -> agent) where the agent conditionally routes to a
364    /// summary/finish node outside the loop.
365    fn check_infinite_loops<S: State>(
366        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
367        edges: &[Edge<S>],
368    ) -> Result<(), TopologyError> {
369        let sccs = TarjanSCC::find_sccs(nodes, edges);
370
371        for scc in sccs {
372            if scc.len() <= 1 {
373                continue;
374            }
375
376            let scc_set: HashSet<&str> = scc.iter().map(String::as_str).collect();
377
378            // A cycle is safe if any node in the SCC has a conditional edge
379            // whose path_map contains a target outside the SCC (including END).
380            let has_exit = scc.iter().any(|node| {
381                edges.iter().any(|edge| {
382                    matches!(edge, Edge::Conditional { from, path_map, .. }
383                    if from == node && path_map.iter().any(|(_, target)| {
384                        target == crate::edge::END || !scc_set.contains(target.as_str())
385                    }))
386                })
387            });
388
389            if !has_exit {
390                return Err(TopologyError::PotentialInfiniteLoop { cycle: scc });
391            }
392        }
393
394        Ok(())
395    }
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{node::IntoNode, node::NodeFnUpdate};

    /// Field-less state used only to instantiate the generic graph types.
    #[derive(Clone, Debug, Default)]
    struct StateDummy;

    /// Update type paired with [`StateDummy`]; carries no data.
    #[derive(Clone, Debug, Default)]
    struct StateDummyUpdate;

    impl crate::State for StateDummy {
        type Update = StateDummyUpdate;
        type FieldVersions = crate::state::FieldVersions;

        fn apply(&mut self, _update: Self::Update) -> crate::FieldsChanged {
            crate::FieldsChanged(0)
        }

        fn reset_ephemeral(&mut self) {}
    }

    /// Builds a node whose handler ignores the state and returns an empty update.
    fn mock_node(name: &str) -> Arc<dyn crate::Node<StateDummy>> {
        NodeFnUpdate(|_s: &StateDummy| async move { Ok(StateDummyUpdate) }).into_node(name)
    }

    /// Builds a node map holding one mock node per name, in the given order.
    fn node_map(names: &[&str]) -> IndexMap<String, Arc<dyn crate::Node<StateDummy>>> {
        let mut map: IndexMap<String, Arc<dyn crate::Node<StateDummy>>> = IndexMap::new();
        for &name in names {
            map.insert(name.to_string(), mock_node(name));
        }
        map
    }

    /// Shorthand for a fixed edge between two named nodes.
    fn fixed_edge(from: &str, to: &str) -> Edge<StateDummy> {
        Edge::Fixed {
            from: from.to_string(),
            to: to.to_string(),
        }
    }

    #[test]
    fn test_tarjan_scc_simple() {
        // a -> b has no cycle, so each node is its own component.
        let nodes = node_map(&["a", "b"]);
        let edges = vec![fixed_edge("a", "b")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 2);
    }

    #[test]
    fn test_tarjan_scc_cycle() {
        // a -> b -> a collapses into a single two-node component.
        let nodes = node_map(&["a", "b"]);
        let edges = vec![fixed_edge("a", "b"), fixed_edge("b", "a")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 1);
        assert_eq!(sccs[0].len(), 2);
    }
}
461
462// Rust guideline compliant 2026-05-19