weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Graph compilation logic and validation.

use crate::app::App;
use crate::types::NodeKind;
use rustc_hash::{FxHashMap, FxHashSet};
use std::collections::VecDeque;

/// Errors that can occur when compiling a graph.
///
/// Produced by `GraphBuilder::validate` (and therefore by `GraphBuilder::compile`).
/// Validation stops at the first failed check, so only one error is ever reported
/// per call even if the graph has several problems.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
pub enum GraphCompileError {
    /// No entry edge defined from the virtual Start node.
    ///
    /// Raised when Start has no unconditional targets and no conditional edge
    /// has Start as its source.
    #[error("missing entry: no edge or conditional edge originates from Start")]
    MissingEntry,

    /// An edge references an unregistered node.
    ///
    /// Raised for an unconditional edge whose `Custom` source or target is not in
    /// the builder's node registry. The payload is the offending node.
    #[error("unknown node referenced in edge: {0}")]
    UnknownNode(NodeKind),

    /// An edge originates from the virtual End node, which is terminal.
    #[error("invalid edge: cannot originate from End")]
    EdgeFromEnd,

    /// A cycle was detected among unconditional edges.
    ///
    /// Conditional edges are not part of this check.
    #[error("cycle detected in graph: {}", .cycle.iter().map(|n| n.to_string()).collect::<Vec<_>>().join(" -> "))]
    CycleDetected {
        /// Nodes forming the cycle, in traversal order.
        ///
        /// The first node is repeated as the last element to close the loop,
        /// so a two-node cycle is reported as `A -> B -> A`.
        cycle: Vec<NodeKind>,
    },

    /// One or more nodes are unreachable from Start.
    ///
    /// Only checked when the graph has no conditional edges.
    #[error("unreachable nodes detected (no path from Start): {}", .nodes.iter().map(|n| n.to_string()).collect::<Vec<_>>().join(", "))]
    UnreachableNodes {
        /// Nodes with no path from Start.
        ///
        /// Registered nodes only, sorted by their display string.
        nodes: Vec<NodeKind>,
    },

    /// One or more nodes have no path to End.
    ///
    /// Only checked when the graph has no conditional edges.
    #[error("nodes with no path to End: {}", .nodes.iter().map(|n| n.to_string()).collect::<Vec<_>>().join(", "))]
    NoPathToEnd {
        /// Nodes that cannot reach End.
        ///
        /// Registered nodes only, sorted by their display string.
        nodes: Vec<NodeKind>,
    },

    /// A duplicate unconditional edge was detected.
    ///
    /// Raised when the same target appears more than once in a single source's
    /// list of unconditional targets.
    #[error("duplicate edge detected: {} -> {}", .from, .to)]
    DuplicateEdge {
        /// Source node.
        from: NodeKind,
        /// Target node.
        to: NodeKind,
    },
}

impl super::builder::GraphBuilder {
    /// Consumes the builder, validates it, and turns it into an executable [`App`].
    ///
    /// Validation runs first and only borrows the builder; the builder is taken
    /// apart and handed to [`App::from_parts`] only once validation has passed.
    ///
    /// # Errors
    ///
    /// Returns the [`GraphCompileError`] reported by [`Self::validate`], unchanged,
    /// when the graph fails any structural check.
    ///
    /// # Examples
    ///
    /// ```
    /// use weavegraph::graphs::GraphBuilder;
    /// use weavegraph::types::NodeKind;
    ///
    /// # struct MyNode;
    /// # #[async_trait::async_trait]
    /// # impl weavegraph::node::Node for MyNode {
    /// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
    /// #         Ok(weavegraph::node::NodePartial::default())
    /// #     }
    /// # }
    ///
    /// let app = GraphBuilder::new()
    ///     .add_node(NodeKind::Custom("process".into()), MyNode)
    ///     .add_edge(NodeKind::Start, NodeKind::Custom("process".into()))
    ///     .add_edge(NodeKind::Custom("process".into()), NodeKind::End)
    ///     .compile()?;
    /// # Ok::<_, weavegraph::graphs::GraphCompileError>(())
    /// ```
    pub fn compile(self) -> Result<App, GraphCompileError> {
        // Must happen before `into_parts`, which moves the builder.
        self.validate()?;

        let (node_registry, edge_map, branch_edges, config, reducers) = self.into_parts();
        let app = App::from_parts(node_registry, edge_map, branch_edges, config, reducers);
        Ok(app)
    }

    /// Validates the graph for structural correctness.
    ///
    /// Checks in order:
    /// - At least one entry edge from Start (unconditional or conditional)
    /// - No cycle in unconditional edges
    /// - All registered nodes reachable from Start (skipped when conditional edges exist)
    /// - All registered nodes have a path to End (skipped when conditional edges exist)
    /// - No duplicate unconditional edges
    /// - No edge from End; all Custom node references are registered
    pub fn validate(&self) -> Result<(), GraphCompileError> {
        let has_start_edge = self
            .edges_ref()
            .get(&NodeKind::Start)
            .is_some_and(|v| !v.is_empty())
            || self
                .conditional_edges_ref()
                .iter()
                .any(|ce| ce.from() == &NodeKind::Start);

        if !has_start_edge {
            return Err(GraphCompileError::MissingEntry);
        }

        if let Some(cycle) = self.detect_cycle() {
            return Err(GraphCompileError::CycleDetected { cycle });
        }

        if self.conditional_edges_ref().is_empty() {
            let unreachable = self.detect_unreachable_nodes();
            if !unreachable.is_empty() {
                return Err(GraphCompileError::UnreachableNodes { nodes: unreachable });
            }

            let no_path = self.detect_no_path_to_end();
            if !no_path.is_empty() {
                return Err(GraphCompileError::NoPathToEnd { nodes: no_path });
            }
        }

        if let Some((from, to)) = self.detect_duplicate_edge() {
            return Err(GraphCompileError::DuplicateEdge { from, to });
        }

        for (from, tos) in self.edges_ref() {
            if matches!(from, NodeKind::End) {
                return Err(GraphCompileError::EdgeFromEnd);
            }
            if let NodeKind::Custom(_) = from
                && !self.nodes_ref().contains_key(from)
            {
                return Err(GraphCompileError::UnknownNode(from.clone()));
            }
            for to in tos {
                if let NodeKind::Custom(_) = to
                    && !self.nodes_ref().contains_key(to)
                {
                    return Err(GraphCompileError::UnknownNode(to.clone()));
                }
            }
        }

        Ok(())
    }

    fn detect_cycle(&self) -> Option<Vec<NodeKind>> {
        #[derive(Clone, Copy, PartialEq)]
        enum Color {
            White,
            Gray,
            Black,
        }

        fn dfs(
            node: &NodeKind,
            colors: &mut FxHashMap<NodeKind, Color>,
            path: &mut Vec<NodeKind>,
            edges: &FxHashMap<NodeKind, Vec<NodeKind>>,
        ) -> Option<Vec<NodeKind>> {
            colors.insert(node.clone(), Color::Gray);
            path.push(node.clone());

            for neighbor in edges.get(node).into_iter().flatten() {
                match colors.get(neighbor).copied().unwrap_or(Color::White) {
                    Color::White => {
                        if let Some(cycle) = dfs(neighbor, colors, path, edges) {
                            return Some(cycle);
                        }
                    }
                    Color::Gray => {
                        if let Some(start) = path.iter().position(|n| n == neighbor) {
                            let mut cycle = path[start..].to_vec();
                            cycle.push(neighbor.clone());
                            return Some(cycle);
                        }
                    }
                    Color::Black => {}
                }
            }

            path.pop();
            colors.insert(node.clone(), Color::Black);
            None
        }

        let mut colors: FxHashMap<NodeKind, Color> = self
            .edges_ref()
            .iter()
            .flat_map(|(from, tos)| std::iter::once(from).chain(tos))
            .map(|n| (n.clone(), Color::White))
            .collect();

        let nodes: Vec<NodeKind> = colors.keys().cloned().collect();
        let mut path = Vec::new();

        for node in &nodes {
            if colors.get(node).copied() == Some(Color::White)
                && let Some(cycle) = dfs(node, &mut colors, &mut path, self.edges_ref())
            {
                return Some(cycle);
            }
        }

        None
    }

    /// Lists the registered nodes that cannot be reached from Start.
    ///
    /// Performs a breadth-first search from `NodeKind::Start` over the
    /// unconditional edge map, then reports every key of the node registry that
    /// the search never visited. Conditional edges are not followed.
    ///
    /// Returns the unreachable nodes sorted by their display string; the vector
    /// is empty when every registered node is reachable.
    fn detect_unreachable_nodes(&self) -> Vec<NodeKind> {
        let edges = self.edges_ref();
        // Declared before the collections below so the borrows they hold stay valid.
        let start = NodeKind::Start;

        // The search works on borrowed nodes; nothing is cloned until the result
        // is built.
        let mut reachable: FxHashSet<&NodeKind> = FxHashSet::default();
        let mut queue: VecDeque<&NodeKind> = VecDeque::new();

        reachable.insert(&start);
        queue.push_back(&start);

        while let Some(node) = queue.pop_front() {
            for neighbor in edges.get(node).into_iter().flatten() {
                // `insert` returns true only the first time a node is seen.
                if reachable.insert(neighbor) {
                    queue.push_back(neighbor);
                }
            }
        }

        let mut unreachable: Vec<NodeKind> = self
            .nodes_ref()
            .keys()
            .filter(|n| !reachable.contains(*n))
            .cloned()
            .collect();
        // Stable like `sort_by_key`, but formats each node once instead of on
        // every comparison.
        unreachable.sort_by_cached_key(|n| n.to_string());
        unreachable
    }

    /// Lists the registered nodes from which End cannot be reached.
    ///
    /// Builds the reverse of the unconditional edge map, performs a breadth-first
    /// search backwards from `NodeKind::End`, and reports every key of the node
    /// registry that the search never visited. Conditional edges are not followed.
    ///
    /// Returns the offending nodes sorted by their display string; the vector is
    /// empty when every registered node can reach End.
    fn detect_no_path_to_end(&self) -> Vec<NodeKind> {
        // Reverse adjacency: target -> all sources pointing at it. Borrowed
        // endpoints avoid cloning both ends of every edge.
        let mut reverse: FxHashMap<&NodeKind, Vec<&NodeKind>> = FxHashMap::default();
        for (from, tos) in self.edges_ref() {
            for to in tos {
                reverse.entry(to).or_default().push(from);
            }
        }

        // Declared before the collections below so the borrows they hold stay valid.
        let end = NodeKind::End;
        let mut can_reach_end: FxHashSet<&NodeKind> = FxHashSet::default();
        let mut queue: VecDeque<&NodeKind> = VecDeque::new();

        can_reach_end.insert(&end);
        queue.push_back(&end);

        while let Some(node) = queue.pop_front() {
            for &pred in reverse.get(node).into_iter().flatten() {
                // `insert` returns true only the first time a node is seen.
                if can_reach_end.insert(pred) {
                    queue.push_back(pred);
                }
            }
        }

        let mut no_path: Vec<NodeKind> = self
            .nodes_ref()
            .keys()
            .filter(|n| !can_reach_end.contains(*n))
            .cloned()
            .collect();
        // Stable like `sort_by_key`, but formats each node once instead of on
        // every comparison.
        no_path.sort_by_cached_key(|n| n.to_string());
        no_path
    }

    /// Finds an unconditional edge that is listed more than once.
    ///
    /// For each source node, scans its target list in order and returns the
    /// `(source, target)` pair for the first target that repeats. Returns `None`
    /// when no source lists a target twice.
    fn detect_duplicate_edge(&self) -> Option<(NodeKind, NodeKind)> {
        self.edges_ref().iter().find_map(|(source, targets)| {
            // Targets already seen for this source; reset for every source.
            let mut visited: FxHashSet<&NodeKind> = FxHashSet::default();
            targets
                .iter()
                // `insert` yields false when the target was already present.
                .find(|&target| !visited.insert(target))
                .map(|repeated| (source.clone(), repeated.clone()))
        })
    }
}