1use std::collections::HashMap;
2
3use crate::config::{
4 CONVENTIONS, StepPrehookConfig, StepScope, TaskExecutionStep, TaskRuntimeContext,
5};
6use anyhow::{Result, anyhow};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
11#[serde(rename_all = "snake_case")]
12pub enum ExecutionGraphSource {
13 #[default]
15 StaticBaseline,
16 AdaptivePlanner,
18 DeterministicFallback,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "kind", rename_all = "snake_case")]
25pub enum ExecutionGraphNodeSpec {
26 StaticStep {
28 step_id: String,
30 },
31 DynamicStep {
33 step_type: String,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
37 agent_id: Option<String>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
40 template: Option<String>,
41 },
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ExecutionGraphNode {
47 pub id: String,
49 pub scope: StepScope,
51 #[serde(default)]
53 pub repeatable: bool,
54 #[serde(default)]
56 pub is_guard: bool,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub prehook: Option<StepPrehookConfig>,
60 pub spec: ExecutionGraphNodeSpec,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ExecutionGraphEdge {
67 pub from: String,
69 pub to: String,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub condition: Option<String>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize, Default)]
78pub struct EffectiveExecutionGraph {
79 #[serde(default)]
81 pub source: ExecutionGraphSource,
82 #[serde(default)]
84 pub nodes: HashMap<String, ExecutionGraphNode>,
85 #[serde(default)]
87 pub edges: Vec<ExecutionGraphEdge>,
88 #[serde(default, skip_serializing_if = "Option::is_none")]
90 pub entry: Option<String>,
91}
92
93impl EffectiveExecutionGraph {
94 pub fn add_node(&mut self, node: ExecutionGraphNode) -> Result<()> {
96 if self.nodes.insert(node.id.clone(), node).is_some() {
97 return Err(anyhow!("graph node '{}' already exists", self.nodes.len()));
98 }
99 Ok(())
100 }
101
102 pub fn add_edge(&mut self, edge: ExecutionGraphEdge) -> Result<()> {
104 if !self.nodes.contains_key(&edge.from) {
105 return Err(anyhow!("graph edge source '{}' does not exist", edge.from));
106 }
107 if !self.nodes.contains_key(&edge.to) {
108 return Err(anyhow!("graph edge target '{}' does not exist", edge.to));
109 }
110 self.edges.push(edge);
111 Ok(())
112 }
113
114 pub fn get_node(&self, node_id: &str) -> Option<&ExecutionGraphNode> {
116 self.nodes.get(node_id)
117 }
118
119 pub fn outgoing_edges(&self, node_id: &str) -> impl Iterator<Item = &ExecutionGraphEdge> + '_ {
121 let node_id = node_id.to_owned();
122 self.edges.iter().filter(move |edge| edge.from == node_id)
123 }
124
125 pub fn incoming_count(&self, node_id: &str) -> usize {
127 self.edges.iter().filter(|edge| edge.to == node_id).count()
128 }
129
130 pub fn validate(&self) -> Result<()> {
132 if self.nodes.is_empty() {
133 return Err(anyhow!("effective execution graph has no nodes"));
134 }
135 if let Some(entry) = self.entry.as_deref() {
136 if !self.nodes.contains_key(entry) {
137 return Err(anyhow!(
138 "effective execution graph entry '{}' is missing",
139 entry
140 ));
141 }
142 }
143 let mut in_degree: HashMap<&str, usize> = self
144 .nodes
145 .keys()
146 .map(|node_id| (node_id.as_str(), 0usize))
147 .collect();
148 for edge in &self.edges {
149 let Some(degree) = in_degree.get_mut(edge.to.as_str()) else {
150 return Err(anyhow!("graph edge target '{}' is missing", edge.to));
151 };
152 *degree += 1;
153 }
154 let mut ready: Vec<&str> = in_degree
155 .iter()
156 .filter_map(|(node_id, degree)| (*degree == 0).then_some(*node_id))
157 .collect();
158 let mut visited = 0usize;
159 while let Some(node_id) = ready.pop() {
160 visited += 1;
161 for edge in self.outgoing_edges(node_id) {
162 let degree = in_degree
163 .get_mut(edge.to.as_str())
164 .ok_or_else(|| anyhow!("graph edge target '{}' is missing", edge.to))?;
165 *degree -= 1;
166 if *degree == 0 {
167 ready.push(edge.to.as_str());
168 }
169 }
170 }
171 if visited != self.nodes.len() {
172 return Err(anyhow!("effective execution graph contains a cycle"));
173 }
174 Ok(())
175 }
176}
177
178fn static_step_node(step: &TaskExecutionStep) -> Option<ExecutionGraphNode> {
179 if !step.enabled || step.is_guard || step.id == "init_once" {
180 return None;
181 }
182 Some(ExecutionGraphNode {
183 id: step.id.clone(),
184 scope: step.resolved_scope(),
185 repeatable: step.repeatable,
186 is_guard: step.is_guard,
187 prehook: step.prehook.clone(),
188 spec: ExecutionGraphNodeSpec::StaticStep {
189 step_id: step.id.clone(),
190 },
191 })
192}
193
194pub fn build_static_execution_graph(
196 task_ctx: &TaskRuntimeContext,
197) -> Result<EffectiveExecutionGraph> {
198 let mut graph = EffectiveExecutionGraph {
199 source: ExecutionGraphSource::StaticBaseline,
200 ..EffectiveExecutionGraph::default()
201 };
202 let mut previous: Option<String> = None;
203 for step in &task_ctx.execution_plan.steps {
204 let Some(node) = static_step_node(step) else {
205 continue;
206 };
207 let node_id = node.id.clone();
208 graph.add_node(node)?;
209 if graph.entry.is_none() {
210 graph.entry = Some(node_id.clone());
211 }
212 if let Some(prev) = previous.as_ref() {
213 graph.add_edge(ExecutionGraphEdge {
214 from: prev.clone(),
215 to: node_id.clone(),
216 condition: None,
217 })?;
218 }
219 previous = Some(node_id);
220 }
221 graph.validate()?;
222 Ok(graph)
223}
224
225pub fn build_adaptive_execution_graph(
227 plan: &super::DynamicExecutionPlan,
228 source: ExecutionGraphSource,
229) -> Result<EffectiveExecutionGraph> {
230 let mut graph = EffectiveExecutionGraph {
231 source,
232 entry: plan.entry.clone(),
233 ..EffectiveExecutionGraph::default()
234 };
235 for node in plan.nodes.values() {
236 graph.add_node(ExecutionGraphNode {
237 id: node.id.clone(),
238 scope: CONVENTIONS.default_scope(&node.step_type),
239 repeatable: node.repeatable,
240 is_guard: node.is_guard,
241 prehook: node.prehook.as_ref().map(|prehook| StepPrehookConfig {
242 engine: crate::config::StepHookEngine::Cel,
243 when: prehook.when.clone(),
244 reason: prehook.reason.clone(),
245 ui: None,
246 extended: prehook.extended,
247 }),
248 spec: ExecutionGraphNodeSpec::DynamicStep {
249 step_type: node.step_type.clone(),
250 agent_id: node.agent_id.clone(),
251 template: node.template.clone(),
252 },
253 })?;
254 }
255 for edge in &plan.edges {
256 graph.add_edge(ExecutionGraphEdge {
257 from: edge.from.clone(),
258 to: edge.to.clone(),
259 condition: edge.condition.clone(),
260 })?;
261 }
262 graph.validate()?;
263 Ok(graph)
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TaskExecutionStep` fixture. Only the arguments differ between
    /// the steps used in these tests; every other field gets the same fixed
    /// value.
    fn fixture_step(
        id: &str,
        required_capability: Option<&str>,
        builtin: Option<&str>,
        repeatable: bool,
        is_guard: bool,
        scope: StepScope,
    ) -> TaskExecutionStep {
        TaskExecutionStep {
            id: id.to_string(),
            required_capability: required_capability.map(String::from),
            template: None,
            execution_profile: None,
            builtin: builtin.map(String::from),
            enabled: true,
            repeatable,
            is_guard,
            cost_preference: None,
            prehook: None,
            tty: false,
            outputs: vec![],
            pipe_to: None,
            command: None,
            chain_steps: vec![],
            scope: Some(scope),
            behavior: Default::default(),
            max_parallel: None,
            stagger_delay_ms: None,
            timeout_secs: None,
            stall_timeout_secs: None,
            item_select_config: None,
            store_inputs: vec![],
            store_outputs: vec![],
            step_vars: None,
        }
    }

    /// The static builder must drop `init_once` and guard steps and chain the
    /// remaining steps in plan order.
    #[test]
    fn build_static_execution_graph_skips_init_and_guard() {
        let steps = vec![
            fixture_step(
                "init_once",
                None,
                Some("init_once"),
                false,
                false,
                StepScope::Task,
            ),
            fixture_step("plan", Some("plan"), None, true, false, StepScope::Task),
            fixture_step("qa", Some("qa"), None, true, false, StepScope::Item),
            fixture_step(
                "loop_guard",
                None,
                Some("loop_guard"),
                true,
                true,
                StepScope::Task,
            ),
        ];
        let execution_plan = crate::config::TaskExecutionPlan {
            steps,
            loop_policy: Default::default(),
            finalize: Default::default(),
            max_parallel: None,
            stagger_delay_ms: None,
            item_isolation: None,
        };
        let task_ctx = TaskRuntimeContext {
            workspace_id: "ws".to_string(),
            workspace_root: "/tmp".into(),
            ticket_dir: "tickets".to_string(),
            execution_plan: std::sync::Arc::new(execution_plan),
            execution: Default::default(),
            current_cycle: 1,
            init_done: true,
            dynamic_steps: std::sync::Arc::new(vec![]),
            adaptive: std::sync::Arc::new(None),
            pipeline_vars: Default::default(),
            safety: std::sync::Arc::new(Default::default()),
            self_referential: false,
            consecutive_failures: 0,
            project_id: "default".to_string(),
            pinned_invariants: std::sync::Arc::new(vec![]),
            workflow_id: "wf".to_string(),
            spawn_depth: 0,
            item_step_failures: HashMap::new(),
            item_retry_after: HashMap::new(),
            restart_completed_steps: std::collections::HashSet::new(),
            step_filter: None,
        };

        let graph = build_static_execution_graph(&task_ctx).expect("graph");

        assert_eq!(graph.entry.as_deref(), Some("plan"));
        assert_eq!(graph.nodes.len(), 2);
        assert!(graph.nodes.contains_key("plan"));
        assert!(graph.nodes.contains_key("qa"));
        assert_eq!(graph.edges.len(), 1);
        let only_edge = &graph.edges[0];
        assert_eq!(only_edge.from, "plan");
        assert_eq!(only_edge.to, "qa");
    }

    /// The adaptive builder must carry the plan's source, entry and edge
    /// conditions over unchanged.
    #[test]
    fn build_adaptive_execution_graph_preserves_conditions() {
        use super::super::{DynamicExecutionPlan, WorkflowEdge, WorkflowNode};

        let mut plan = DynamicExecutionPlan {
            entry: Some("qa".to_string()),
            ..Default::default()
        };
        let qa_node = WorkflowNode {
            id: "qa".to_string(),
            step_type: "qa".to_string(),
            agent_id: None,
            template: None,
            prehook: None,
            is_guard: false,
            repeatable: true,
        };
        plan.add_node(qa_node).expect("add qa node");
        let fix_node = WorkflowNode {
            id: "fix".to_string(),
            step_type: "fix".to_string(),
            agent_id: Some("fixer".to_string()),
            template: Some("fix {rel_path}".to_string()),
            prehook: None,
            is_guard: false,
            repeatable: true,
        };
        plan.add_node(fix_node).expect("add fix node");
        let qa_to_fix = WorkflowEdge {
            from: "qa".to_string(),
            to: "fix".to_string(),
            condition: Some("active_ticket_count > 0".to_string()),
        };
        plan.add_edge(qa_to_fix).expect("add edge");

        let graph = build_adaptive_execution_graph(&plan, ExecutionGraphSource::AdaptivePlanner)
            .expect("graph");

        assert_eq!(graph.source, ExecutionGraphSource::AdaptivePlanner);
        assert_eq!(graph.entry.as_deref(), Some("qa"));
        assert_eq!(
            graph.edges[0].condition.as_deref(),
            Some("active_ticket_count > 0")
        );
    }
}