weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! GraphBuilder — fluent API for constructing workflow graphs.

use rustc_hash::FxHashMap;
use std::sync::Arc;

use super::edges::{ConditionalEdge, EdgePredicate};
use crate::node::Node;
use crate::reducers::{Reducer, ReducerRegistry};
use crate::runtimes::{EventBusConfig, RuntimeConfig};
use crate::types::{ChannelType, NodeKind};

// Deconstructed builder state passed to the compiler.
type GraphParts = (
    FxHashMap<NodeKind, Arc<dyn Node>>,
    FxHashMap<NodeKind, Vec<NodeKind>>,
    Vec<ConditionalEdge>,
    RuntimeConfig,
    ReducerRegistry,
);

/// Fluent builder for workflow graphs.
///
/// Chain `add_node`, `add_edge`, and configuration calls, then call
/// [`compile`](Self::compile) to produce an executable [`App`](crate::app::App).
///
/// `NodeKind::Start` and `NodeKind::End` are virtual endpoints — never register
/// them with `add_node`, but connect edges to/from them to define entry and
/// exit points.
///
/// # Examples
///
/// ```
/// use weavegraph::graphs::GraphBuilder;
/// use weavegraph::types::NodeKind;
///
/// # struct MyNode;
/// # #[async_trait::async_trait]
/// # impl weavegraph::node::Node for MyNode {
/// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
/// #         Ok(weavegraph::node::NodePartial::default())
/// #     }
/// # }
///
/// let app = GraphBuilder::new()
///     .add_node(NodeKind::Custom("worker".into()), MyNode)
///     .add_edge(NodeKind::Start, NodeKind::Custom("worker".into()))
///     .add_edge(NodeKind::Custom("worker".into()), NodeKind::End)
///     .compile();
/// ```
pub struct GraphBuilder {
    // Node registry keyed by identifier.
    nodes: FxHashMap<NodeKind, Arc<dyn Node>>,
    // Unconditional edges: source → targets.
    edges: FxHashMap<NodeKind, Vec<NodeKind>>,
    // Conditional edges for state-driven routing.
    conditional_edges: Vec<ConditionalEdge>,
    // Runtime configuration carried into the compiled app.
    runtime_config: RuntimeConfig,
    // Reducer registry for channel update operations.
    reducer_registry: ReducerRegistry,
}

impl Default for GraphBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl GraphBuilder {
    /// Creates an empty builder.
    #[must_use]
    pub fn new() -> Self {
        Self {
            nodes: FxHashMap::default(),
            edges: FxHashMap::default(),
            conditional_edges: Vec::new(),
            runtime_config: RuntimeConfig::default(),
            reducer_registry: ReducerRegistry::default(),
        }
    }

    /// Registers a node with the given identifier.
    ///
    /// Attempting to register `NodeKind::Start` or `NodeKind::End` logs a
    /// warning and is otherwise a no-op — those identifiers are virtual.
    #[must_use]
    pub fn add_node(mut self, id: NodeKind, node: impl Node + 'static) -> Self {
        match id {
            NodeKind::Start | NodeKind::End => {
                tracing::warn!(
                    ?id,
                    "Ignoring registration of virtual node kind (Start/End are virtual)"
                );
            }
            _ => {
                self.nodes.insert(id, Arc::new(node));
            }
        }
        self
    }

    /// Adds an unconditional edge from `from` to `to`.
    #[must_use]
    pub fn add_edge(mut self, from: NodeKind, to: NodeKind) -> Self {
        self.edges.entry(from).or_default().push(to);
        self
    }

    /// Adds a conditional edge.
    ///
    /// When execution reaches `from`, `predicate` is called with the current
    /// [`StateSnapshot`](crate::state::StateSnapshot) and returns the names of
    /// the next nodes to activate.
    #[must_use]
    pub fn add_conditional_edge(mut self, from: NodeKind, predicate: EdgePredicate) -> Self {
        self.conditional_edges
            .push(ConditionalEdge::new(from, predicate));
        self
    }

    /// Sets the runtime configuration for the compiled app.
    #[must_use]
    pub fn with_runtime_config(mut self, runtime_config: RuntimeConfig) -> Self {
        self.runtime_config = runtime_config;
        self
    }

    /// Overrides the event bus configuration while keeping all other runtime settings.
    #[must_use]
    pub fn with_event_bus_config(mut self, config: EventBusConfig) -> Self {
        self.runtime_config.event_bus = config;
        self
    }

    /// Appends a custom reducer for `channel`.
    ///
    /// Multiple reducers for the same channel are applied in registration order.
    ///
    /// ```
    /// use std::sync::Arc;
    /// use weavegraph::graphs::GraphBuilder;
    /// use weavegraph::reducers::{Reducer, AddMessages};
    /// use weavegraph::types::{ChannelType, NodeKind};
    ///
    /// # struct MyNode;
    /// # #[async_trait::async_trait]
    /// # impl weavegraph::node::Node for MyNode {
    /// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
    /// #         Ok(weavegraph::node::NodePartial::default())
    /// #     }
    /// # }
    ///
    /// let app = GraphBuilder::new()
    ///     .add_node(NodeKind::Custom("worker".into()), MyNode)
    ///     .with_reducer(ChannelType::Message, Arc::new(AddMessages))
    ///     .add_edge(NodeKind::Start, NodeKind::Custom("worker".into()))
    ///     .add_edge(NodeKind::Custom("worker".into()), NodeKind::End)
    ///     .compile();
    /// ```
    #[must_use]
    pub fn with_reducer(mut self, channel: ChannelType, reducer: Arc<dyn Reducer>) -> Self {
        self.reducer_registry.register(channel, reducer);
        self
    }

    /// Replaces the entire reducer registry.
    #[must_use]
    pub fn with_reducer_registry(mut self, registry: ReducerRegistry) -> Self {
        self.reducer_registry = registry;
        self
    }

    /// Iterates over all registered node identifiers.
    ///
    /// Virtual `Start` and `End` nodes are not included.
    ///
    /// ```
    /// use weavegraph::graphs::GraphBuilder;
    /// use weavegraph::types::NodeKind;
    ///
    /// # struct MyNode;
    /// # #[async_trait::async_trait]
    /// # impl weavegraph::node::Node for MyNode {
    /// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
    /// #         Ok(weavegraph::node::NodePartial::default())
    /// #     }
    /// # }
    ///
    /// let builder = GraphBuilder::new()
    ///     .add_node(NodeKind::Custom("A".into()), MyNode)
    ///     .add_node(NodeKind::Custom("B".into()), MyNode);
    ///
    /// assert_eq!(builder.nodes().count(), 2);
    /// ```
    pub fn nodes(&self) -> super::iteration::NodesIter<'_> {
        super::iteration::NodesIter::new(self.nodes.keys())
    }

    /// Iterates over all unconditional edges as `(source, target)` pairs.
    ///
    /// Iteration order is not deterministic; use [`topological_sort`](Self::topological_sort)
    /// for ordered traversal.
    ///
    /// ```
    /// use weavegraph::graphs::GraphBuilder;
    /// use weavegraph::types::NodeKind;
    ///
    /// # struct MyNode;
    /// # #[async_trait::async_trait]
    /// # impl weavegraph::node::Node for MyNode {
    /// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
    /// #         Ok(weavegraph::node::NodePartial::default())
    /// #     }
    /// # }
    ///
    /// let builder = GraphBuilder::new()
    ///     .add_node(NodeKind::Custom("A".into()), MyNode)
    ///     .add_edge(NodeKind::Start, NodeKind::Custom("A".into()))
    ///     .add_edge(NodeKind::Custom("A".into()), NodeKind::End);
    ///
    /// assert_eq!(builder.edges().count(), 2);
    /// ```
    pub fn edges(&self) -> super::iteration::EdgesIter<'_> {
        super::iteration::EdgesIter::new(&self.edges)
    }

    /// Returns the number of registered nodes (excludes virtual nodes).
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }

    /// Returns the total number of unconditional edges.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.edges.values().map(|v| v.len()).sum()
    }

    /// Returns a topologically sorted node list.
    ///
    /// `NodeKind::Start` is always first; `NodeKind::End` is always last.
    /// Nodes at the same depth are sorted lexicographically for determinism.
    /// Nodes in cycles (if any) are excluded from the result.
    ///
    /// ```
    /// use weavegraph::graphs::GraphBuilder;
    /// use weavegraph::types::NodeKind;
    ///
    /// # struct MyNode;
    /// # #[async_trait::async_trait]
    /// # impl weavegraph::node::Node for MyNode {
    /// #     async fn run(&self, _: weavegraph::state::StateSnapshot, _: weavegraph::node::NodeContext) -> Result<weavegraph::node::NodePartial, weavegraph::node::NodeError> {
    /// #         Ok(weavegraph::node::NodePartial::default())
    /// #     }
    /// # }
    ///
    /// let builder = GraphBuilder::new()
    ///     .add_node(NodeKind::Custom("A".into()), MyNode)
    ///     .add_node(NodeKind::Custom("B".into()), MyNode)
    ///     .add_edge(NodeKind::Start, NodeKind::Custom("A".into()))
    ///     .add_edge(NodeKind::Custom("A".into()), NodeKind::Custom("B".into()))
    ///     .add_edge(NodeKind::Custom("B".into()), NodeKind::End);
    ///
    /// let sorted = builder.topological_sort();
    /// assert_eq!(sorted[0], NodeKind::Start);
    /// assert_eq!(sorted[sorted.len() - 1], NodeKind::End);
    /// ```
    #[must_use]
    pub fn topological_sort(&self) -> Vec<crate::types::NodeKind> {
        super::iteration::topological_sort(&self.edges)
    }

    /// Converts the graph to a petgraph `DiGraph`.
    ///
    /// Requires the `petgraph-compat` feature.
    #[cfg(feature = "petgraph-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "petgraph-compat")))]
    #[must_use]
    pub fn to_petgraph(&self) -> super::petgraph_compat::PetgraphConversion {
        super::petgraph_compat::to_petgraph(&self.edges)
    }

    /// Exports the graph to DOT format for Graphviz rendering.
    ///
    /// Requires the `petgraph-compat` feature.
    #[cfg(feature = "petgraph-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "petgraph-compat")))]
    #[must_use]
    pub fn to_dot(&self) -> String {
        super::petgraph_compat::to_dot(&self.edges)
    }

    /// Checks for cycles using petgraph's algorithm.
    ///
    /// Requires the `petgraph-compat` feature.
    #[cfg(feature = "petgraph-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "petgraph-compat")))]
    #[must_use]
    pub fn is_cyclic_petgraph(&self) -> bool {
        super::petgraph_compat::is_cyclic(&self.edges)
    }

    pub(super) fn into_parts(self) -> GraphParts {
        (
            self.nodes,
            self.edges,
            self.conditional_edges,
            self.runtime_config,
            self.reducer_registry,
        )
    }

    pub(super) fn nodes_ref(&self) -> &FxHashMap<NodeKind, Arc<dyn Node>> {
        &self.nodes
    }

    pub(super) fn edges_ref(&self) -> &FxHashMap<NodeKind, Vec<NodeKind>> {
        &self.edges
    }

    pub(super) fn conditional_edges_ref(&self) -> &[ConditionalEdge] {
        &self.conditional_edges
    }
}