1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! CompiledGraph - validated and ready-to-execute graph
//!
//! Supports parallel execution when multiple targets are available.
use super::edge::{ConditionalEdge, Edge, EdgeTarget};
use super::node::{DynNode, NodeState};
use futures::future::join_all;
use std::collections::HashMap;
/// Compiled graph - validated and ready to execute
pub struct CompiledGraph {
pub(crate) nodes: HashMap<String, DynNode>,
pub(crate) edges: Vec<Edge>,
pub(crate) conditional_edges: Vec<ConditionalEdge>,
pub(crate) entry_point: String,
}
impl CompiledGraph {
/// Get a node by name
pub fn get_node(&self, name: &str) -> Option<&DynNode> {
self.nodes.get(name)
}
/// Get the entry point
pub fn entry_point(&self) -> &str {
&self.entry_point
}
/// Get the next node(s) after the given node
pub fn get_next(&self, from: &str, output: &str) -> Vec<EdgeTarget> {
let mut targets = Vec::new();
// Check conditional edges first
for ce in &self.conditional_edges {
if ce.from == from {
targets.push((ce.router)(output));
}
}
// Then check regular edges
for edge in &self.edges {
if edge.from == from {
targets.push(edge.to.clone());
}
}
targets
}
/// Run the graph with an initial input
pub async fn run(&self, input: impl Into<String>) -> anyhow::Result<NodeState> {
let initial_state = NodeState::from_string(&input.into());
self.run_with_state(initial_state).await
}
/// Run the graph with an initial state
///
/// When multiple targets are available, executes them in parallel.
/// This implements the Agentic DAG execution model where independent
/// nodes can run concurrently.
pub async fn run_with_state(&self, initial_state: NodeState) -> anyhow::Result<NodeState> {
let mut current_node = self.entry_point.clone();
let mut state = initial_state;
loop {
// Get the current node
let node = self
.nodes
.get(¤t_node)
.ok_or_else(|| anyhow::anyhow!("Node '{}' not found", current_node))?;
// Execute the node
tracing::debug!(node = %current_node, "Executing node");
state = node.execute(state).await?;
// Get the output for routing
let output = state.as_str().unwrap_or_default().to_string();
// Find next node(s)
let next_targets = self.get_next(¤t_node, &output);
if next_targets.is_empty() {
// No outgoing edges - end execution
tracing::debug!(node = %current_node, "No outgoing edges, ending");
break;
}
// Check for END target
let has_end = next_targets.iter().any(|t| matches!(t, EdgeTarget::End));
if has_end {
tracing::debug!("Reached END");
break;
}
// Collect node targets (filter out End)
let node_targets: Vec<String> = next_targets
.iter()
.filter_map(|t| match t {
EdgeTarget::Node(n) => Some(n.clone()),
EdgeTarget::End => None,
})
.collect();
if node_targets.is_empty() {
break;
}
// Single target - sequential execution
if node_targets.len() == 1 {
current_node = node_targets[0].clone();
continue;
}
// Multiple targets - PARALLEL EXECUTION
tracing::debug!(
targets = ?node_targets,
"Executing {} nodes in parallel",
node_targets.len()
);
// Execute all target nodes in parallel
let parallel_results = self
.execute_nodes_parallel(&node_targets, state.clone())
.await?;
// Aggregate results: combine all outputs
// For now, we use the last successful result as the state
// In a full implementation, this would support custom aggregation strategies
if let Some(last_state) = parallel_results.into_iter().last() {
state = last_state;
}
// After parallel execution, check if any nodes have outgoing edges
// For simplicity, we end after parallel execution
// A full implementation would continue with fan-in logic
tracing::debug!("Parallel execution complete");
break;
}
Ok(state)
}
/// Execute multiple nodes in parallel
///
/// Returns results from all nodes that completed successfully.
async fn execute_nodes_parallel(
&self,
node_names: &[String],
input_state: NodeState,
) -> anyhow::Result<Vec<NodeState>> {
let futures: Vec<_> = node_names
.iter()
.filter_map(|name| {
self.nodes.get(name).map(|node| {
let state = input_state.clone();
let node_name = name.clone();
async move {
tracing::debug!(node = %node_name, "Executing parallel node");
node.execute(state).await
}
})
})
.collect();
let results = join_all(futures).await;
// Collect successful results
let successful: Vec<NodeState> = results.into_iter().filter_map(|r| r.ok()).collect();
if successful.is_empty() {
anyhow::bail!("All parallel nodes failed");
}
Ok(successful)
}
/// Get node count
pub fn node_count(&self) -> usize {
self.nodes.len()
}
/// Get edge count
pub fn edge_count(&self) -> usize {
self.edges.len() + self.conditional_edges.len()
}
}