enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! CompiledGraph - validated and ready-to-execute graph
//!
//! Supports parallel execution when multiple targets are available.

use super::edge::{ConditionalEdge, Edge, EdgeTarget};
use super::node::{DynNode, NodeState};
use futures::future::join_all;
use std::collections::HashMap;

/// Compiled graph - validated and ready to execute
///
/// Holds the nodes and edges that the `run*` methods walk, starting at
/// `entry_point`.
pub struct CompiledGraph {
    /// Nodes keyed by the name that edges and lookups refer to.
    pub(crate) nodes: HashMap<String, DynNode>,
    /// Unconditional edges; an edge contributes its `to` target whenever its
    /// `from` matches the node that just ran.
    pub(crate) edges: Vec<Edge>,
    /// Edges whose target is computed by calling the edge's `router` with the
    /// output of the node that just ran.
    pub(crate) conditional_edges: Vec<ConditionalEdge>,
    /// Name of the node where execution starts.
    pub(crate) entry_point: String,
}

impl CompiledGraph {
    /// Get a node by name
    pub fn get_node(&self, name: &str) -> Option<&DynNode> {
        self.nodes.get(name)
    }

    /// Name of the node where execution begins.
    pub fn entry_point(&self) -> &str {
        self.entry_point.as_str()
    }

    /// Resolve every target reachable from `from` in a single step.
    ///
    /// Targets produced by conditional edges come first (each edge's router is
    /// invoked with `output`), followed by the targets of regular edges. Both
    /// groups keep the order in which the edges are stored. The result is
    /// empty when `from` has no outgoing edges.
    pub fn get_next(&self, from: &str, output: &str) -> Vec<EdgeTarget> {
        // Routed targets: ask each matching conditional edge where to go.
        let routed = self
            .conditional_edges
            .iter()
            .filter(|conditional| conditional.from == from)
            .map(|conditional| (conditional.router)(output));

        // Fixed targets: copy the destination of each matching regular edge.
        let fixed = self
            .edges
            .iter()
            .filter(|edge| edge.from == from)
            .map(|edge| edge.to.clone());

        routed.chain(fixed).collect()
    }

    /// Execute the graph, seeding it with a textual input.
    ///
    /// The input is converted to a `String`, wrapped in a [`NodeState`] via
    /// `NodeState::from_string`, and handed to [`Self::run_with_state`].
    pub async fn run(&self, input: impl Into<String>) -> anyhow::Result<NodeState> {
        let text: String = input.into();
        let seed = NodeState::from_string(&text);
        self.run_with_state(seed).await
    }

    /// Run the graph with an initial state
    ///
    /// Execution starts at the entry point. After each node runs, its output
    /// (the state's string form) is used to resolve the outgoing targets:
    ///
    /// * no targets, or any target being END: execution stops and the current
    ///   state is returned (END wins even if sibling node targets exist);
    /// * exactly one node target: that node runs next;
    /// * several node targets: they all run concurrently on a copy of the
    ///   current state, the last successful result becomes the final state,
    ///   and execution stops (there is no fan-in continuation yet).
    ///
    /// Note that there is no step limit: a graph whose edges form a cycle that
    /// never routes to END will loop for as long as its nodes keep succeeding.
    ///
    /// # Errors
    ///
    /// Returns an error if the current node name is not present in the graph,
    /// if a sequentially executed node fails, or if the parallel step reports
    /// a failure.
    pub async fn run_with_state(&self, initial_state: NodeState) -> anyhow::Result<NodeState> {
        let mut current_node = self.entry_point.clone();
        let mut state = initial_state;

        loop {
            // Get the current node
            let node = self
                .nodes
                .get(&current_node)
                .ok_or_else(|| anyhow::anyhow!("Node '{}' not found", current_node))?;

            // Execute the node
            tracing::debug!(node = %current_node, "Executing node");
            state = node.execute(state).await?;

            // Routers decide on the string form of the state; when `as_str()`
            // yields `None` the default value is used instead.
            let output = state.as_str().unwrap_or_default().to_string();

            // Find next node(s)
            let next_targets = self.get_next(&current_node, &output);

            if next_targets.is_empty() {
                // No outgoing edges - end execution
                tracing::debug!(node = %current_node, "No outgoing edges, ending");
                break;
            }

            // Single pass over the targets: collecting into `Option` yields
            // `None` as soon as an END target is seen, otherwise the owned
            // node names (moved out of the targets, no per-name clone).
            let node_targets: Option<Vec<String>> = next_targets
                .into_iter()
                .map(|target| match target {
                    EdgeTarget::Node(name) => Some(name),
                    EdgeTarget::End => None,
                })
                .collect();

            let mut node_targets = match node_targets {
                Some(names) => names,
                None => {
                    tracing::debug!("Reached END");
                    break;
                }
            };

            // `next_targets` was non-empty and contained no END, so there is
            // at least one node target here.

            // Single target - sequential execution
            if node_targets.len() == 1 {
                current_node = node_targets.swap_remove(0);
                continue;
            }

            // Multiple targets - PARALLEL EXECUTION
            tracing::debug!(
                targets = ?node_targets,
                "Executing {} nodes in parallel",
                node_targets.len()
            );

            // Execute all target nodes in parallel
            let parallel_results = self
                .execute_nodes_parallel(&node_targets, state.clone())
                .await?;

            // Aggregate results: for now the last successful result becomes
            // the state. Custom aggregation strategies would plug in here.
            if let Some(last_state) = parallel_results.into_iter().last() {
                state = last_state;
            }

            // Outgoing edges of the parallel nodes are not followed; a full
            // implementation would continue with fan-in logic.
            tracing::debug!("Parallel execution complete");
            break;
        }

        Ok(state)
    }

    /// Execute multiple nodes in parallel
    ///
    /// Returns results from all nodes that completed successfully.
    async fn execute_nodes_parallel(
        &self,
        node_names: &[String],
        input_state: NodeState,
    ) -> anyhow::Result<Vec<NodeState>> {
        let futures: Vec<_> = node_names
            .iter()
            .filter_map(|name| {
                self.nodes.get(name).map(|node| {
                    let state = input_state.clone();
                    let node_name = name.clone();
                    async move {
                        tracing::debug!(node = %node_name, "Executing parallel node");
                        node.execute(state).await
                    }
                })
            })
            .collect();

        let results = join_all(futures).await;

        // Collect successful results
        let successful: Vec<NodeState> = results.into_iter().filter_map(|r| r.ok()).collect();

        if successful.is_empty() {
            anyhow::bail!("All parallel nodes failed");
        }

        Ok(successful)
    }

    /// Get node count
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }

    /// Total number of edges, counting regular and conditional edges together.
    pub fn edge_count(&self) -> usize {
        let regular = self.edges.len();
        let conditional = self.conditional_edges.len();
        regular + conditional
    }
}