Skip to main content

agent_orchestrator/dynamic_orchestration/
dag.rs

1use std::collections::{HashMap, HashSet};
2
3use crate::config::StepPrehookContext;
4use anyhow::{Result, anyhow};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use super::step_pool::evaluate_trigger_condition;
9
10/// A node in the workflow DAG
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkflowNode {
13    /// Unique identifier for this node
14    pub id: String,
15    /// The step type
16    pub step_type: String,
17    /// Agent ID to use
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub agent_id: Option<String>,
20    /// Template to execute
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    pub template: Option<String>,
23    /// Prehook configuration
24    #[serde(default, skip_serializing_if = "Option::is_none")]
25    pub prehook: Option<PrehookConfig>,
26    /// Whether this is a guard node
27    #[serde(default)]
28    pub is_guard: bool,
29    /// Whether this node is repeatable
30    #[serde(default = "default_true")]
31    pub repeatable: bool,
32}
33
34fn default_true() -> bool {
35    true
36}
37
38/// A directed edge in the workflow DAG
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct WorkflowEdge {
41    /// Source node ID
42    pub from: String,
43    /// Target node ID
44    pub to: String,
45    /// CEL condition for this edge (None = unconditional)
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    pub condition: Option<String>,
48}
49
50/// Prehook configuration for dynamic orchestration
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct PrehookConfig {
53    /// Engine to use (cel, etc.)
54    #[serde(default)]
55    pub engine: String,
56    /// Condition expression
57    pub when: String,
58    /// Reason (for logging)
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub reason: Option<String>,
61    /// Whether to return extended decision (Phase 1+)
62    #[serde(default)]
63    pub extended: bool,
64}
65
66impl Default for PrehookConfig {
67    fn default() -> Self {
68        Self {
69            engine: "cel".to_string(),
70            when: "true".to_string(),
71            reason: None,
72            extended: false,
73        }
74    }
75}
76
77/// Dynamic execution plan with DAG structure
78#[derive(Debug, Clone, Serialize, Deserialize, Default)]
79pub struct DynamicExecutionPlan {
80    /// All nodes in the DAG
81    #[serde(default)]
82    pub nodes: HashMap<String, WorkflowNode>,
83    /// All edges in the DAG
84    #[serde(default)]
85    pub edges: Vec<WorkflowEdge>,
86    /// Entry point node ID
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    pub entry: Option<String>,
89}
90
91impl DynamicExecutionPlan {
92    /// Create a new empty execution plan
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    /// Add a node to the plan
98    pub fn add_node(&mut self, node: WorkflowNode) -> Result<()> {
99        if self.nodes.contains_key(&node.id) {
100            return Err(anyhow!("Node {} already exists", node.id));
101        }
102        self.nodes.insert(node.id.clone(), node);
103        Ok(())
104    }
105
106    /// Add an edge to the plan
107    pub fn add_edge(&mut self, edge: WorkflowEdge) -> Result<()> {
108        if !self.nodes.contains_key(&edge.from) {
109            return Err(anyhow!("Source node {} does not exist", edge.from));
110        }
111        if !self.nodes.contains_key(&edge.to) {
112            return Err(anyhow!("Target node {} does not exist", edge.to));
113        }
114        self.edges.push(edge);
115        Ok(())
116    }
117
118    /// Get nodes that have no incoming edges (starting points)
119    pub fn get_entry_nodes(&self) -> Vec<&WorkflowNode> {
120        let has_incoming: HashSet<&str> = self.edges.iter().map(|e| e.to.as_str()).collect();
121
122        self.nodes
123            .values()
124            .filter(|n| !has_incoming.contains(n.id.as_str()))
125            .collect()
126    }
127
128    /// Get nodes that have no outgoing edges (endpoints)
129    pub fn get_exit_nodes(&self) -> Vec<&WorkflowNode> {
130        let has_outgoing: HashSet<&str> = self.edges.iter().map(|e| e.from.as_str()).collect();
131
132        self.nodes
133            .values()
134            .filter(|n| !has_outgoing.contains(n.id.as_str()))
135            .collect()
136    }
137
138    /// Get outgoing edges from a node
139    pub fn get_outgoing_edges(&self, node_id: &str) -> Vec<&WorkflowEdge> {
140        self.edges.iter().filter(|e| e.from == node_id).collect()
141    }
142
143    /// Get incoming edges to a node
144    pub fn get_incoming_edges(&self, node_id: &str) -> Vec<&WorkflowEdge> {
145        self.edges.iter().filter(|e| e.to == node_id).collect()
146    }
147
148    /// Check for cycles in the DAG
149    pub fn has_cycles(&self) -> bool {
150        let mut visited: HashSet<String> = HashSet::new();
151        let mut rec_stack: HashSet<String> = HashSet::new();
152
153        fn dfs(
154            node: String,
155            plan: &DynamicExecutionPlan,
156            visited: &mut HashSet<String>,
157            rec_stack: &mut HashSet<String>,
158        ) -> bool {
159            visited.insert(node.clone());
160            rec_stack.insert(node.clone());
161
162            for edge in plan.get_outgoing_edges(&node) {
163                let target = edge.to.clone();
164                if !visited.contains(&target) {
165                    if dfs(target, plan, visited, rec_stack) {
166                        return true;
167                    }
168                } else if rec_stack.contains(&target) {
169                    return true;
170                }
171            }
172
173            rec_stack.remove(&node);
174            false
175        }
176
177        for node_id in self.nodes.keys() {
178            if !visited.contains(node_id)
179                && dfs(node_id.clone(), self, &mut visited, &mut rec_stack)
180            {
181                return true;
182            }
183        }
184
185        false
186    }
187
188    /// Topological sort (returns nodes in execution order)
189    /// Returns Err if there are cycles
190    pub fn topological_sort(&self) -> Result<Vec<String>> {
191        if self.has_cycles() {
192            return Err(anyhow!("Cannot topological sort: graph has cycles"));
193        }
194
195        let mut in_degree: HashMap<&str, usize> =
196            self.nodes.keys().map(|k| (k.as_str(), 0)).collect();
197
198        for edge in &self.edges {
199            let degree = in_degree.get_mut(edge.to.as_str()).ok_or_else(|| {
200                anyhow!("Topological sort failed: missing target node {}", edge.to)
201            })?;
202            *degree += 1;
203        }
204
205        let mut queue: Vec<&str> = in_degree
206            .iter()
207            .filter(|(_, d)| **d == 0)
208            .map(|(k, _)| *k)
209            .collect();
210
211        let mut result: Vec<String> = Vec::new();
212
213        while let Some(node) = queue.pop() {
214            result.push(node.to_string());
215
216            for edge in self.get_outgoing_edges(node) {
217                let degree = in_degree.get_mut(edge.to.as_str()).ok_or_else(|| {
218                    anyhow!("Topological sort failed: missing target node {}", edge.to)
219                })?;
220                *degree -= 1;
221                if *degree == 0 {
222                    queue.push(&edge.to);
223                }
224            }
225        }
226
227        if result.len() != self.nodes.len() {
228            return Err(anyhow!("Topological sort failed: graph has cycles"));
229        }
230
231        Ok(result)
232    }
233
234    /// Returns the next nodes whose edge conditions pass for the current context.
235    pub fn find_next_nodes(
236        &self,
237        current_node_id: &str,
238        context: &StepPrehookContext,
239    ) -> Vec<String> {
240        let mut next_nodes = Vec::new();
241
242        for edge in self.get_outgoing_edges(current_node_id) {
243            if let Some(ref condition) = edge.condition {
244                if evaluate_trigger_condition(condition, context).unwrap_or(false) {
245                    next_nodes.push(edge.to.clone());
246                }
247            } else {
248                next_nodes.push(edge.to.clone());
249            }
250        }
251
252        next_nodes
253    }
254
255    /// Returns one node by identifier.
256    pub fn get_node(&self, node_id: &str) -> Option<&WorkflowNode> {
257        self.nodes.get(node_id)
258    }
259
260    /// Returns `true` when every exit node is marked completed in the execution state.
261    pub fn is_completed(&self, state: &DagExecutionState) -> bool {
262        let exit_nodes = self.get_exit_nodes();
263        for node in exit_nodes {
264            if state.completed_nodes.contains(&node.id) {
265                return true;
266            }
267        }
268        false
269    }
270}
271
272/// Execution state for the DAG engine
273#[derive(Debug, Clone, Serialize, Deserialize, Default)]
274pub struct DagExecutionState {
275    /// Current node being executed
276    pub current_node: Option<String>,
277    /// Completed nodes
278    #[serde(default)]
279    pub completed_nodes: HashSet<String>,
280    /// Skipped nodes
281    #[serde(default)]
282    pub skipped_nodes: HashSet<String>,
283    /// Dynamic context accumulated during execution
284    #[serde(default)]
285    pub context: HashMap<String, serde_json::Value>,
286    /// Branch history for debugging
287    #[serde(default)]
288    pub branch_history: Vec<BranchRecord>,
289}
290
291/// Record of one branch decision taken while traversing the DAG.
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct BranchRecord {
294    /// Source node of the traversed edge.
295    pub from_node: String,
296    /// Target node chosen or evaluated.
297    pub to_node: String,
298    /// Edge condition that was evaluated, if any.
299    pub condition: Option<String>,
300    /// Whether the branch condition evaluated to true.
301    pub result: bool,
302    /// Timestamp when the branch decision was recorded.
303    pub timestamp: DateTime<Utc>,
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dynamic_orchestration::StepPrehookContext;

    /// Builds a node with the given id and step type and every other field at
    /// its "plain" value (no agent, template or prehook; not a guard; not
    /// repeatable). Tests override individual fields with struct-update syntax.
    fn node(id: &str, step_type: &str) -> WorkflowNode {
        WorkflowNode {
            id: id.to_string(),
            step_type: step_type.to_string(),
            agent_id: None,
            template: None,
            prehook: None,
            is_guard: false,
            repeatable: false,
        }
    }

    /// Builds an unconditional edge `from -> to`.
    fn edge(from: &str, to: &str) -> WorkflowEdge {
        WorkflowEdge {
            from: from.to_string(),
            to: to.to_string(),
            condition: None,
        }
    }

    /// Builds an edge `from -> to` guarded by `condition`.
    fn edge_when(from: &str, to: &str, condition: &str) -> WorkflowEdge {
        WorkflowEdge {
            condition: Some(condition.to_string()),
            ..edge(from, to)
        }
    }

    /// Builds a plan through the validating `add_node` / `add_edge` API,
    /// panicking if any insertion is rejected.
    fn plan_of(nodes: Vec<WorkflowNode>, edges: Vec<WorkflowEdge>) -> DynamicExecutionPlan {
        let mut plan = DynamicExecutionPlan::new();
        for n in nodes {
            plan.add_node(n).expect("add node");
        }
        for e in edges {
            plan.add_edge(e).expect("add edge");
        }
        plan
    }

    /// Two-node plan `a -> b -> a`, used by the cycle tests.
    fn two_node_cycle(b_repeatable: bool) -> DynamicExecutionPlan {
        plan_of(
            vec![
                node("a", "qa"),
                WorkflowNode {
                    repeatable: b_repeatable,
                    ..node("b", "fix")
                },
            ],
            vec![edge("a", "b"), edge("b", "a")],
        )
    }

    #[test]
    fn test_dag_topological_sort() {
        let plan = plan_of(
            vec![
                node("a", "qa"),
                WorkflowNode {
                    repeatable: true,
                    ..node("b", "fix")
                },
            ],
            vec![edge("a", "b")],
        );

        let sorted = plan.topological_sort().expect("topological sort");
        assert_eq!(sorted, vec!["a", "b"]);
    }

    #[test]
    fn test_dag_cycle_detection() {
        let plan = two_node_cycle(true);
        assert!(plan.has_cycles());
    }

    #[test]
    fn test_dag_find_next_nodes() {
        let plan = plan_of(
            vec![
                node("qa", "qa"),
                WorkflowNode {
                    repeatable: true,
                    ..node("fix", "fix")
                },
                WorkflowNode {
                    is_guard: true,
                    ..node("done", "done")
                },
            ],
            vec![
                edge("qa", "fix"),
                edge_when("fix", "done", "active_ticket_count == 0"),
            ],
        );

        let context = StepPrehookContext {
            active_ticket_count: 0,
            upstream_artifacts: Vec::new(),
            build_error_count: 0,
            test_failure_count: 0,
            build_exit_code: None,
            test_exit_code: None,
            ..Default::default()
        };

        let next_from_qa = plan.find_next_nodes("qa", &context);
        assert!(next_from_qa.contains(&"fix".to_string()));

        let next_from_fix = plan.find_next_nodes("fix", &context);
        assert!(next_from_fix.contains(&"done".to_string()));
    }

    #[test]
    fn test_dag_get_node() {
        let plan = plan_of(
            vec![WorkflowNode {
                agent_id: Some("echo".to_string()),
                template: Some("echo test".to_string()),
                ..node("test", "qa")
            }],
            vec![],
        );

        let found = plan.get_node("test");
        assert!(found.is_some());
        assert_eq!(found.expect("node test should exist").id, "test");

        let none = plan.get_node("nonexistent");
        assert!(none.is_none());
    }

    #[test]
    fn test_dag_add_duplicate_node_error() {
        let mut plan = plan_of(vec![node("a", "qa")], vec![]);

        let err = plan
            .add_node(WorkflowNode {
                repeatable: true,
                ..node("a", "fix")
            })
            .expect_err("operation should fail");
        assert!(err.to_string().contains("already exists"));
    }

    #[test]
    fn test_dag_add_edge_missing_source() {
        let mut plan = plan_of(vec![node("b", "fix")], vec![]);

        let err = plan
            .add_edge(edge("a", "b"))
            .expect_err("operation should fail");
        assert!(err.to_string().contains("Source node"));
    }

    #[test]
    fn test_dag_add_edge_missing_target() {
        let mut plan = plan_of(vec![node("a", "qa")], vec![]);

        let err = plan
            .add_edge(edge("a", "b"))
            .expect_err("operation should fail");
        assert!(err.to_string().contains("Target node"));
    }

    #[test]
    fn test_dag_entry_exit_nodes() {
        let plan = plan_of(
            vec![node("start", "init"), node("mid", "qa"), node("end", "done")],
            vec![edge("start", "mid"), edge("mid", "end")],
        );

        let entries: Vec<&str> = plan
            .get_entry_nodes()
            .iter()
            .map(|n| n.id.as_str())
            .collect();
        assert_eq!(entries.len(), 1);
        assert!(entries.contains(&"start"));

        let exits: Vec<&str> = plan
            .get_exit_nodes()
            .iter()
            .map(|n| n.id.as_str())
            .collect();
        assert_eq!(exits.len(), 1);
        assert!(exits.contains(&"end"));
    }

    #[test]
    fn test_dag_empty_plan_no_entries_no_exits() {
        let plan = DynamicExecutionPlan::new();
        assert!(plan.get_entry_nodes().is_empty());
        assert!(plan.get_exit_nodes().is_empty());
        assert!(!plan.has_cycles());
    }

    #[test]
    fn test_dag_single_node_is_both_entry_and_exit() {
        let plan = plan_of(vec![node("only", "qa")], vec![]);

        assert_eq!(plan.get_entry_nodes().len(), 1);
        assert_eq!(plan.get_exit_nodes().len(), 1);
        assert!(!plan.has_cycles());
    }

    #[test]
    fn test_dag_incoming_outgoing_edges() {
        let plan = plan_of(
            vec![node("a", "qa"), node("b", "fix")],
            vec![edge("a", "b")],
        );

        assert_eq!(plan.get_outgoing_edges("a").len(), 1);
        assert_eq!(plan.get_incoming_edges("b").len(), 1);
        assert!(plan.get_outgoing_edges("b").is_empty());
        assert!(plan.get_incoming_edges("a").is_empty());
        assert!(plan.get_outgoing_edges("nonexistent").is_empty());
    }

    #[test]
    fn test_dag_topological_sort_cycle_error() {
        let plan = two_node_cycle(false);

        let err = plan.topological_sort().expect_err("operation should fail");
        assert!(err.to_string().contains("cycles"));
    }

    #[test]
    fn test_dag_topological_sort_empty() {
        let plan = DynamicExecutionPlan::new();
        let sorted = plan.topological_sort().expect("empty topological sort");
        assert!(sorted.is_empty());
    }

    #[test]
    fn test_dag_topological_sort_rejects_unknown_target() {
        let mut plan = plan_of(vec![node("a", "qa")], vec![]);
        // Bypass `add_edge` on purpose: it would reject the dangling target.
        plan.edges.push(edge("a", "ghost"));

        let err = plan.topological_sort().expect_err("operation should fail");
        assert!(err.to_string().contains("missing target node ghost"));
    }

    #[test]
    fn test_dag_topological_sort_diamond() {
        let plan = plan_of(
            ["a", "b", "c", "d"]
                .iter()
                .map(|id| node(id, "step"))
                .collect(),
            vec![
                edge("a", "b"),
                edge("a", "c"),
                edge("b", "d"),
                edge("c", "d"),
            ],
        );

        let sorted = plan.topological_sort().expect("diamond topological sort");
        assert_eq!(sorted.len(), 4);
        let pos = |id: &str| {
            sorted
                .iter()
                .position(|s| s == id)
                .expect("id should be present in sorted output")
        };
        assert!(pos("a") < pos("b"));
        assert!(pos("a") < pos("c"));
        assert!(pos("b") < pos("d"));
        assert!(pos("c") < pos("d"));
    }

    #[test]
    fn test_dag_find_next_nodes_conditional_not_met() {
        let plan = plan_of(
            vec![node("a", "qa"), node("b", "fix")],
            vec![edge_when("a", "b", "active_ticket_count > 0")],
        );

        let ctx = StepPrehookContext {
            active_ticket_count: 0,
            ..Default::default()
        };
        let next = plan.find_next_nodes("a", &ctx);
        assert!(next.is_empty());
    }

    #[test]
    fn test_dag_find_next_nodes_nonexistent_node() {
        let plan = DynamicExecutionPlan::new();
        let ctx = StepPrehookContext::default();
        let next = plan.find_next_nodes("nope", &ctx);
        assert!(next.is_empty());
    }

    #[test]
    fn test_dag_is_completed_not_completed_when_only_mid_done() {
        let plan = plan_of(
            vec![node("start", "init"), node("end", "done")],
            vec![edge("start", "end")],
        );

        let mut state = DagExecutionState::default();
        state.completed_nodes.insert("start".to_string());
        assert!(!plan.is_completed(&state));
    }

    #[test]
    fn test_prehook_config_default() {
        let cfg = PrehookConfig::default();
        assert_eq!(cfg.engine, "cel");
        assert_eq!(cfg.when, "true");
        assert!(cfg.reason.is_none());
        assert!(!cfg.extended);
    }

    #[test]
    fn test_dynamic_execution_plan_serde_round_trip() {
        let mut plan = plan_of(
            vec![WorkflowNode {
                repeatable: true,
                ..node("n1", "qa")
            }],
            vec![],
        );
        plan.entry = Some("n1".to_string());

        let json = serde_json::to_string(&plan).expect("serialize plan");
        let plan2: DynamicExecutionPlan = serde_json::from_str(&json).expect("deserialize plan");
        assert_eq!(plan2.entry, Some("n1".to_string()));
        assert!(plan2.nodes.contains_key("n1"));
    }

    #[test]
    fn test_dag_execution_state_default() {
        let state = DagExecutionState::default();
        assert!(state.current_node.is_none());
        assert!(state.completed_nodes.is_empty());
        assert!(state.skipped_nodes.is_empty());
        assert!(state.context.is_empty());
        assert!(state.branch_history.is_empty());
    }

    #[test]
    fn test_dag_is_completed() {
        let plan = plan_of(
            vec![
                node("start", "init"),
                WorkflowNode {
                    is_guard: true,
                    ..node("end", "done")
                },
            ],
            vec![edge("start", "end")],
        );

        let mut state = DagExecutionState::default();

        assert!(!plan.is_completed(&state));

        state.completed_nodes.insert("end".to_string());

        assert!(plan.is_completed(&state));
    }
}