Skip to main content

juncture_core/graph/
topology.rs

1//! Topology validation for `StateGraph`
2//!
3//! Provides comprehensive validation of graph structure including:
4//! - Entry point verification
5//! - Node existence checks for all edges
6//! - Conditional edge path map validation
7//! - Reachability analysis via BFS
8//! - Isolated node detection
9//! - Unreachable node detection
10//! - SCC-based infinite loop detection (Tarjan's algorithm)
11
12use crate::{State, edge::Edge};
13use indexmap::IndexMap;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
17/// Topology validation errors
18///
19/// These errors indicate structural problems with a graph that would
20/// prevent correct execution.
21#[derive(Debug, thiserror::Error)]
22pub enum TopologyError {
23    #[error("node '{name}' already exists")]
24    DuplicateNode { name: String },
25
26    #[error("node '{name}' is invalid: {reason}")]
27    InvalidNodeName { name: String, reason: String },
28
29    #[error("no entry point set")]
30    NoEntryPoint,
31
32    #[error("edge references non-existent node '{name}'")]
33    NodeNotFound { name: String },
34
35    #[error(
36        "conditional edge from '{from}' branch '{branch}' targets non-existent node '{target}'"
37    )]
38    EdgeTargetNotFound {
39        from: String,
40        branch: String,
41        target: String,
42    },
43
44    #[error("node '{name}' has no incoming or outgoing edges (isolated)")]
45    IsolatedNode { name: String },
46
47    #[error("node '{name}' is unreachable from entry point")]
48    UnreachableNode { name: String },
49
50    #[error("potential infinite loop detected, path: {cycle:?}")]
51    PotentialInfiniteLoop { cycle: Vec<String> },
52
53    #[error(
54        "field index {index} in {context} is out of range (state has {field_count} fields: {field_names:?})"
55    )]
56    InvalidFieldReference {
57        index: usize,
58        field_count: usize,
59        field_names: &'static [&'static str],
60        context: String,
61    },
62}
63
64/// Strongly connected components finder using `Tarjan`'s algorithm
65///
66/// Used to detect cycles in the graph that could cause infinite loops.
67struct TarjanSCC {
68    index: usize,
69    stack: Vec<String>,
70    indices: HashMap<String, usize>,
71    lowlink: HashMap<String, usize>,
72    onstack: HashSet<String>,
73    sccs: Vec<Vec<String>>,
74}
75
76impl TarjanSCC {
77    fn new() -> Self {
78        Self {
79            index: 0,
80            stack: Vec::new(),
81            indices: HashMap::new(),
82            lowlink: HashMap::new(),
83            onstack: HashSet::new(),
84            sccs: Vec::new(),
85        }
86    }
87
88    /// Find all strongly connected components in the graph
89    fn find_sccs<S: State>(
90        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
91        edges: &[Edge<S>],
92    ) -> Vec<Vec<String>> {
93        let mut tarjan = Self::new();
94        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
95
96        // Build adjacency list
97        for node_name in nodes.keys() {
98            adjacency.entry(node_name.clone()).or_default();
99        }
100
101        for edge in edges {
102            match edge {
103                Edge::Fixed { from, to } => {
104                    if from != crate::edge::START && to != crate::edge::END {
105                        adjacency.entry(from.clone()).or_default().push(to.clone());
106                    }
107                }
108                Edge::Conditional { from, path_map, .. } => {
109                    if from != crate::edge::START {
110                        let targets = adjacency.entry(from.clone()).or_default();
111                        for target in path_map.iter().map(|(_, v)| v) {
112                            if target != crate::edge::END {
113                                targets.push(target.clone());
114                            }
115                        }
116                    }
117                }
118            }
119        }
120
121        // Run Tarjan's algorithm
122        for node_name in nodes.keys() {
123            if !tarjan.indices.contains_key(node_name) {
124                tarjan.visit(node_name, &adjacency);
125            }
126        }
127
128        tarjan.sccs
129    }
130
131    fn visit(&mut self, node: &str, adjacency: &HashMap<String, Vec<String>>) {
132        self.indices.insert(node.to_string(), self.index);
133        self.lowlink.insert(node.to_string(), self.index);
134        self.index += 1;
135        self.stack.push(node.to_string());
136        self.onstack.insert(node.to_string());
137
138        if let Some(neighbors) = adjacency.get(node) {
139            for neighbor in neighbors {
140                if !self.indices.contains_key(neighbor) {
141                    self.visit(neighbor, adjacency);
142                    let low = self.lowlink.get(node).copied().unwrap_or(0);
143                    let neighbor_low = self.lowlink.get(neighbor).copied().unwrap_or(0);
144                    self.lowlink.insert(node.to_string(), low.min(neighbor_low));
145                } else if self.onstack.contains(neighbor) {
146                    let low = self.lowlink.get(node).copied().unwrap_or(0);
147                    let neighbor_idx = self.indices.get(neighbor).copied().unwrap_or(0);
148                    self.lowlink.insert(node.to_string(), low.min(neighbor_idx));
149                }
150            }
151        }
152
153        if self.lowlink.get(node) == self.indices.get(node) {
154            let mut scc = Vec::new();
155            loop {
156                let w = self.stack.pop().expect("stack should not be empty");
157                self.onstack.remove(&w);
158                scc.push(w.clone());
159                if w == node {
160                    break;
161                }
162            }
163            self.sccs.push(scc);
164        }
165    }
166}
167
168/// Validates the topology of a `StateGraph`
169///
170/// Performs comprehensive checks to ensure the graph structure is valid
171/// and can be executed correctly.
172pub(super) struct TopologyValidator;
173
174impl TopologyValidator {
175    /// Validate the complete graph topology
176    ///
177    /// # Errors
178    ///
179    /// Returns [`TopologyError`] if any validation check fails.
180    pub(super) fn validate<S: State>(
181        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
182        edges: &[Edge<S>],
183        entry_point: Option<&str>,
184    ) -> Result<(), TopologyError> {
185        Self::check_entry_point(entry_point)?;
186        Self::check_edge_targets(nodes, edges)?;
187        Self::check_reachability(nodes, edges, entry_point)?;
188        Self::check_isolated_nodes(nodes, edges)?;
189        Self::check_infinite_loops(nodes, edges)?;
190
191        Ok(())
192    }
193
194    /// Check that entry point is set
195    const fn check_entry_point(entry_point: Option<&str>) -> Result<(), TopologyError> {
196        if entry_point.is_none() {
197            return Err(TopologyError::NoEntryPoint);
198        }
199        Ok(())
200    }
201
202    /// Check that all edge references point to existing nodes
203    fn check_edge_targets<S: State>(
204        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
205        edges: &[Edge<S>],
206    ) -> Result<(), TopologyError> {
207        for edge in edges {
208            match edge {
209                Edge::Fixed { from, to } => {
210                    if from != crate::edge::START && !nodes.contains_key(from) {
211                        return Err(TopologyError::NodeNotFound { name: from.clone() });
212                    }
213                    if to != crate::edge::END && !nodes.contains_key(to) {
214                        return Err(TopologyError::NodeNotFound { name: to.clone() });
215                    }
216                }
217                Edge::Conditional { from, path_map, .. } => {
218                    if from != crate::edge::START && !nodes.contains_key(from) {
219                        return Err(TopologyError::NodeNotFound { name: from.clone() });
220                    }
221                    for (branch, target) in path_map.iter() {
222                        if target != crate::edge::END && !nodes.contains_key(target) {
223                            return Err(TopologyError::EdgeTargetNotFound {
224                                from: from.clone(),
225                                branch: branch.clone(),
226                                target: target.clone(),
227                            });
228                        }
229                    }
230                }
231            }
232        }
233        Ok(())
234    }
235
236    /// Check that all nodes are reachable from the entry point
237    fn check_reachability<S: State>(
238        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
239        edges: &[Edge<S>],
240        entry_point: Option<&str>,
241    ) -> Result<(), TopologyError> {
242        let entry = entry_point.expect("entry point should exist");
243
244        // Build adjacency list for BFS
245        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
246        for node_name in nodes.keys() {
247            adjacency.entry(node_name.clone()).or_default();
248        }
249
250        for edge in edges {
251            match edge {
252                Edge::Fixed { from, to } => {
253                    if from == crate::edge::START {
254                        adjacency.entry(to.clone()).or_default();
255                    } else if to != crate::edge::END {
256                        adjacency.entry(from.clone()).or_default().push(to.clone());
257                    }
258                }
259                Edge::Conditional { from, path_map, .. } => {
260                    if from == crate::edge::START {
261                        for target in path_map.iter().map(|(_, v)| v) {
262                            adjacency.entry(target.clone()).or_default();
263                        }
264                    } else {
265                        let targets = adjacency.entry(from.clone()).or_default();
266                        for target in path_map.iter().map(|(_, v)| v) {
267                            if target != crate::edge::END {
268                                targets.push(target.clone());
269                            }
270                        }
271                    }
272                }
273            }
274        }
275
276        // BFS from entry point
277        let mut visited: HashSet<String> = HashSet::new();
278        let mut queue: VecDeque<String> = VecDeque::new();
279        queue.push_back(entry.to_string());
280        visited.insert(entry.to_string());
281
282        while let Some(current) = queue.pop_front() {
283            if let Some(neighbors) = adjacency.get(&current) {
284                for neighbor in neighbors {
285                    if visited.insert(neighbor.clone()) {
286                        queue.push_back(neighbor.clone());
287                    }
288                }
289            }
290        }
291
292        // Check for unreachable nodes
293        for node_name in nodes.keys() {
294            if !visited.contains(node_name) {
295                return Err(TopologyError::UnreachableNode {
296                    name: node_name.clone(),
297                });
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Check for isolated nodes (no incoming or outgoing edges)
305    fn check_isolated_nodes<S: State>(
306        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
307        edges: &[Edge<S>],
308    ) -> Result<(), TopologyError> {
309        let mut has_incoming: HashSet<String> = HashSet::new();
310        let mut has_outgoing: HashSet<String> = HashSet::new();
311
312        for edge in edges {
313            match edge {
314                Edge::Fixed { from, to } => {
315                    if from != crate::edge::START {
316                        has_outgoing.insert(from.clone());
317                    }
318                    if to != crate::edge::END {
319                        has_incoming.insert(to.clone());
320                    }
321                }
322                Edge::Conditional { from, path_map, .. } => {
323                    if from != crate::edge::START {
324                        has_outgoing.insert(from.clone());
325                    }
326                    for target in path_map.iter().map(|(_, v)| v) {
327                        if target != crate::edge::END {
328                            has_incoming.insert(target.clone());
329                        }
330                    }
331                }
332            }
333        }
334
335        for node_name in nodes.keys() {
336            if !has_incoming.contains(node_name) && !has_outgoing.contains(node_name) {
337                return Err(TopologyError::IsolatedNode {
338                    name: node_name.clone(),
339                });
340            }
341        }
342
343        Ok(())
344    }
345
346    /// Check for potential infinite loops using SCC analysis
347    ///
348    /// Cycles are allowed if at least one node in the SCC has a conditional
349    /// edge that can exit the cycle -- either by routing to END or to a node
350    /// outside the SCC.  This covers intentional agent loops (e.g. `ReAct`
351    /// agent -> tools -> agent) where the agent conditionally routes to a
352    /// summary/finish node outside the loop.
353    fn check_infinite_loops<S: State>(
354        nodes: &IndexMap<String, Arc<dyn crate::Node<S>>>,
355        edges: &[Edge<S>],
356    ) -> Result<(), TopologyError> {
357        let sccs = TarjanSCC::find_sccs(nodes, edges);
358
359        for scc in sccs {
360            if scc.len() <= 1 {
361                continue;
362            }
363
364            let scc_set: HashSet<&str> = scc.iter().map(String::as_str).collect();
365
366            // A cycle is safe if any node in the SCC has a conditional edge
367            // whose path_map contains a target outside the SCC (including END).
368            let has_exit = scc.iter().any(|node| {
369                edges.iter().any(|edge| {
370                    matches!(edge, Edge::Conditional { from, path_map, .. }
371                    if from == node && path_map.iter().any(|(_, target)| {
372                        target == crate::edge::END || !scc_set.contains(target.as_str())
373                    }))
374                })
375            });
376
377            if !has_exit {
378                return Err(TopologyError::PotentialInfiniteLoop { cycle: scc });
379            }
380        }
381
382        Ok(())
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{node::IntoNode, node::NodeFnUpdate};

    /// Empty state type; the tests only need something that implements `State`.
    #[derive(Clone, Debug, Default)]
    struct StateDummy;

    /// Update type paired with [`StateDummy`]; carries no data.
    #[derive(Clone, Debug, Default)]
    struct StateDummyUpdate;

    impl crate::State for StateDummy {
        type Update = StateDummyUpdate;
        type FieldVersions = crate::state::FieldVersions;

        fn apply(&mut self, _update: Self::Update) -> crate::FieldsChanged {
            crate::FieldsChanged(0)
        }

        fn reset_ephemeral(&mut self) {}
    }

    /// Build a node called `name` that returns an empty update.
    fn mock_node(name: &str) -> Arc<dyn crate::Node<StateDummy>> {
        NodeFnUpdate(|_s: &StateDummy| async move { Ok(StateDummyUpdate) }).into_node(name)
    }

    /// Build a node map holding one mock node per name, in the given order.
    fn graph_nodes(names: &[&str]) -> IndexMap<String, Arc<dyn crate::Node<StateDummy>>> {
        let mut nodes: IndexMap<String, Arc<dyn crate::Node<StateDummy>>> = IndexMap::new();
        for name in names {
            nodes.insert((*name).to_string(), mock_node(name));
        }
        nodes
    }

    /// Build a fixed edge `from -> to`.
    fn fixed(from: &str, to: &str) -> Edge<StateDummy> {
        Edge::Fixed {
            from: from.to_string(),
            to: to.to_string(),
        }
    }

    #[test]
    fn test_tarjan_scc_simple() {
        // a -> b has no cycle, so each node is its own component.
        let nodes = graph_nodes(&["a", "b"]);
        let edges = vec![fixed("a", "b")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 2);
    }

    #[test]
    fn test_tarjan_scc_cycle() {
        // a -> b -> a is one component containing both nodes.
        let nodes = graph_nodes(&["a", "b"]);
        let edges = vec![fixed("a", "b"), fixed("b", "a")];

        let sccs = TarjanSCC::find_sccs(&nodes, &edges);
        assert_eq!(sccs.len(), 1);
        assert_eq!(sccs[0].len(), 2);
    }
}
449
450// Rust guideline compliant 2026-05-19