1use crate::parser::models::{BoolOrExpression, DependsOn, Job, Pipeline, Stage, Variable};
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
9#[derive(Debug, Clone)]
11pub struct GraphError {
12 pub message: String,
13 pub kind: GraphErrorKind,
14}
15
/// Category attached to a [`GraphError`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GraphErrorKind {
    /// The dependency relation between stages or jobs contains a cycle.
    CyclicDependency,
    /// A stage or job depends on a name that does not exist.
    UnknownDependency,
    /// The graph is structurally invalid.
    // NOTE(review): no code in this file constructs this variant; confirm
    // whether other modules rely on it.
    InvalidStructure,
}
25
26impl fmt::Display for GraphError {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 write!(f, "graph error: {}", self.message)
29 }
30}
31
32impl std::error::Error for GraphError {}
33
34impl GraphError {
35 pub fn cyclic(message: impl Into<String>) -> Self {
36 Self {
37 message: message.into(),
38 kind: GraphErrorKind::CyclicDependency,
39 }
40 }
41
42 pub fn unknown_dependency(message: impl Into<String>) -> Self {
43 Self {
44 message: message.into(),
45 kind: GraphErrorKind::UnknownDependency,
46 }
47 }
48
49 pub fn invalid_structure(message: impl Into<String>) -> Self {
50 Self {
51 message: message.into(),
52 kind: GraphErrorKind::InvalidStructure,
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
59pub struct ExecutionGraph {
60 pub stages: Vec<StageNode>,
62 stage_indices: HashMap<String, usize>,
64 pub variables: Vec<Variable>,
66}
67
68#[derive(Debug, Clone)]
70pub struct StageNode {
71 pub stage: Stage,
73 pub dependencies: Vec<String>,
75 pub jobs: Vec<JobNode>,
77 job_indices: HashMap<String, usize>,
79}
80
81#[derive(Debug, Clone)]
83pub struct JobNode {
84 pub job: Job,
86 pub dependencies: Vec<String>,
88 pub matrix_instances: Vec<super::matrix::MatrixInstance>,
90}
91
92impl ExecutionGraph {
93 pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, GraphError> {
95 let stages = Self::normalize_to_stages(pipeline)?;
97
98 let mut stage_nodes = Vec::with_capacity(stages.len());
100 let mut stage_indices = HashMap::new();
101
102 for (i, stage) in stages.iter().enumerate() {
103 let stage_name = stage.stage.clone().unwrap_or_default();
104 stage_indices.insert(stage_name.clone(), i);
105
106 let dependencies = Self::calculate_stage_dependencies(stage, i, &stages);
108
109 let jobs = Self::build_job_nodes(&stage.jobs)?;
111
112 stage_nodes.push(StageNode {
113 stage: stage.clone(),
114 dependencies,
115 jobs,
116 job_indices: stage
117 .jobs
118 .iter()
119 .enumerate()
120 .filter_map(|(idx, j)| j.identifier().map(|name| (name.to_string(), idx)))
121 .collect(),
122 });
123 }
124
125 let graph = Self {
126 stages: stage_nodes,
127 stage_indices,
128 variables: pipeline.variables.clone(),
129 };
130
131 graph.validate()?;
133
134 Ok(graph)
135 }
136
137 fn normalize_to_stages(pipeline: &Pipeline) -> Result<Vec<Stage>, GraphError> {
139 if !pipeline.stages.is_empty() {
141 return Ok(pipeline.stages.clone());
142 }
143
144 if !pipeline.jobs.is_empty() {
146 return Ok(vec![Stage {
147 stage: Some("__default".to_string()),
148 display_name: None,
149 depends_on: DependsOn::None,
150 condition: None,
151 variables: Vec::new(),
152 jobs: pipeline.jobs.clone(),
153 lock_behavior: None,
154 template: None,
155 parameters: HashMap::new(),
156 pool: pipeline.pool.clone(),
157 has_template_directives: false,
158 }]);
159 }
160
161 if !pipeline.steps.is_empty() {
163 let job = Job {
164 job: Some("__default".to_string()),
165 deployment: None,
166 display_name: None,
167 depends_on: DependsOn::None,
168 condition: None,
169 strategy: None,
170 pool: pipeline.pool.clone(),
171 container: None,
172 services: HashMap::new(),
173 variables: Vec::new(),
174 steps: pipeline.steps.clone(),
175 timeout_in_minutes: None,
176 cancel_timeout_in_minutes: None,
177 continue_on_error: BoolOrExpression::default(),
178 workspace: None,
179 uses: None,
180 template: None,
181 parameters: HashMap::new(),
182 environment: None,
183 has_template_directives: false,
184 };
185
186 return Ok(vec![Stage {
187 stage: Some("__default".to_string()),
188 display_name: None,
189 depends_on: DependsOn::None,
190 condition: None,
191 variables: Vec::new(),
192 jobs: vec![job],
193 lock_behavior: None,
194 template: None,
195 parameters: HashMap::new(),
196 pool: pipeline.pool.clone(),
197 has_template_directives: false,
198 }]);
199 }
200
201 Ok(Vec::new())
203 }
204
205 fn calculate_stage_dependencies(
207 stage: &Stage,
208 index: usize,
209 all_stages: &[Stage],
210 ) -> Vec<String> {
211 match &stage.depends_on {
212 DependsOn::Default => {
213 if index > 0 {
215 vec![all_stages[index - 1].stage.clone().unwrap_or_default()]
216 } else {
217 vec![]
218 }
219 }
220 DependsOn::None => vec![],
221 DependsOn::Single(name) => vec![name.clone()],
222 DependsOn::Multiple(names) => names.clone(),
223 }
224 }
225
226 fn build_job_nodes(jobs: &[Job]) -> Result<Vec<JobNode>, GraphError> {
228 let mut job_nodes = Vec::with_capacity(jobs.len());
229 let job_names: HashSet<_> = jobs.iter().filter_map(|j| j.identifier()).collect();
230
231 for (i, job) in jobs.iter().enumerate() {
232 let dependencies = Self::calculate_job_dependencies(job, i, jobs, &job_names)?;
233
234 job_nodes.push(JobNode {
235 job: job.clone(),
236 dependencies,
237 matrix_instances: Vec::new(), });
239 }
240
241 Ok(job_nodes)
242 }
243
244 fn calculate_job_dependencies(
246 job: &Job,
247 index: usize,
248 all_jobs: &[Job],
249 job_names: &HashSet<&str>,
250 ) -> Result<Vec<String>, GraphError> {
251 let deps = match &job.depends_on {
252 DependsOn::Default => {
253 if index > 0 {
255 if let Some(prev_name) = all_jobs[index - 1].identifier() {
256 vec![prev_name.to_string()]
257 } else {
258 vec![]
259 }
260 } else {
261 vec![]
262 }
263 }
264 DependsOn::None => vec![],
265 DependsOn::Single(name) => vec![name.clone()],
266 DependsOn::Multiple(names) => names.clone(),
267 };
268
269 for dep in &deps {
271 if !job_names.contains(dep.as_str()) {
272 return Err(GraphError::unknown_dependency(format!(
273 "job '{}' depends on unknown job '{}'",
274 job.identifier().unwrap_or("unknown"),
275 dep
276 )));
277 }
278 }
279
280 Ok(deps)
281 }
282
283 pub fn validate(&self) -> Result<(), GraphError> {
285 let stage_names: HashSet<_> = self
287 .stages
288 .iter()
289 .map(|s| s.stage.stage.as_deref().unwrap_or(""))
290 .collect();
291
292 for stage_node in &self.stages {
293 for dep in &stage_node.dependencies {
294 if !stage_names.contains(dep.as_str()) {
295 return Err(GraphError::unknown_dependency(format!(
296 "stage '{}' depends on unknown stage '{}'",
297 stage_node.stage.stage.as_deref().unwrap_or("unknown"),
298 dep
299 )));
300 }
301 }
302 }
303
304 self.detect_stage_cycles()?;
306
307 for stage_node in &self.stages {
309 self.detect_job_cycles(stage_node)?;
310 }
311
312 Ok(())
313 }
314
315 fn detect_stage_cycles(&self) -> Result<(), GraphError> {
317 let mut visited = HashSet::new();
318 let mut rec_stack = HashSet::new();
319
320 for stage_node in &self.stages {
321 if !visited.contains(stage_node.stage.stage.as_deref().unwrap_or("")) {
322 if let Some(cycle) = self.dfs_stage_cycle(stage_node, &mut visited, &mut rec_stack)
323 {
324 return Err(GraphError::cyclic(format!(
325 "circular dependency detected in stages: {}",
326 cycle.join(" -> ")
327 )));
328 }
329 }
330 }
331
332 Ok(())
333 }
334
335 fn dfs_stage_cycle(
336 &self,
337 node: &StageNode,
338 visited: &mut HashSet<String>,
339 rec_stack: &mut HashSet<String>,
340 ) -> Option<Vec<String>> {
341 let name = node.stage.stage.clone().unwrap_or_default();
342 visited.insert(name.clone());
343 rec_stack.insert(name.clone());
344
345 for dep in &node.dependencies {
346 if !visited.contains(dep) {
347 if let Some(stage_idx) = self.stage_indices.get(dep) {
348 if let Some(mut cycle) =
349 self.dfs_stage_cycle(&self.stages[*stage_idx], visited, rec_stack)
350 {
351 cycle.insert(0, name.clone());
352 return Some(cycle);
353 }
354 }
355 } else if rec_stack.contains(dep) {
356 return Some(vec![name.clone(), dep.clone()]);
357 }
358 }
359
360 rec_stack.remove(&name);
361 None
362 }
363
364 fn detect_job_cycles(&self, stage: &StageNode) -> Result<(), GraphError> {
366 let mut visited = HashSet::new();
367 let mut rec_stack = HashSet::new();
368
369 for job_node in &stage.jobs {
370 let job_name = job_node.job.identifier().unwrap_or("unknown").to_string();
371 if !visited.contains(&job_name) {
372 if let Some(cycle) =
373 self.dfs_job_cycle(stage, job_node, &mut visited, &mut rec_stack)
374 {
375 return Err(GraphError::cyclic(format!(
376 "circular dependency detected in jobs of stage '{}': {}",
377 stage.stage.stage.as_deref().unwrap_or("unknown"),
378 cycle.join(" -> ")
379 )));
380 }
381 }
382 }
383
384 Ok(())
385 }
386
387 #[allow(clippy::only_used_in_recursion)]
388 fn dfs_job_cycle(
389 &self,
390 stage: &StageNode,
391 node: &JobNode,
392 visited: &mut HashSet<String>,
393 rec_stack: &mut HashSet<String>,
394 ) -> Option<Vec<String>> {
395 let name = node.job.identifier().unwrap_or("unknown").to_string();
396 visited.insert(name.clone());
397 rec_stack.insert(name.clone());
398
399 for dep in &node.dependencies {
400 if !visited.contains(dep) {
401 if let Some(job_idx) = stage.job_indices.get(dep) {
402 if let Some(mut cycle) =
403 self.dfs_job_cycle(stage, &stage.jobs[*job_idx], visited, rec_stack)
404 {
405 cycle.insert(0, name.clone());
406 return Some(cycle);
407 }
408 }
409 } else if rec_stack.contains(dep) {
410 return Some(vec![name.clone(), dep.clone()]);
411 }
412 }
413
414 rec_stack.remove(&name);
415 None
416 }
417
418 pub fn topological_order(&self) -> Vec<&StageNode> {
420 let mut in_degree: HashMap<&str, usize> = HashMap::new();
422 let mut adj_list: HashMap<&str, Vec<&str>> = HashMap::new();
423
424 for stage in &self.stages {
426 let name = stage.stage.stage.as_deref().unwrap_or("");
427 in_degree.entry(name).or_insert(0);
428 adj_list.entry(name).or_default();
429
430 for dep in &stage.dependencies {
431 adj_list.entry(dep.as_str()).or_default().push(name);
432 *in_degree.entry(name).or_insert(0) += 1;
433 }
434 }
435
436 let mut queue: VecDeque<&str> = in_degree
438 .iter()
439 .filter(|(_, °)| deg == 0)
440 .map(|(&name, _)| name)
441 .collect();
442
443 let mut result = Vec::new();
444
445 while let Some(name) = queue.pop_front() {
446 if let Some(idx) = self.stage_indices.get(name) {
447 result.push(&self.stages[*idx]);
448 }
449
450 if let Some(neighbors) = adj_list.get(name) {
451 for &neighbor in neighbors {
452 if let Some(deg) = in_degree.get_mut(neighbor) {
453 *deg -= 1;
454 if *deg == 0 {
455 queue.push_back(neighbor);
456 }
457 }
458 }
459 }
460 }
461
462 result
463 }
464
465 pub fn get_stage(&self, name: &str) -> Option<&StageNode> {
467 self.stage_indices.get(name).map(|&idx| &self.stages[idx])
468 }
469
470 pub fn jobs_topological_order<'a>(&self, stage: &'a StageNode) -> Vec<&'a JobNode> {
472 let mut in_degree: HashMap<&str, usize> = HashMap::new();
473 let mut adj_list: HashMap<&str, Vec<&str>> = HashMap::new();
474
475 for job in &stage.jobs {
477 let name = job.job.identifier().unwrap_or("unknown");
478 in_degree.entry(name).or_insert(0);
479 adj_list.entry(name).or_default();
480
481 for dep in &job.dependencies {
482 adj_list.entry(dep.as_str()).or_default().push(name);
483 *in_degree.entry(name).or_insert(0) += 1;
484 }
485 }
486
487 let mut queue: VecDeque<&str> = in_degree
489 .iter()
490 .filter(|(_, °)| deg == 0)
491 .map(|(&name, _)| name)
492 .collect();
493
494 let mut result = Vec::new();
495
496 while let Some(name) = queue.pop_front() {
497 if let Some(idx) = stage.job_indices.get(name) {
498 result.push(&stage.jobs[*idx]);
499 }
500
501 if let Some(neighbors) = adj_list.get(name) {
502 for &neighbor in neighbors {
503 if let Some(deg) = in_degree.get_mut(neighbor) {
504 *deg -= 1;
505 if *deg == 0 {
506 queue.push_back(neighbor);
507 }
508 }
509 }
510 }
511 }
512
513 result
514 }
515
516 pub fn parallel_stages(&self) -> Vec<Vec<&StageNode>> {
518 let mut levels: Vec<Vec<&StageNode>> = Vec::new();
519 let mut assigned: HashMap<&str, usize> = HashMap::new();
520
521 for stage in self.topological_order() {
522 let name = stage.stage.stage.as_deref().unwrap_or("");
523 let level = if stage.dependencies.is_empty() {
524 0
525 } else {
526 stage
527 .dependencies
528 .iter()
529 .filter_map(|dep| assigned.get(dep.as_str()))
530 .max()
531 .map(|l| l + 1)
532 .unwrap_or(0)
533 };
534
535 assigned.insert(name, level);
536
537 if level >= levels.len() {
538 levels.resize(level + 1, Vec::new());
539 }
540 levels[level].push(stage);
541 }
542
543 levels
544 }
545
546 pub fn parallel_jobs<'a>(&self, stage: &'a StageNode) -> Vec<Vec<&'a JobNode>> {
548 let mut levels: Vec<Vec<&'a JobNode>> = Vec::new();
549 let mut assigned: HashMap<&str, usize> = HashMap::new();
550
551 for job in self.jobs_topological_order(stage) {
552 let name = job.job.identifier().unwrap_or("unknown");
553 let level = if job.dependencies.is_empty() {
554 0
555 } else {
556 job.dependencies
557 .iter()
558 .filter_map(|dep| assigned.get(dep.as_str()))
559 .max()
560 .map(|l| l + 1)
561 .unwrap_or(0)
562 };
563
564 assigned.insert(name, level);
565
566 if level >= levels.len() {
567 levels.resize(level + 1, Vec::new());
568 }
569 levels[level].push(job);
570 }
571
572 levels
573 }
574}
575
576impl StageNode {
577 pub fn get_job(&self, name: &str) -> Option<&JobNode> {
579 self.job_indices.get(name).map(|&idx| &self.jobs[idx])
580 }
581}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Pipeline made of the given stages; every other field is defaulted.
    fn make_pipeline_with_stages(stages: Vec<Stage>) -> Pipeline {
        Pipeline {
            stages,
            ..Pipeline::default()
        }
    }

    /// Stage called `name` that contains one independent job, `Job1`.
    fn make_stage(name: &str, depends_on: DependsOn) -> Stage {
        let only_job = make_job("Job1", DependsOn::None);

        Stage {
            stage: Some(name.to_string()),
            display_name: None,
            depends_on,
            condition: None,
            variables: Vec::new(),
            jobs: vec![only_job],
            lock_behavior: None,
            template: None,
            parameters: HashMap::new(),
            pool: None,
            has_template_directives: false,
        }
    }

    /// Job called `name` with no steps.
    fn make_job(name: &str, depends_on: DependsOn) -> Job {
        Job {
            job: Some(name.to_string()),
            deployment: None,
            display_name: None,
            depends_on,
            condition: None,
            strategy: None,
            pool: None,
            container: None,
            services: HashMap::new(),
            variables: Vec::new(),
            steps: Vec::new(),
            timeout_in_minutes: None,
            cancel_timeout_in_minutes: None,
            continue_on_error: BoolOrExpression::default(),
            workspace: None,
            uses: None,
            template: None,
            parameters: HashMap::new(),
            environment: None,
            has_template_directives: false,
        }
    }

    #[test]
    fn test_simple_linear_stages() {
        // `Default` makes each stage depend on the one declared before it.
        let pipeline = make_pipeline_with_stages(vec![
            make_stage("Build", DependsOn::None),
            make_stage("Test", DependsOn::Default),
            make_stage("Deploy", DependsOn::Default),
        ]);

        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();

        assert_eq!(graph.stages.len(), 3);
        assert!(graph.stages[0].dependencies.is_empty());
        assert_eq!(graph.stages[1].dependencies, vec!["Build"]);
        assert_eq!(graph.stages[2].dependencies, vec!["Test"]);

        let ordered_names: Vec<Option<&str>> = graph
            .topological_order()
            .iter()
            .map(|node| node.stage.stage.as_deref())
            .collect();
        assert_eq!(
            ordered_names,
            vec![Some("Build"), Some("Test"), Some("Deploy")]
        );
    }

    #[test]
    fn test_parallel_stages() {
        // Diamond: Build -> {UnitTest, IntegrationTest} -> Deploy.
        let build = "Build".to_string();
        let pipeline = make_pipeline_with_stages(vec![
            make_stage("Build", DependsOn::None),
            make_stage("UnitTest", DependsOn::Single(build.clone())),
            make_stage("IntegrationTest", DependsOn::Single(build)),
            make_stage(
                "Deploy",
                DependsOn::Multiple(vec!["UnitTest".to_string(), "IntegrationTest".to_string()]),
            ),
        ]);

        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
        let levels = graph.parallel_stages();

        let sizes: Vec<usize> = levels.iter().map(|level| level.len()).collect();
        assert_eq!(sizes, vec![1, 2, 1]);

        assert_eq!(levels[0][0].stage.stage, Some("Build".to_string()));
        assert_eq!(levels[2][0].stage.stage, Some("Deploy".to_string()));
    }

    #[test]
    fn test_cycle_detection_stages() {
        // A -> C -> B -> A.
        let pipeline = make_pipeline_with_stages(vec![
            make_stage("A", DependsOn::Single("C".to_string())),
            make_stage("B", DependsOn::Single("A".to_string())),
            make_stage("C", DependsOn::Single("B".to_string())),
        ]);

        let err = ExecutionGraph::from_pipeline(&pipeline).unwrap_err();
        assert_eq!(err.kind, GraphErrorKind::CyclicDependency);
    }

    #[test]
    fn test_unknown_dependency() {
        let pipeline = make_pipeline_with_stages(vec![
            make_stage("Build", DependsOn::None),
            make_stage("Test", DependsOn::Single("Unknown".to_string())),
        ]);

        let err = ExecutionGraph::from_pipeline(&pipeline).unwrap_err();
        assert_eq!(err.kind, GraphErrorKind::UnknownDependency);
    }

    #[test]
    fn test_jobs_within_stage() {
        // Compile and Lint are independent; Package waits for both.
        let mut stage = make_stage("Build", DependsOn::None);
        stage.jobs = vec![
            make_job("Compile", DependsOn::None),
            make_job("Lint", DependsOn::None),
            make_job(
                "Package",
                DependsOn::Multiple(vec!["Compile".to_string(), "Lint".to_string()]),
            ),
        ];

        let graph = ExecutionGraph::from_pipeline(&make_pipeline_with_stages(vec![stage])).unwrap();
        let levels = graph.parallel_jobs(&graph.stages[0]);

        assert_eq!(levels[0].len(), 2);
        assert_eq!(levels[1].len(), 1);
        assert_eq!(levels[1][0].job.identifier(), Some("Package"));
    }

    #[test]
    fn test_job_cycle_detection() {
        // A -> C -> B -> A inside a single stage.
        let mut stage = make_stage("Build", DependsOn::None);
        stage.jobs = vec![
            make_job("A", DependsOn::Single("C".to_string())),
            make_job("B", DependsOn::Single("A".to_string())),
            make_job("C", DependsOn::Single("B".to_string())),
        ];

        let err =
            ExecutionGraph::from_pipeline(&make_pipeline_with_stages(vec![stage])).unwrap_err();
        assert_eq!(err.kind, GraphErrorKind::CyclicDependency);
    }

    #[test]
    fn test_normalize_steps_only_pipeline() {
        use crate::parser::models::{ScriptStep, Step, StepAction};

        let echo_step = Step {
            name: Some("echo".to_string()),
            display_name: Some("Echo Hello".to_string()),
            condition: None,
            continue_on_error: BoolOrExpression::default(),
            enabled: true,
            timeout_in_minutes: None,
            retry_count_on_task_failure: None,
            env: HashMap::new(),
            action: StepAction::Script(ScriptStep {
                script: "echo hello".to_string(),
                working_directory: None,
                fail_on_stderr: false,
            }),
        };
        let pipeline = Pipeline {
            steps: vec![echo_step],
            ..Pipeline::default()
        };

        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();

        // The steps end up in one synthetic job inside one synthetic stage.
        assert_eq!(graph.stages.len(), 1);
        let default_stage = &graph.stages[0];
        assert_eq!(default_stage.stage.stage, Some("__default".to_string()));
        assert_eq!(default_stage.jobs.len(), 1);
        assert_eq!(default_stage.jobs[0].job.steps.len(), 1);
    }

    #[test]
    fn test_normalize_jobs_only_pipeline() {
        let pipeline = Pipeline {
            jobs: vec![
                make_job("Build", DependsOn::None),
                make_job("Test", DependsOn::Default),
            ],
            ..Pipeline::default()
        };

        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();

        // Top-level jobs are wrapped in one synthetic stage.
        assert_eq!(graph.stages.len(), 1);
        let default_stage = &graph.stages[0];
        assert_eq!(default_stage.stage.stage, Some("__default".to_string()));
        assert_eq!(default_stage.jobs.len(), 2);
    }

    #[test]
    fn test_explicit_none_dependency() {
        // An explicit `None` switches off the implicit previous-stage dependency.
        let pipeline = make_pipeline_with_stages(vec![
            make_stage("Build", DependsOn::None),
            make_stage("Deploy", DependsOn::None),
        ]);

        let graph = ExecutionGraph::from_pipeline(&pipeline).unwrap();
        let levels = graph.parallel_stages();

        assert_eq!(levels.len(), 1);
        assert_eq!(levels[0].len(), 2);
    }
}