Skip to main content

pipeline_service/execution/
graph.rs

1// Execution Graph (DAG) Builder
2// Builds a directed acyclic graph from pipeline definition for execution ordering
3
4use crate::parser::models::{BoolOrExpression, DependsOn, Job, Pipeline, Stage, Variable};
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
9/// Error type for graph operations
10#[derive(Debug, Clone)]
11pub struct GraphError {
12    pub message: String,
13    pub kind: GraphErrorKind,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub enum GraphErrorKind {
18    /// Circular dependency detected
19    CyclicDependency,
20    /// Reference to unknown stage/job
21    UnknownDependency,
22    /// Invalid pipeline structure
23    InvalidStructure,
24}
25
26impl fmt::Display for GraphError {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        write!(f, "graph error: {}", self.message)
29    }
30}
31
32impl std::error::Error for GraphError {}
33
34impl GraphError {
35    pub fn cyclic(message: impl Into<String>) -> Self {
36        Self {
37            message: message.into(),
38            kind: GraphErrorKind::CyclicDependency,
39        }
40    }
41
42    pub fn unknown_dependency(message: impl Into<String>) -> Self {
43        Self {
44            message: message.into(),
45            kind: GraphErrorKind::UnknownDependency,
46        }
47    }
48
49    pub fn invalid_structure(message: impl Into<String>) -> Self {
50        Self {
51            message: message.into(),
52            kind: GraphErrorKind::InvalidStructure,
53        }
54    }
55}
56
57/// Execution graph representing the DAG of stages and jobs
58#[derive(Debug, Clone)]
59pub struct ExecutionGraph {
60    /// All stages in the pipeline
61    pub stages: Vec<StageNode>,
62    /// Quick lookup of stage index by name
63    stage_indices: HashMap<String, usize>,
64    /// Pipeline-level variables
65    pub variables: Vec<Variable>,
66}
67
68/// A node representing a stage in the execution graph
69#[derive(Debug, Clone)]
70pub struct StageNode {
71    /// Stage definition
72    pub stage: Stage,
73    /// Names of stages this stage depends on
74    pub dependencies: Vec<String>,
75    /// Jobs within this stage
76    pub jobs: Vec<JobNode>,
77    /// Quick lookup of job index by name
78    job_indices: HashMap<String, usize>,
79}
80
81/// A node representing a job in the execution graph
82#[derive(Debug, Clone)]
83pub struct JobNode {
84    /// Job definition
85    pub job: Job,
86    /// Names of jobs this job depends on (within the same stage)
87    pub dependencies: Vec<String>,
88    /// Matrix instances (empty if no matrix strategy)
89    pub matrix_instances: Vec<super::matrix::MatrixInstance>,
90}
91
92impl ExecutionGraph {
93    /// Build an execution graph from a pipeline definition
94    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, GraphError> {
95        // Normalize pipeline to always have stages
96        let stages = Self::normalize_to_stages(pipeline)?;
97
98        // Build stage nodes
99        let mut stage_nodes = Vec::with_capacity(stages.len());
100        let mut stage_indices = HashMap::new();
101
102        for (i, stage) in stages.iter().enumerate() {
103            let stage_name = stage.stage.clone().unwrap_or_default();
104            stage_indices.insert(stage_name.clone(), i);
105
106            // Calculate stage dependencies
107            let dependencies = Self::calculate_stage_dependencies(stage, i, &stages);
108
109            // Build job nodes for this stage
110            let jobs = Self::build_job_nodes(&stage.jobs)?;
111
112            stage_nodes.push(StageNode {
113                stage: stage.clone(),
114                dependencies,
115                jobs,
116                job_indices: stage
117                    .jobs
118                    .iter()
119                    .enumerate()
120                    .filter_map(|(idx, j)| j.identifier().map(|name| (name.to_string(), idx)))
121                    .collect(),
122            });
123        }
124
125        let graph = Self {
126            stages: stage_nodes,
127            stage_indices,
128            variables: pipeline.variables.clone(),
129        };
130
131        // Validate the graph
132        graph.validate()?;
133
134        Ok(graph)
135    }
136
137    /// Normalize pipeline to stage-based structure
138    fn normalize_to_stages(pipeline: &Pipeline) -> Result<Vec<Stage>, GraphError> {
139        // If we have stages, use them directly
140        if !pipeline.stages.is_empty() {
141            return Ok(pipeline.stages.clone());
142        }
143
144        // If we have jobs but no stages, create a single stage
145        if !pipeline.jobs.is_empty() {
146            return Ok(vec![Stage {
147                stage: Some("__default".to_string()),
148                display_name: None,
149                depends_on: DependsOn::None,
150                condition: None,
151                variables: Vec::new(),
152                jobs: pipeline.jobs.clone(),
153                lock_behavior: None,
154                template: None,
155                parameters: HashMap::new(),
156                pool: pipeline.pool.clone(),
157                has_template_directives: false,
158            }]);
159        }
160
161        // If we have steps but no jobs/stages, create a single job in a single stage
162        if !pipeline.steps.is_empty() {
163            let job = Job {
164                job: Some("__default".to_string()),
165                deployment: None,
166                display_name: None,
167                depends_on: DependsOn::None,
168                condition: None,
169                strategy: None,
170                pool: pipeline.pool.clone(),
171                container: None,
172                services: HashMap::new(),
173                variables: Vec::new(),
174                steps: pipeline.steps.clone(),
175                timeout_in_minutes: None,
176                cancel_timeout_in_minutes: None,
177                continue_on_error: BoolOrExpression::default(),
178                workspace: None,
179                uses: None,
180                template: None,
181                parameters: HashMap::new(),
182                environment: None,
183                has_template_directives: false,
184            };
185
186            return Ok(vec![Stage {
187                stage: Some("__default".to_string()),
188                display_name: None,
189                depends_on: DependsOn::None,
190                condition: None,
191                variables: Vec::new(),
192                jobs: vec![job],
193                lock_behavior: None,
194                template: None,
195                parameters: HashMap::new(),
196                pool: pipeline.pool.clone(),
197                has_template_directives: false,
198            }]);
199        }
200
201        // Empty pipeline
202        Ok(Vec::new())
203    }
204
205    /// Calculate dependencies for a stage based on dependsOn field
206    fn calculate_stage_dependencies(
207        stage: &Stage,
208        index: usize,
209        all_stages: &[Stage],
210    ) -> Vec<String> {
211        match &stage.depends_on {
212            DependsOn::Default => {
213                // Default: depends on the previous stage (if any)
214                if index > 0 {
215                    vec![all_stages[index - 1].stage.clone().unwrap_or_default()]
216                } else {
217                    vec![]
218                }
219            }
220            DependsOn::None => vec![],
221            DependsOn::Single(name) => vec![name.clone()],
222            DependsOn::Multiple(names) => names.clone(),
223        }
224    }
225
226    /// Build job nodes for a stage
227    fn build_job_nodes(jobs: &[Job]) -> Result<Vec<JobNode>, GraphError> {
228        let mut job_nodes = Vec::with_capacity(jobs.len());
229        let job_names: HashSet<_> = jobs.iter().filter_map(|j| j.identifier()).collect();
230
231        for (i, job) in jobs.iter().enumerate() {
232            let dependencies = Self::calculate_job_dependencies(job, i, jobs, &job_names)?;
233
234            job_nodes.push(JobNode {
235                job: job.clone(),
236                dependencies,
237                matrix_instances: Vec::new(), // Populated later by matrix expander
238            });
239        }
240
241        Ok(job_nodes)
242    }
243
244    /// Calculate dependencies for a job based on dependsOn field
245    fn calculate_job_dependencies(
246        job: &Job,
247        index: usize,
248        all_jobs: &[Job],
249        job_names: &HashSet<&str>,
250    ) -> Result<Vec<String>, GraphError> {
251        let deps = match &job.depends_on {
252            DependsOn::Default => {
253                // Default: depends on the previous job (if any)
254                if index > 0 {
255                    if let Some(prev_name) = all_jobs[index - 1].identifier() {
256                        vec![prev_name.to_string()]
257                    } else {
258                        vec![]
259                    }
260                } else {
261                    vec![]
262                }
263            }
264            DependsOn::None => vec![],
265            DependsOn::Single(name) => vec![name.clone()],
266            DependsOn::Multiple(names) => names.clone(),
267        };
268
269        // Validate that all dependencies exist
270        for dep in &deps {
271            if !job_names.contains(dep.as_str()) {
272                return Err(GraphError::unknown_dependency(format!(
273                    "job '{}' depends on unknown job '{}'",
274                    job.identifier().unwrap_or("unknown"),
275                    dep
276                )));
277            }
278        }
279
280        Ok(deps)
281    }
282
283    /// Validate the execution graph (check for cycles and unknown dependencies)
284    pub fn validate(&self) -> Result<(), GraphError> {
285        // Validate stage dependencies exist
286        let stage_names: HashSet<_> = self
287            .stages
288            .iter()
289            .map(|s| s.stage.stage.as_deref().unwrap_or(""))
290            .collect();
291
292        for stage_node in &self.stages {
293            for dep in &stage_node.dependencies {
294                if !stage_names.contains(dep.as_str()) {
295                    return Err(GraphError::unknown_dependency(format!(
296                        "stage '{}' depends on unknown stage '{}'",
297                        stage_node.stage.stage.as_deref().unwrap_or("unknown"),
298                        dep
299                    )));
300                }
301            }
302        }
303
304        // Check for cycles at stage level
305        self.detect_stage_cycles()?;
306
307        // Check for cycles at job level within each stage
308        for stage_node in &self.stages {
309            self.detect_job_cycles(stage_node)?;
310        }
311
312        Ok(())
313    }
314
315    /// Detect cycles in stage dependencies using DFS
316    fn detect_stage_cycles(&self) -> Result<(), GraphError> {
317        let mut visited = HashSet::new();
318        let mut rec_stack = HashSet::new();
319
320        for stage_node in &self.stages {
321            if !visited.contains(stage_node.stage.stage.as_deref().unwrap_or("")) {
322                if let Some(cycle) = self.dfs_stage_cycle(stage_node, &mut visited, &mut rec_stack)
323                {
324                    return Err(GraphError::cyclic(format!(
325                        "circular dependency detected in stages: {}",
326                        cycle.join(" -> ")
327                    )));
328                }
329            }
330        }
331
332        Ok(())
333    }
334
335    fn dfs_stage_cycle(
336        &self,
337        node: &StageNode,
338        visited: &mut HashSet<String>,
339        rec_stack: &mut HashSet<String>,
340    ) -> Option<Vec<String>> {
341        let name = node.stage.stage.clone().unwrap_or_default();
342        visited.insert(name.clone());
343        rec_stack.insert(name.clone());
344
345        for dep in &node.dependencies {
346            if !visited.contains(dep) {
347                if let Some(stage_idx) = self.stage_indices.get(dep) {
348                    if let Some(mut cycle) =
349                        self.dfs_stage_cycle(&self.stages[*stage_idx], visited, rec_stack)
350                    {
351                        cycle.insert(0, name.clone());
352                        return Some(cycle);
353                    }
354                }
355            } else if rec_stack.contains(dep) {
356                return Some(vec![name.clone(), dep.clone()]);
357            }
358        }
359
360        rec_stack.remove(&name);
361        None
362    }
363
364    /// Detect cycles in job dependencies within a stage using DFS
365    fn detect_job_cycles(&self, stage: &StageNode) -> Result<(), GraphError> {
366        let mut visited = HashSet::new();
367        let mut rec_stack = HashSet::new();
368
369        for job_node in &stage.jobs {
370            let job_name = job_node.job.identifier().unwrap_or("unknown").to_string();
371            if !visited.contains(&job_name) {
372                if let Some(cycle) =
373                    self.dfs_job_cycle(stage, job_node, &mut visited, &mut rec_stack)
374                {
375                    return Err(GraphError::cyclic(format!(
376                        "circular dependency detected in jobs of stage '{}': {}",
377                        stage.stage.stage.as_deref().unwrap_or("unknown"),
378                        cycle.join(" -> ")
379                    )));
380                }
381            }
382        }
383
384        Ok(())
385    }
386
387    #[allow(clippy::only_used_in_recursion)]
388    fn dfs_job_cycle(
389        &self,
390        stage: &StageNode,
391        node: &JobNode,
392        visited: &mut HashSet<String>,
393        rec_stack: &mut HashSet<String>,
394    ) -> Option<Vec<String>> {
395        let name = node.job.identifier().unwrap_or("unknown").to_string();
396        visited.insert(name.clone());
397        rec_stack.insert(name.clone());
398
399        for dep in &node.dependencies {
400            if !visited.contains(dep) {
401                if let Some(job_idx) = stage.job_indices.get(dep) {
402                    if let Some(mut cycle) =
403                        self.dfs_job_cycle(stage, &stage.jobs[*job_idx], visited, rec_stack)
404                    {
405                        cycle.insert(0, name.clone());
406                        return Some(cycle);
407                    }
408                }
409            } else if rec_stack.contains(dep) {
410                return Some(vec![name.clone(), dep.clone()]);
411            }
412        }
413
414        rec_stack.remove(&name);
415        None
416    }
417
418    /// Get stages in topological order (respecting dependencies)
419    pub fn topological_order(&self) -> Vec<&StageNode> {
420        // Kahn's algorithm for topological sort
421        let mut in_degree: HashMap<&str, usize> = HashMap::new();
422        let mut adj_list: HashMap<&str, Vec<&str>> = HashMap::new();
423
424        // Initialize
425        for stage in &self.stages {
426            let name = stage.stage.stage.as_deref().unwrap_or("");
427            in_degree.entry(name).or_insert(0);
428            adj_list.entry(name).or_default();
429
430            for dep in &stage.dependencies {
431                adj_list.entry(dep.as_str()).or_default().push(name);
432                *in_degree.entry(name).or_insert(0) += 1;
433            }
434        }
435
436        // Find all nodes with in-degree 0
437        let mut queue: VecDeque<&str> = in_degree
438            .iter()
439            .filter(|(_, &deg)| deg == 0)
440            .map(|(&name, _)| name)
441            .collect();
442
443        let mut result = Vec::new();
444
445        while let Some(name) = queue.pop_front() {
446            if let Some(idx) = self.stage_indices.get(name) {
447                result.push(&self.stages[*idx]);
448            }
449
450            if let Some(neighbors) = adj_list.get(name) {
451                for &neighbor in neighbors {
452                    if let Some(deg) = in_degree.get_mut(neighbor) {
453                        *deg -= 1;
454                        if *deg == 0 {
455                            queue.push_back(neighbor);
456                        }
457                    }
458                }
459            }
460        }
461
462        result
463    }
464
465    /// Get a stage by name
466    pub fn get_stage(&self, name: &str) -> Option<&StageNode> {
467        self.stage_indices.get(name).map(|&idx| &self.stages[idx])
468    }
469
470    /// Get jobs in topological order for a stage
471    pub fn jobs_topological_order<'a>(&self, stage: &'a StageNode) -> Vec<&'a JobNode> {
472        let mut in_degree: HashMap<&str, usize> = HashMap::new();
473        let mut adj_list: HashMap<&str, Vec<&str>> = HashMap::new();
474
475        // Initialize
476        for job in &stage.jobs {
477            let name = job.job.identifier().unwrap_or("unknown");
478            in_degree.entry(name).or_insert(0);
479            adj_list.entry(name).or_default();
480
481            for dep in &job.dependencies {
482                adj_list.entry(dep.as_str()).or_default().push(name);
483                *in_degree.entry(name).or_insert(0) += 1;
484            }
485        }
486
487        // Find all nodes with in-degree 0
488        let mut queue: VecDeque<&str> = in_degree
489            .iter()
490            .filter(|(_, &deg)| deg == 0)
491            .map(|(&name, _)| name)
492            .collect();
493
494        let mut result = Vec::new();
495
496        while let Some(name) = queue.pop_front() {
497            if let Some(idx) = stage.job_indices.get(name) {
498                result.push(&stage.jobs[*idx]);
499            }
500
501            if let Some(neighbors) = adj_list.get(name) {
502                for &neighbor in neighbors {
503                    if let Some(deg) = in_degree.get_mut(neighbor) {
504                        *deg -= 1;
505                        if *deg == 0 {
506                            queue.push_back(neighbor);
507                        }
508                    }
509                }
510            }
511        }
512
513        result
514    }
515
516    /// Get stages that can run in parallel (no dependencies between them)
517    pub fn parallel_stages(&self) -> Vec<Vec<&StageNode>> {
518        let mut levels: Vec<Vec<&StageNode>> = Vec::new();
519        let mut assigned: HashMap<&str, usize> = HashMap::new();
520
521        for stage in self.topological_order() {
522            let name = stage.stage.stage.as_deref().unwrap_or("");
523            let level = if stage.dependencies.is_empty() {
524                0
525            } else {
526                stage
527                    .dependencies
528                    .iter()
529                    .filter_map(|dep| assigned.get(dep.as_str()))
530                    .max()
531                    .map(|l| l + 1)
532                    .unwrap_or(0)
533            };
534
535            assigned.insert(name, level);
536
537            if level >= levels.len() {
538                levels.resize(level + 1, Vec::new());
539            }
540            levels[level].push(stage);
541        }
542
543        levels
544    }
545
546    /// Get jobs that can run in parallel within a stage
547    pub fn parallel_jobs<'a>(&self, stage: &'a StageNode) -> Vec<Vec<&'a JobNode>> {
548        let mut levels: Vec<Vec<&'a JobNode>> = Vec::new();
549        let mut assigned: HashMap<&str, usize> = HashMap::new();
550
551        for job in self.jobs_topological_order(stage) {
552            let name = job.job.identifier().unwrap_or("unknown");
553            let level = if job.dependencies.is_empty() {
554                0
555            } else {
556                job.dependencies
557                    .iter()
558                    .filter_map(|dep| assigned.get(dep.as_str()))
559                    .max()
560                    .map(|l| l + 1)
561                    .unwrap_or(0)
562            };
563
564            assigned.insert(name, level);
565
566            if level >= levels.len() {
567                levels.resize(level + 1, Vec::new());
568            }
569            levels[level].push(job);
570        }
571
572        levels
573    }
574}
575
576impl StageNode {
577    /// Get a job by name
578    pub fn get_job(&self, name: &str) -> Option<&JobNode> {
579        self.job_indices.get(name).map(|&idx| &self.jobs[idx])
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586
587    fn make_pipeline_with_stages(stages: Vec<Stage>) -> Pipeline {
588        Pipeline {
589            stages,
590            ..Default::default()
591        }
592    }
593
594    fn make_stage(name: &str, depends_on: DependsOn) -> Stage {
595        Stage {
596            stage: Some(name.to_string()),
597            display_name: None,
598            depends_on,
599            condition: None,
600            variables: Vec::new(),
601            jobs: vec![make_job("Job1", DependsOn::None)],
602            lock_behavior: None,
603            template: None,
604            parameters: HashMap::new(),
605            pool: None,
606            has_template_directives: false,
607        }
608    }
609
610    fn make_job(name: &str, depends_on: DependsOn) -> Job {
611        Job {
612            job: Some(name.to_string()),
613            deployment: None,
614            display_name: None,
615            depends_on,
616            condition: None,
617            strategy: None,
618            pool: None,
619            container: None,
620            services: HashMap::new(),
621            variables: Vec::new(),
622            steps: Vec::new(),
623            timeout_in_minutes: None,
624            cancel_timeout_in_minutes: None,
625            continue_on_error: BoolOrExpression::default(),
626            workspace: None,
627            uses: None,
628            template: None,
629            parameters: HashMap::new(),
630            environment: None,
631            has_template_directives: false,
632        }
633    }
634
635    #[test]
636    fn test_simple_linear_stages() {
637        let pipeline = make_pipeline_with_stages(vec![
638            make_stage("Build", DependsOn::None),
639            make_stage("Test", DependsOn::Default), // depends on Build
640            make_stage("Deploy", DependsOn::Default), // depends on Test
641        ]);
642
643        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
644
645        assert_eq!(graph.stages.len(), 3);
646        assert!(graph.stages[0].dependencies.is_empty());
647        assert_eq!(graph.stages[1].dependencies, vec!["Build"]);
648        assert_eq!(graph.stages[2].dependencies, vec!["Test"]);
649
650        // Topological order should be Build -> Test -> Deploy
651        let order: Vec<_> = graph.topological_order();
652        assert_eq!(order.len(), 3);
653        assert_eq!(order[0].stage.stage, Some("Build".to_string()));
654        assert_eq!(order[1].stage.stage, Some("Test".to_string()));
655        assert_eq!(order[2].stage.stage, Some("Deploy".to_string()));
656    }
657
658    #[test]
659    fn test_parallel_stages() {
660        let pipeline = make_pipeline_with_stages(vec![
661            make_stage("Build", DependsOn::None),
662            make_stage("UnitTest", DependsOn::Single("Build".to_string())),
663            make_stage("IntegrationTest", DependsOn::Single("Build".to_string())),
664            make_stage(
665                "Deploy",
666                DependsOn::Multiple(vec!["UnitTest".to_string(), "IntegrationTest".to_string()]),
667            ),
668        ]);
669
670        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
671
672        let parallel = graph.parallel_stages();
673        assert_eq!(parallel.len(), 3);
674
675        // Level 0: Build
676        assert_eq!(parallel[0].len(), 1);
677        assert_eq!(parallel[0][0].stage.stage, Some("Build".to_string()));
678
679        // Level 1: UnitTest, IntegrationTest (can run in parallel)
680        assert_eq!(parallel[1].len(), 2);
681
682        // Level 2: Deploy
683        assert_eq!(parallel[2].len(), 1);
684        assert_eq!(parallel[2][0].stage.stage, Some("Deploy".to_string()));
685    }
686
687    #[test]
688    fn test_cycle_detection_stages() {
689        let pipeline = make_pipeline_with_stages(vec![
690            make_stage("A", DependsOn::Single("C".to_string())),
691            make_stage("B", DependsOn::Single("A".to_string())),
692            make_stage("C", DependsOn::Single("B".to_string())),
693        ]);
694
695        let result = ExecutionGraph::from_pipeline(&pipeline);
696        assert!(result.is_err());
697        let err = result.unwrap_err();
698        assert_eq!(err.kind, GraphErrorKind::CyclicDependency);
699    }
700
701    #[test]
702    fn test_unknown_dependency() {
703        let pipeline = make_pipeline_with_stages(vec![
704            make_stage("Build", DependsOn::None),
705            make_stage("Test", DependsOn::Single("Unknown".to_string())),
706        ]);
707
708        let result = ExecutionGraph::from_pipeline(&pipeline);
709        assert!(result.is_err());
710        let err = result.unwrap_err();
711        assert_eq!(err.kind, GraphErrorKind::UnknownDependency);
712    }
713
714    #[test]
715    fn test_jobs_within_stage() {
716        let mut stage = make_stage("Build", DependsOn::None);
717        stage.jobs = vec![
718            make_job("Compile", DependsOn::None),
719            make_job("Lint", DependsOn::None),
720            make_job(
721                "Package",
722                DependsOn::Multiple(vec!["Compile".to_string(), "Lint".to_string()]),
723            ),
724        ];
725
726        let pipeline = make_pipeline_with_stages(vec![stage]);
727        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
728
729        let stage_node = &graph.stages[0];
730        let parallel_jobs = graph.parallel_jobs(stage_node);
731
732        // Level 0: Compile, Lint (can run in parallel)
733        assert_eq!(parallel_jobs[0].len(), 2);
734
735        // Level 1: Package
736        assert_eq!(parallel_jobs[1].len(), 1);
737        assert_eq!(parallel_jobs[1][0].job.identifier(), Some("Package"));
738    }
739
740    #[test]
741    fn test_job_cycle_detection() {
742        let mut stage = make_stage("Build", DependsOn::None);
743        stage.jobs = vec![
744            make_job("A", DependsOn::Single("C".to_string())),
745            make_job("B", DependsOn::Single("A".to_string())),
746            make_job("C", DependsOn::Single("B".to_string())),
747        ];
748
749        let pipeline = make_pipeline_with_stages(vec![stage]);
750        let result = ExecutionGraph::from_pipeline(&pipeline);
751
752        assert!(result.is_err());
753        let err = result.unwrap_err();
754        assert_eq!(err.kind, GraphErrorKind::CyclicDependency);
755    }
756
757    #[test]
758    fn test_normalize_steps_only_pipeline() {
759        use crate::parser::models::{ScriptStep, Step, StepAction};
760
761        let pipeline = Pipeline {
762            steps: vec![Step {
763                name: Some("echo".to_string()),
764                display_name: Some("Echo Hello".to_string()),
765                condition: None,
766                continue_on_error: BoolOrExpression::default(),
767                enabled: true,
768                timeout_in_minutes: None,
769                retry_count_on_task_failure: None,
770                env: HashMap::new(),
771                action: StepAction::Script(ScriptStep {
772                    script: "echo hello".to_string(),
773                    working_directory: None,
774                    fail_on_stderr: false,
775                }),
776            }],
777            ..Default::default()
778        };
779
780        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
781
782        assert_eq!(graph.stages.len(), 1);
783        assert_eq!(graph.stages[0].stage.stage, Some("__default".to_string()));
784        assert_eq!(graph.stages[0].jobs.len(), 1);
785        assert_eq!(graph.stages[0].jobs[0].job.steps.len(), 1);
786    }
787
788    #[test]
789    fn test_normalize_jobs_only_pipeline() {
790        let pipeline = Pipeline {
791            jobs: vec![
792                make_job("Build", DependsOn::None),
793                make_job("Test", DependsOn::Default),
794            ],
795            ..Default::default()
796        };
797
798        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
799
800        assert_eq!(graph.stages.len(), 1);
801        assert_eq!(graph.stages[0].stage.stage, Some("__default".to_string()));
802        assert_eq!(graph.stages[0].jobs.len(), 2);
803    }
804
805    #[test]
806    fn test_explicit_none_dependency() {
807        let pipeline = make_pipeline_with_stages(vec![
808            make_stage("Build", DependsOn::None),
809            make_stage("Deploy", DependsOn::None), // Explicitly no dependencies
810        ]);
811
812        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
813
814        // Both stages should have no dependencies and can run in parallel
815        let parallel = graph.parallel_stages();
816        assert_eq!(parallel.len(), 1);
817        assert_eq!(parallel[0].len(), 2);
818    }
819}