Skip to main content

agent_orchestrator/dynamic_orchestration/
graph.rs

1use std::collections::HashMap;
2
3use crate::config::{
4    CONVENTIONS, StepPrehookConfig, StepScope, TaskExecutionStep, TaskRuntimeContext,
5};
6use anyhow::{Result, anyhow};
7use serde::{Deserialize, Serialize};
8
9/// Source used to derive the effective execution graph.
10#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
11#[serde(rename_all = "snake_case")]
12pub enum ExecutionGraphSource {
13    /// Graph built directly from the static workflow definition.
14    #[default]
15    StaticBaseline,
16    /// Graph returned by the adaptive planner.
17    AdaptivePlanner,
18    /// Graph produced by deterministic fallback logic.
19    DeterministicFallback,
20}
21
22/// Node specification stored in the effective execution graph.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "kind", rename_all = "snake_case")]
25pub enum ExecutionGraphNodeSpec {
26    /// Reference to a statically configured workflow step.
27    StaticStep {
28        /// Workflow step identifier.
29        step_id: String,
30    },
31    /// Runtime-generated step selected by dynamic orchestration.
32    DynamicStep {
33        /// Dynamic step type or capability identifier.
34        step_type: String,
35        /// Pinned agent identifier, when the planner selected one.
36        #[serde(default, skip_serializing_if = "Option::is_none")]
37        agent_id: Option<String>,
38        /// Explicit template override to execute.
39        #[serde(default, skip_serializing_if = "Option::is_none")]
40        template: Option<String>,
41    },
42}
43
44/// One executable node inside the effective execution graph.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ExecutionGraphNode {
47    /// Stable node identifier.
48    pub id: String,
49    /// Execution scope used when scheduling the node.
50    pub scope: StepScope,
51    /// Whether the node should repeat on later cycles.
52    #[serde(default)]
53    pub repeatable: bool,
54    /// Whether the node can terminate the workflow loop.
55    #[serde(default)]
56    pub is_guard: bool,
57    /// Conditional execution hook evaluated before the node runs.
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub prehook: Option<StepPrehookConfig>,
60    /// Static or dynamic node specification.
61    pub spec: ExecutionGraphNodeSpec,
62}
63
64/// Directed edge between two execution graph nodes.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ExecutionGraphEdge {
67    /// Source node identifier.
68    pub from: String,
69    /// Destination node identifier.
70    pub to: String,
71    /// Optional condition that must evaluate truthy for traversal.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub condition: Option<String>,
74}
75
76/// Fully materialized execution graph used by the runtime scheduler.
77#[derive(Debug, Clone, Serialize, Deserialize, Default)]
78pub struct EffectiveExecutionGraph {
79    /// Planner or fallback source that produced the graph.
80    #[serde(default)]
81    pub source: ExecutionGraphSource,
82    /// Node map keyed by node id.
83    #[serde(default)]
84    pub nodes: HashMap<String, ExecutionGraphNode>,
85    /// Directed edges between nodes.
86    #[serde(default)]
87    pub edges: Vec<ExecutionGraphEdge>,
88    /// Optional entry node selected for execution start.
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub entry: Option<String>,
91}
92
93impl EffectiveExecutionGraph {
94    /// Adds a node and rejects duplicate ids.
95    pub fn add_node(&mut self, node: ExecutionGraphNode) -> Result<()> {
96        if self.nodes.insert(node.id.clone(), node).is_some() {
97            return Err(anyhow!("graph node '{}' already exists", self.nodes.len()));
98        }
99        Ok(())
100    }
101
102    /// Adds an edge after verifying that both endpoint nodes exist.
103    pub fn add_edge(&mut self, edge: ExecutionGraphEdge) -> Result<()> {
104        if !self.nodes.contains_key(&edge.from) {
105            return Err(anyhow!("graph edge source '{}' does not exist", edge.from));
106        }
107        if !self.nodes.contains_key(&edge.to) {
108            return Err(anyhow!("graph edge target '{}' does not exist", edge.to));
109        }
110        self.edges.push(edge);
111        Ok(())
112    }
113
114    /// Returns one node by id.
115    pub fn get_node(&self, node_id: &str) -> Option<&ExecutionGraphNode> {
116        self.nodes.get(node_id)
117    }
118
119    /// Iterates over outgoing edges for one node.
120    pub fn outgoing_edges(&self, node_id: &str) -> impl Iterator<Item = &ExecutionGraphEdge> + '_ {
121        let node_id = node_id.to_owned();
122        self.edges.iter().filter(move |edge| edge.from == node_id)
123    }
124
125    /// Counts incoming edges for one node.
126    pub fn incoming_count(&self, node_id: &str) -> usize {
127        self.edges.iter().filter(|edge| edge.to == node_id).count()
128    }
129
130    /// Validates entry-point references and rejects cyclic graphs.
131    pub fn validate(&self) -> Result<()> {
132        if self.nodes.is_empty() {
133            return Err(anyhow!("effective execution graph has no nodes"));
134        }
135        if let Some(entry) = self.entry.as_deref() {
136            if !self.nodes.contains_key(entry) {
137                return Err(anyhow!(
138                    "effective execution graph entry '{}' is missing",
139                    entry
140                ));
141            }
142        }
143        let mut in_degree: HashMap<&str, usize> = self
144            .nodes
145            .keys()
146            .map(|node_id| (node_id.as_str(), 0usize))
147            .collect();
148        for edge in &self.edges {
149            let Some(degree) = in_degree.get_mut(edge.to.as_str()) else {
150                return Err(anyhow!("graph edge target '{}' is missing", edge.to));
151            };
152            *degree += 1;
153        }
154        let mut ready: Vec<&str> = in_degree
155            .iter()
156            .filter_map(|(node_id, degree)| (*degree == 0).then_some(*node_id))
157            .collect();
158        let mut visited = 0usize;
159        while let Some(node_id) = ready.pop() {
160            visited += 1;
161            for edge in self.outgoing_edges(node_id) {
162                let degree = in_degree
163                    .get_mut(edge.to.as_str())
164                    .ok_or_else(|| anyhow!("graph edge target '{}' is missing", edge.to))?;
165                *degree -= 1;
166                if *degree == 0 {
167                    ready.push(edge.to.as_str());
168                }
169            }
170        }
171        if visited != self.nodes.len() {
172            return Err(anyhow!("effective execution graph contains a cycle"));
173        }
174        Ok(())
175    }
176}
177
178fn static_step_node(step: &TaskExecutionStep) -> Option<ExecutionGraphNode> {
179    if !step.enabled || step.is_guard || step.id == "init_once" {
180        return None;
181    }
182    Some(ExecutionGraphNode {
183        id: step.id.clone(),
184        scope: step.resolved_scope(),
185        repeatable: step.repeatable,
186        is_guard: step.is_guard,
187        prehook: step.prehook.clone(),
188        spec: ExecutionGraphNodeSpec::StaticStep {
189            step_id: step.id.clone(),
190        },
191    })
192}
193
194/// Builds the baseline execution graph from statically configured workflow steps.
195pub fn build_static_execution_graph(
196    task_ctx: &TaskRuntimeContext,
197) -> Result<EffectiveExecutionGraph> {
198    let mut graph = EffectiveExecutionGraph {
199        source: ExecutionGraphSource::StaticBaseline,
200        ..EffectiveExecutionGraph::default()
201    };
202    let mut previous: Option<String> = None;
203    for step in &task_ctx.execution_plan.steps {
204        let Some(node) = static_step_node(step) else {
205            continue;
206        };
207        let node_id = node.id.clone();
208        graph.add_node(node)?;
209        if graph.entry.is_none() {
210            graph.entry = Some(node_id.clone());
211        }
212        if let Some(prev) = previous.as_ref() {
213            graph.add_edge(ExecutionGraphEdge {
214                from: prev.clone(),
215                to: node_id.clone(),
216                condition: None,
217            })?;
218        }
219        previous = Some(node_id);
220    }
221    graph.validate()?;
222    Ok(graph)
223}
224
225/// Builds an effective execution graph from a dynamic execution plan.
226pub fn build_adaptive_execution_graph(
227    plan: &super::DynamicExecutionPlan,
228    source: ExecutionGraphSource,
229) -> Result<EffectiveExecutionGraph> {
230    let mut graph = EffectiveExecutionGraph {
231        source,
232        entry: plan.entry.clone(),
233        ..EffectiveExecutionGraph::default()
234    };
235    for node in plan.nodes.values() {
236        graph.add_node(ExecutionGraphNode {
237            id: node.id.clone(),
238            scope: CONVENTIONS.default_scope(&node.step_type),
239            repeatable: node.repeatable,
240            is_guard: node.is_guard,
241            prehook: node.prehook.as_ref().map(|prehook| StepPrehookConfig {
242                engine: crate::config::StepHookEngine::Cel,
243                when: prehook.when.clone(),
244                reason: prehook.reason.clone(),
245                ui: None,
246                extended: prehook.extended,
247            }),
248            spec: ExecutionGraphNodeSpec::DynamicStep {
249                step_type: node.step_type.clone(),
250                agent_id: node.agent_id.clone(),
251                template: node.template.clone(),
252            },
253        })?;
254    }
255    for edge in &plan.edges {
256        graph.add_edge(ExecutionGraphEdge {
257            from: edge.from.clone(),
258            to: edge.to.clone(),
259            condition: edge.condition.clone(),
260        })?;
261    }
262    graph.validate()?;
263    Ok(graph)
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269
270    #[test]
271    fn build_static_execution_graph_skips_init_and_guard() {
272        let task_ctx = TaskRuntimeContext {
273            workspace_id: "ws".to_string(),
274            workspace_root: "/tmp".into(),
275            ticket_dir: "tickets".to_string(),
276            execution_plan: std::sync::Arc::new(crate::config::TaskExecutionPlan {
277                steps: vec![
278                    crate::config::TaskExecutionStep {
279                        id: "init_once".to_string(),
280                        required_capability: None,
281                        template: None,
282                        execution_profile: None,
283                        builtin: Some("init_once".to_string()),
284                        enabled: true,
285                        repeatable: false,
286                        is_guard: false,
287                        cost_preference: None,
288                        prehook: None,
289                        tty: false,
290                        outputs: vec![],
291                        pipe_to: None,
292                        command: None,
293                        chain_steps: vec![],
294                        scope: Some(StepScope::Task),
295                        behavior: Default::default(),
296                        max_parallel: None,
297                        stagger_delay_ms: None,
298                        timeout_secs: None,
299                        stall_timeout_secs: None,
300                        item_select_config: None,
301                        store_inputs: vec![],
302                        store_outputs: vec![],
303                        step_vars: None,
304                    },
305                    crate::config::TaskExecutionStep {
306                        id: "plan".to_string(),
307                        required_capability: Some("plan".to_string()),
308                        template: None,
309                        execution_profile: None,
310                        builtin: None,
311                        enabled: true,
312                        repeatable: true,
313                        is_guard: false,
314                        cost_preference: None,
315                        prehook: None,
316                        tty: false,
317                        outputs: vec![],
318                        pipe_to: None,
319                        command: None,
320                        chain_steps: vec![],
321                        scope: Some(StepScope::Task),
322                        behavior: Default::default(),
323                        max_parallel: None,
324                        stagger_delay_ms: None,
325                        timeout_secs: None,
326                        stall_timeout_secs: None,
327                        item_select_config: None,
328                        store_inputs: vec![],
329                        store_outputs: vec![],
330                        step_vars: None,
331                    },
332                    crate::config::TaskExecutionStep {
333                        id: "qa".to_string(),
334                        required_capability: Some("qa".to_string()),
335                        template: None,
336                        execution_profile: None,
337                        builtin: None,
338                        enabled: true,
339                        repeatable: true,
340                        is_guard: false,
341                        cost_preference: None,
342                        prehook: None,
343                        tty: false,
344                        outputs: vec![],
345                        pipe_to: None,
346                        command: None,
347                        chain_steps: vec![],
348                        scope: Some(StepScope::Item),
349                        behavior: Default::default(),
350                        max_parallel: None,
351                        stagger_delay_ms: None,
352                        timeout_secs: None,
353                        stall_timeout_secs: None,
354                        item_select_config: None,
355                        store_inputs: vec![],
356                        store_outputs: vec![],
357                        step_vars: None,
358                    },
359                    crate::config::TaskExecutionStep {
360                        id: "loop_guard".to_string(),
361                        required_capability: None,
362                        template: None,
363                        execution_profile: None,
364                        builtin: Some("loop_guard".to_string()),
365                        enabled: true,
366                        repeatable: true,
367                        is_guard: true,
368                        cost_preference: None,
369                        prehook: None,
370                        tty: false,
371                        outputs: vec![],
372                        pipe_to: None,
373                        command: None,
374                        chain_steps: vec![],
375                        scope: Some(StepScope::Task),
376                        behavior: Default::default(),
377                        max_parallel: None,
378                        stagger_delay_ms: None,
379                        timeout_secs: None,
380                        stall_timeout_secs: None,
381                        item_select_config: None,
382                        store_inputs: vec![],
383                        store_outputs: vec![],
384                        step_vars: None,
385                    },
386                ],
387                loop_policy: Default::default(),
388                finalize: Default::default(),
389                max_parallel: None,
390                stagger_delay_ms: None,
391                item_isolation: None,
392            }),
393            execution: Default::default(),
394            current_cycle: 1,
395            init_done: true,
396            dynamic_steps: std::sync::Arc::new(vec![]),
397            adaptive: std::sync::Arc::new(None),
398            pipeline_vars: Default::default(),
399            safety: std::sync::Arc::new(Default::default()),
400            self_referential: false,
401            consecutive_failures: 0,
402            project_id: "default".to_string(),
403            pinned_invariants: std::sync::Arc::new(vec![]),
404            workflow_id: "wf".to_string(),
405            spawn_depth: 0,
406            item_step_failures: HashMap::new(),
407            item_retry_after: HashMap::new(),
408            restart_completed_steps: std::collections::HashSet::new(),
409            step_filter: None,
410        };
411
412        let graph = build_static_execution_graph(&task_ctx).expect("graph");
413        assert_eq!(graph.entry.as_deref(), Some("plan"));
414        assert_eq!(graph.nodes.len(), 2);
415        assert!(graph.nodes.contains_key("plan"));
416        assert!(graph.nodes.contains_key("qa"));
417        assert_eq!(graph.edges.len(), 1);
418        assert_eq!(graph.edges[0].from, "plan");
419        assert_eq!(graph.edges[0].to, "qa");
420    }
421
422    #[test]
423    fn build_adaptive_execution_graph_preserves_conditions() {
424        let mut plan = super::super::DynamicExecutionPlan {
425            entry: Some("qa".to_string()),
426            ..Default::default()
427        };
428        plan.add_node(super::super::WorkflowNode {
429            id: "qa".to_string(),
430            step_type: "qa".to_string(),
431            agent_id: None,
432            template: None,
433            prehook: None,
434            is_guard: false,
435            repeatable: true,
436        })
437        .expect("add qa node");
438        plan.add_node(super::super::WorkflowNode {
439            id: "fix".to_string(),
440            step_type: "fix".to_string(),
441            agent_id: Some("fixer".to_string()),
442            template: Some("fix {rel_path}".to_string()),
443            prehook: None,
444            is_guard: false,
445            repeatable: true,
446        })
447        .expect("add fix node");
448        plan.add_edge(super::super::WorkflowEdge {
449            from: "qa".to_string(),
450            to: "fix".to_string(),
451            condition: Some("active_ticket_count > 0".to_string()),
452        })
453        .expect("add edge");
454
455        let graph = build_adaptive_execution_graph(&plan, ExecutionGraphSource::AdaptivePlanner)
456            .expect("graph");
457        assert_eq!(graph.source, ExecutionGraphSource::AdaptivePlanner);
458        assert_eq!(graph.entry.as_deref(), Some("qa"));
459        assert_eq!(
460            graph.edges[0].condition.as_deref(),
461            Some("active_ticket_count > 0")
462        );
463    }
464}