1use crate::execution::context::RuntimeContext;
5use crate::execution::events::{EventSender, ExecutionEvent, ProgressSender};
6use crate::execution::graph::{ExecutionGraph, GraphError, JobNode, StageNode};
7use crate::execution::matrix::MatrixExpander;
8use crate::parser::models::{
9 ExecutionContext, Job, JobResult, JobStatus, Pipeline, StageResult, StageStatus, Step,
10 StepAction, StepResult, StepStatus,
11};
12use crate::runners::container::ContainerRunner;
13use crate::runners::task::TaskRunner;
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::time::{Duration, Instant};
18
19#[derive(Debug, Clone)]
21pub struct ExecutionResult {
22 pub stages: Vec<StageResult>,
24 pub duration: Duration,
26 pub success: bool,
28 pub variables: HashMap<String, String>,
30}
31
32#[derive(Debug, Clone)]
34pub struct ExecutorConfig {
35 pub max_parallel_stages: usize,
37 pub max_parallel_jobs: usize,
39 pub default_step_timeout: u32,
41 pub continue_on_error: bool,
43 pub task_cache_dir: Option<PathBuf>,
45 pub enable_containers: bool,
47}
48
49impl Default for ExecutorConfig {
50 fn default() -> Self {
51 Self {
52 max_parallel_stages: 0,
53 max_parallel_jobs: 0,
54 default_step_timeout: 60,
55 continue_on_error: false,
56 task_cache_dir: None,
57 enable_containers: false,
58 }
59 }
60}
61
62pub struct PipelineExecutor {
64 graph: ExecutionGraph,
66 config: ExecutorConfig,
68 event_tx: Option<ProgressSender>,
70 task_runner: Option<TaskRunner>,
72 container_runner: Option<ContainerRunner>,
74}
75
76impl PipelineExecutor {
77 pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, GraphError> {
79 let graph = ExecutionGraph::from_pipeline(pipeline)?;
80 Ok(Self {
81 graph,
82 config: ExecutorConfig::default(),
83 event_tx: None,
84 task_runner: None,
85 container_runner: None,
86 })
87 }
88
89 pub fn new(graph: ExecutionGraph) -> Self {
91 Self {
92 graph,
93 config: ExecutorConfig::default(),
94 event_tx: None,
95 task_runner: None,
96 container_runner: None,
97 }
98 }
99
100 pub fn with_config(mut self, config: ExecutorConfig) -> Self {
102 if let Some(cache_dir) = &config.task_cache_dir {
104 self.task_runner = Some(TaskRunner::new(cache_dir.clone()));
105 }
106
107 if config.enable_containers {
109 self.container_runner = Some(ContainerRunner::new());
110 }
111
112 self.config = config;
113 self
114 }
115
116 pub fn with_progress(mut self, tx: ProgressSender) -> Self {
118 self.event_tx = Some(tx);
119 self
120 }
121
122 pub fn with_task_runner(mut self, cache_dir: PathBuf) -> Self {
124 self.task_runner = Some(TaskRunner::new(cache_dir));
125 self
126 }
127
128 pub fn with_container_runner(mut self) -> Self {
130 self.container_runner = Some(ContainerRunner::new());
131 self
132 }
133
134 pub async fn execute(&self, context: ExecutionContext) -> ExecutionResult {
136 let start = Instant::now();
137 let mut runtime = RuntimeContext::new(context);
138
139 let test_vars = runtime.variables.clone();
143 runtime.merge_pipeline_variables(&self.graph.variables);
144 for (k, v) in test_vars {
145 runtime.variables.insert(k, v);
146 }
147
148 let mut stage_results = Vec::new();
149 let mut overall_success = true;
150
151 self.event_tx.send_event(ExecutionEvent::pipeline_started(
153 &runtime.base.pipeline_name,
154 self.graph.stages.len(),
155 ));
156
157 let parallel_stages = self.graph.parallel_stages();
159
160 for stage_level in parallel_stages {
161 let level_results = self.execute_stage_level(&stage_level, &mut runtime).await;
163
164 for result in level_results {
165 let failed = result.status == StageStatus::Failed;
166 stage_results.push(result);
167
168 if failed {
169 overall_success = false;
170 if !self.config.continue_on_error {
171 break;
172 }
173 }
174 }
175
176 if !overall_success && !self.config.continue_on_error {
177 break;
178 }
179 }
180
181 let duration = start.elapsed();
182
183 self.event_tx.send_event(ExecutionEvent::pipeline_completed(
185 &runtime.base.pipeline_name,
186 overall_success,
187 duration,
188 ));
189
190 ExecutionResult {
191 stages: stage_results,
192 duration,
193 success: overall_success,
194 variables: runtime
195 .variables
196 .iter()
197 .map(|(k, v)| (k.clone(), v.as_string()))
198 .collect(),
199 }
200 }
201
202 async fn execute_stage_level(
204 &self,
205 stages: &[&StageNode],
206 runtime: &mut RuntimeContext,
207 ) -> Vec<StageResult> {
208 let mut results = Vec::new();
211
212 for stage in stages {
213 let result = self.execute_stage(stage, runtime).await;
214 results.push(result);
215 }
216
217 results
218 }
219
220 async fn execute_stage(
222 &self,
223 stage_node: &StageNode,
224 runtime: &mut RuntimeContext,
225 ) -> StageResult {
226 let start = Instant::now();
227 let stage = &stage_node.stage;
228 let stage_name = stage.stage.clone().unwrap_or_default();
229
230 if !stage_node.dependencies.is_empty()
232 && !runtime.dependencies_succeeded(&stage_node.dependencies, true)
233 {
234 self.event_tx.send_event(ExecutionEvent::StageSkipped {
235 stage_name: stage_name.clone(),
236 reason: "Dependencies failed".to_string(),
237 });
238
239 return StageResult {
240 stage_name: stage_name.clone(),
241 display_name: stage.display_name.clone(),
242 status: StageStatus::Skipped,
243 jobs: skipped_job_results(stage_node),
244 duration: start.elapsed(),
245 };
246 }
247
248 if let Some(condition) = &stage.condition {
250 match runtime.evaluate_condition(condition) {
251 Ok(true) => {} Ok(false) => {
253 self.event_tx.send_event(ExecutionEvent::StageSkipped {
254 stage_name: stage_name.clone(),
255 reason: format!("Condition '{}' evaluated to false", condition),
256 });
257
258 return StageResult {
259 stage_name: stage_name.clone(),
260 display_name: stage.display_name.clone(),
261 status: StageStatus::Skipped,
262 jobs: skipped_job_results(stage_node),
263 duration: start.elapsed(),
264 };
265 }
266 Err(e) => {
267 self.event_tx.send_event(ExecutionEvent::error(
268 format!("Condition evaluation failed: {}", e),
269 Some(stage_name.clone()),
270 None,
271 ));
272
273 return StageResult {
274 stage_name: stage_name.clone(),
275 display_name: stage.display_name.clone(),
276 status: StageStatus::Failed,
277 jobs: skipped_job_results(stage_node),
278 duration: start.elapsed(),
279 };
280 }
281 }
282 }
283
284 runtime.enter_stage(stage);
286
287 self.event_tx.send_event(ExecutionEvent::stage_started(
288 &stage_name,
289 stage.display_name.clone(),
290 stage_node.jobs.len(),
291 ));
292
293 let mut job_results = Vec::new();
295 let mut stage_status = StageStatus::Succeeded;
296
297 let parallel_jobs = self.graph.parallel_jobs(stage_node);
299
300 for job_level in parallel_jobs {
301 let level_results = self
303 .execute_job_level(&job_level, &stage_name, runtime)
304 .await;
305
306 for result in level_results {
307 if result.status == JobStatus::Failed {
308 stage_status = StageStatus::Failed;
309 } else if result.status == JobStatus::SucceededWithIssues
310 && stage_status == StageStatus::Succeeded
311 {
312 stage_status = StageStatus::SucceededWithIssues;
313 }
314 job_results.push(result);
315 }
316
317 if stage_status == StageStatus::Failed && !self.config.continue_on_error {
318 break;
319 }
320 }
321
322 let duration = start.elapsed();
323
324 let result = StageResult {
326 stage_name: stage_name.clone(),
327 display_name: stage.display_name.clone(),
328 status: stage_status.clone(),
329 jobs: job_results,
330 duration,
331 };
332
333 runtime.exit_stage(result.clone());
334
335 self.event_tx.send_event(ExecutionEvent::stage_completed(
336 &stage_name,
337 stage_status,
338 duration,
339 ));
340
341 result
342 }
343
344 async fn execute_job_level(
346 &self,
347 jobs: &[&JobNode],
348 stage_name: &str,
349 runtime: &mut RuntimeContext,
350 ) -> Vec<JobResult> {
351 let mut results = Vec::new();
354
355 for job_node in jobs {
356 let result = self.execute_job(job_node, stage_name, runtime).await;
357 results.push(result);
358 }
359
360 results
361 }
362
363 async fn execute_job(
365 &self,
366 job_node: &JobNode,
367 stage_name: &str,
368 runtime: &mut RuntimeContext,
369 ) -> JobResult {
370 let job = &job_node.job;
371 let job_name = job.identifier().unwrap_or("unknown").to_string();
372 let start = Instant::now();
373
374 if !job_node.dependencies.is_empty()
376 && !runtime.dependencies_succeeded(&job_node.dependencies, false)
377 {
378 self.event_tx.send_event(ExecutionEvent::JobSkipped {
379 stage_name: stage_name.to_string(),
380 job_name: job_name.clone(),
381 reason: "Dependencies failed".to_string(),
382 });
383
384 return JobResult {
385 job_name,
386 display_name: job.display_name.clone(),
387 status: JobStatus::Skipped,
388 steps: skipped_step_results(job),
389 duration: start.elapsed(),
390 outputs: HashMap::new(),
391 };
392 }
393
394 if let Some(condition) = &job.condition {
396 match runtime.evaluate_condition(condition) {
397 Ok(true) => {}
398 Ok(false) => {
399 self.event_tx.send_event(ExecutionEvent::JobSkipped {
400 stage_name: stage_name.to_string(),
401 job_name: job_name.clone(),
402 reason: format!("Condition '{}' evaluated to false", condition),
403 });
404
405 return JobResult {
406 job_name,
407 display_name: job.display_name.clone(),
408 status: JobStatus::Skipped,
409 steps: skipped_step_results(job),
410 duration: start.elapsed(),
411 outputs: HashMap::new(),
412 };
413 }
414 Err(e) => {
415 self.event_tx.send_event(ExecutionEvent::error(
416 format!("Condition evaluation failed: {}", e),
417 Some(stage_name.to_string()),
418 Some(job_name.clone()),
419 ));
420
421 return JobResult {
422 job_name,
423 display_name: job.display_name.clone(),
424 status: JobStatus::Failed,
425 steps: skipped_step_results(job),
426 duration: start.elapsed(),
427 outputs: HashMap::new(),
428 };
429 }
430 }
431 }
432
433 if let Some(strategy) = &job.strategy {
435 let instances = MatrixExpander::expand(strategy);
436 if !instances.is_empty() {
437 return self
439 .execute_matrix_job(job_node, stage_name, &instances, runtime)
440 .await;
441 }
442 }
443
444 self.execute_job_instance(job, stage_name, &job_name, None, runtime)
446 .await
447 }
448
449 async fn execute_matrix_job(
451 &self,
452 job_node: &JobNode,
453 stage_name: &str,
454 instances: &[super::matrix::MatrixInstance],
455 runtime: &mut RuntimeContext,
456 ) -> JobResult {
457 let job = &job_node.job;
458 let job_name = job.identifier().unwrap_or("unknown").to_string();
459 let start = Instant::now();
460
461 let _max_parallel = job
462 .strategy
463 .as_ref()
464 .and_then(|s| s.max_parallel)
465 .unwrap_or(instances.len() as u32);
466
467 let mut all_steps = Vec::new();
469 let mut overall_status = JobStatus::Succeeded;
470
471 for instance in instances {
473 for (var_name, var_value) in &instance.variables {
475 runtime.set_variable(var_name.clone(), var_value.clone());
476 }
477
478 let instance_result = self
479 .execute_job_instance(job, stage_name, &job_name, Some(&instance.name), runtime)
480 .await;
481
482 all_steps.extend(instance_result.steps);
483
484 if instance_result.status == JobStatus::Failed {
485 overall_status = JobStatus::Failed;
486 if !job.continue_on_error.as_bool() {
487 break;
488 }
489 } else if instance_result.status == JobStatus::SucceededWithIssues
490 && overall_status == JobStatus::Succeeded
491 {
492 overall_status = JobStatus::SucceededWithIssues;
493 }
494 }
495
496 JobResult {
497 job_name,
498 display_name: job.display_name.clone(),
499 status: overall_status,
500 steps: all_steps,
501 duration: start.elapsed(),
502 outputs: runtime
503 .step_outputs
504 .iter()
505 .flat_map(|(step_name, m)| {
506 m.iter()
507 .map(move |(k, v)| (format!("{}.{}", step_name, k), v.as_string()))
508 })
509 .collect(),
510 }
511 }
512
513 async fn execute_job_instance(
515 &self,
516 job: &Job,
517 stage_name: &str,
518 job_name: &str,
519 matrix_instance: Option<&str>,
520 runtime: &mut RuntimeContext,
521 ) -> JobResult {
522 let start = Instant::now();
523
524 runtime.enter_job(job);
525
526 let deployment_steps = if job.deployment.is_some() {
528 collect_deployment_steps(job)
529 } else {
530 Vec::new()
531 };
532
533 let effective_steps: Vec<&Step> = if !deployment_steps.is_empty() {
534 deployment_steps.iter().collect()
535 } else {
536 job.steps.iter().collect()
537 };
538
539 self.event_tx.send_event(ExecutionEvent::job_started(
540 stage_name,
541 job_name,
542 job.display_name.clone(),
543 matrix_instance.map(String::from),
544 effective_steps.len(),
545 ));
546
547 let mut step_results = Vec::new();
548 let mut job_status = JobStatus::Succeeded;
549 let mut should_run = true;
550
551 for (step_index, step) in effective_steps.iter().enumerate() {
552 if !should_run && !should_always_run(step) {
553 let resolved_display = step.display_name.as_ref().and_then(|dn| {
555 runtime
556 .substitute_variables(dn)
557 .ok()
558 .or_else(|| Some(dn.clone()))
559 });
560 let skipped = StepResult {
561 step_name: step.name.clone(),
562 display_name: resolved_display,
563 status: StepStatus::Skipped,
564 output: String::new(),
565 error: None,
566 duration: Duration::ZERO,
567 exit_code: None,
568 outputs: HashMap::new(),
569 };
570 step_results.push(skipped);
571 continue;
572 }
573
574 let result = self
575 .execute_step(step, step_index, stage_name, job_name, runtime)
576 .await;
577
578 runtime.record_step_result(result.clone());
579
580 match result.status {
581 StepStatus::Failed => {
582 if !step.continue_on_error.as_bool() {
583 should_run = false;
584 job_status = JobStatus::Failed;
585 } else {
586 job_status = JobStatus::SucceededWithIssues;
587 }
588 }
589 StepStatus::SucceededWithIssues => {
590 if job_status == JobStatus::Succeeded {
591 job_status = JobStatus::SucceededWithIssues;
592 }
593 }
594 _ => {}
595 }
596
597 step_results.push(result);
598 }
599
600 let duration = start.elapsed();
601
602 let result = JobResult {
603 job_name: job_name.to_string(),
604 display_name: job.display_name.clone(),
605 status: job_status.clone(),
606 steps: step_results,
607 duration,
608 outputs: runtime
609 .step_outputs
610 .iter()
611 .flat_map(|(step_name, m)| {
612 m.iter()
613 .map(move |(k, v)| (format!("{}.{}", step_name, k), v.as_string()))
614 })
615 .collect(),
616 };
617
618 runtime.exit_job(result.clone());
619
620 self.event_tx.send_event(ExecutionEvent::job_completed(
621 stage_name,
622 job_name,
623 matrix_instance.map(String::from),
624 job_status,
625 duration,
626 ));
627
628 result
629 }
630
631 async fn execute_step(
633 &self,
634 step: &Step,
635 step_index: usize,
636 stage_name: &str,
637 job_name: &str,
638 runtime: &mut RuntimeContext,
639 ) -> StepResult {
640 let start = Instant::now();
641 let step_name = step.name.clone();
642
643 let display_name = step.display_name.as_ref().and_then(|dn| {
645 runtime
646 .substitute_variables(dn)
647 .ok()
648 .or_else(|| Some(dn.clone()))
649 });
650
651 if !step.enabled {
653 self.event_tx.send_event(ExecutionEvent::StepSkipped {
654 stage_name: stage_name.to_string(),
655 job_name: job_name.to_string(),
656 step_name: step_name.clone(),
657 step_index,
658 reason: "Step is disabled".to_string(),
659 });
660
661 return StepResult {
662 step_name,
663 display_name: display_name.clone(),
664 status: StepStatus::Skipped,
665 output: String::new(),
666 error: None,
667 duration: start.elapsed(),
668 exit_code: None,
669 outputs: HashMap::new(),
670 };
671 }
672
673 if let Some(condition) = &step.condition {
675 match runtime.evaluate_condition(condition) {
676 Ok(true) => {}
677 Ok(false) => {
678 self.event_tx.send_event(ExecutionEvent::StepSkipped {
679 stage_name: stage_name.to_string(),
680 job_name: job_name.to_string(),
681 step_name: step_name.clone(),
682 step_index,
683 reason: format!("Condition '{}' evaluated to false", condition),
684 });
685
686 return StepResult {
687 step_name,
688 display_name: display_name.clone(),
689 status: StepStatus::Skipped,
690 output: String::new(),
691 error: None,
692 duration: start.elapsed(),
693 exit_code: None,
694 outputs: HashMap::new(),
695 };
696 }
697 Err(e) => {
698 return StepResult {
699 step_name,
700 display_name: display_name.clone(),
701 status: StepStatus::Failed,
702 output: String::new(),
703 error: Some(format!("Condition evaluation failed: {}", e)),
704 duration: start.elapsed(),
705 exit_code: None,
706 outputs: HashMap::new(),
707 };
708 }
709 }
710 }
711
712 self.event_tx.send_event(ExecutionEvent::step_started(
714 stage_name,
715 job_name,
716 step_name.clone(),
717 display_name.clone(),
718 step_index,
719 ));
720
721 let mut result = self
723 .execute_step_action(
724 &step.action,
725 step,
726 step_index,
727 stage_name,
728 job_name,
729 runtime,
730 )
731 .await;
732
733 result.display_name = display_name;
735
736 self.event_tx.send_event(ExecutionEvent::step_completed(
738 stage_name,
739 job_name,
740 step_name.clone(),
741 step_index,
742 result.status.clone(),
743 result.duration,
744 result.exit_code,
745 ));
746
747 result
748 }
749
750 async fn execute_step_action(
752 &self,
753 action: &StepAction,
754 step: &Step,
755 step_index: usize,
756 stage_name: &str,
757 job_name: &str,
758 runtime: &mut RuntimeContext,
759 ) -> StepResult {
760 let start = Instant::now();
761 let step_name = step.name.clone();
762
763 match action {
764 StepAction::Script(script_step) => {
765 self.execute_script(
766 &script_step.script,
767 script_step.working_directory.as_deref(),
768 script_step.fail_on_stderr,
769 step,
770 step_index,
771 stage_name,
772 job_name,
773 runtime,
774 )
775 .await
776 }
777 StepAction::Bash(bash_step) => {
778 self.execute_bash(
779 &bash_step.bash,
780 bash_step.working_directory.as_deref(),
781 bash_step.fail_on_stderr,
782 step,
783 step_index,
784 stage_name,
785 job_name,
786 runtime,
787 )
788 .await
789 }
790 StepAction::Pwsh(pwsh_step) => {
791 self.execute_pwsh(
792 &pwsh_step.pwsh,
793 pwsh_step.working_directory.as_deref(),
794 pwsh_step.fail_on_stderr,
795 step,
796 step_index,
797 stage_name,
798 job_name,
799 runtime,
800 )
801 .await
802 }
803 StepAction::PowerShell(ps_step) => {
804 self.execute_powershell(
805 &ps_step.powershell,
806 ps_step.working_directory.as_deref(),
807 ps_step.fail_on_stderr,
808 step,
809 step_index,
810 stage_name,
811 job_name,
812 runtime,
813 )
814 .await
815 }
816 StepAction::Task(task_step) => {
817 if let Some(task_runner) = &self.task_runner {
819 let working_dir = std::path::PathBuf::from(&runtime.base.working_dir);
820 let env = runtime.env_as_strings();
821
822 match task_runner
823 .execute_task(&task_step.task, &task_step.inputs, &env, &working_dir)
824 .await
825 {
826 Ok(mut result) => {
827 result.step_name = step_name;
828 result.display_name = step.display_name.clone();
829
830 if !result.output.is_empty() {
832 self.event_tx.send_event(ExecutionEvent::step_output(
833 stage_name,
834 job_name,
835 result.step_name.clone(),
836 step_index,
837 &result.output,
838 false,
839 ));
840 }
841
842 result
843 }
844 Err(e) => StepResult {
845 step_name,
846 display_name: step.display_name.clone(),
847 status: StepStatus::Failed,
848 output: String::new(),
849 error: Some(format!("Task execution failed: {}", e)),
850 duration: start.elapsed(),
851 exit_code: None,
852 outputs: HashMap::new(),
853 },
854 }
855 } else {
856 self.event_tx.send_event(ExecutionEvent::step_output(
858 stage_name,
859 job_name,
860 step_name.clone(),
861 step_index,
862 format!("Task runner not configured. Task: {}", task_step.task),
863 true,
864 ));
865
866 StepResult {
867 step_name,
868 display_name: step.display_name.clone(),
869 status: StepStatus::Skipped,
870 output: format!(
871 "Task: {} (skipped - task runner not configured)",
872 task_step.task
873 ),
874 error: None,
875 duration: start.elapsed(),
876 exit_code: None,
877 outputs: HashMap::new(),
878 }
879 }
880 }
881 StepAction::Checkout(_) => {
882 StepResult {
884 step_name,
885 display_name: step.display_name.clone(),
886 status: StepStatus::Succeeded,
887 output: "Checkout: Using existing working directory".to_string(),
888 error: None,
889 duration: start.elapsed(),
890 exit_code: Some(0),
891 outputs: HashMap::new(),
892 }
893 }
894 StepAction::Template(_) => {
895 StepResult {
897 step_name,
898 display_name: step.display_name.clone(),
899 status: StepStatus::Skipped,
900 output: "Template step (should be expanded)".to_string(),
901 error: None,
902 duration: start.elapsed(),
903 exit_code: None,
904 outputs: HashMap::new(),
905 }
906 }
907 StepAction::Download(_) | StepAction::Publish(_) => {
908 StepResult {
910 step_name,
911 display_name: step.display_name.clone(),
912 status: StepStatus::Succeeded,
913 output: "Artifact operation (placeholder)".to_string(),
914 error: None,
915 duration: start.elapsed(),
916 exit_code: Some(0),
917 outputs: HashMap::new(),
918 }
919 }
920 StepAction::GetPackage(_) | StepAction::ReviewApp(_) => {
921 StepResult {
923 step_name,
924 display_name: step.display_name.clone(),
925 status: StepStatus::Skipped,
926 output: "Step type not implemented".to_string(),
927 error: None,
928 duration: start.elapsed(),
929 exit_code: None,
930 outputs: HashMap::new(),
931 }
932 }
933 }
934 }
935
936 #[allow(clippy::too_many_arguments)]
938 async fn execute_script(
939 &self,
940 script: &str,
941 working_directory: Option<&str>,
942 fail_on_stderr: bool,
943 step: &Step,
944 step_index: usize,
945 stage_name: &str,
946 job_name: &str,
947 runtime: &mut RuntimeContext,
948 ) -> StepResult {
949 let script = match runtime.substitute_variables(script) {
951 Ok(s) => s,
952 Err(e) => {
953 return StepResult {
954 step_name: step.name.clone(),
955 display_name: step.display_name.clone(),
956 status: StepStatus::Failed,
957 output: String::new(),
958 error: Some(format!("Variable substitution failed: {}", e)),
959 duration: Duration::ZERO,
960 exit_code: None,
961 outputs: HashMap::new(),
962 };
963 }
964 };
965
966 self.run_shell_command(
967 &script,
968 "sh",
969 &["-c"],
970 working_directory,
971 fail_on_stderr,
972 step,
973 step_index,
974 stage_name,
975 job_name,
976 runtime,
977 )
978 .await
979 }
980
981 #[allow(clippy::too_many_arguments)]
983 async fn execute_bash(
984 &self,
985 script: &str,
986 working_directory: Option<&str>,
987 fail_on_stderr: bool,
988 step: &Step,
989 step_index: usize,
990 stage_name: &str,
991 job_name: &str,
992 runtime: &mut RuntimeContext,
993 ) -> StepResult {
994 let script = match runtime.substitute_variables(script) {
995 Ok(s) => s,
996 Err(e) => {
997 return StepResult {
998 step_name: step.name.clone(),
999 display_name: step.display_name.clone(),
1000 status: StepStatus::Failed,
1001 output: String::new(),
1002 error: Some(format!("Variable substitution failed: {}", e)),
1003 duration: Duration::ZERO,
1004 exit_code: None,
1005 outputs: HashMap::new(),
1006 };
1007 }
1008 };
1009
1010 self.run_shell_command(
1011 &script,
1012 "bash",
1013 &["-c"],
1014 working_directory,
1015 fail_on_stderr,
1016 step,
1017 step_index,
1018 stage_name,
1019 job_name,
1020 runtime,
1021 )
1022 .await
1023 }
1024
1025 #[allow(clippy::too_many_arguments)]
1027 async fn execute_pwsh(
1028 &self,
1029 script: &str,
1030 working_directory: Option<&str>,
1031 fail_on_stderr: bool,
1032 step: &Step,
1033 step_index: usize,
1034 stage_name: &str,
1035 job_name: &str,
1036 runtime: &mut RuntimeContext,
1037 ) -> StepResult {
1038 let script = match runtime.substitute_variables(script) {
1039 Ok(s) => s,
1040 Err(e) => {
1041 return StepResult {
1042 step_name: step.name.clone(),
1043 display_name: step.display_name.clone(),
1044 status: StepStatus::Failed,
1045 output: String::new(),
1046 error: Some(format!("Variable substitution failed: {}", e)),
1047 duration: Duration::ZERO,
1048 exit_code: None,
1049 outputs: HashMap::new(),
1050 };
1051 }
1052 };
1053
1054 self.run_shell_command(
1055 &script,
1056 "pwsh",
1057 &["-Command"],
1058 working_directory,
1059 fail_on_stderr,
1060 step,
1061 step_index,
1062 stage_name,
1063 job_name,
1064 runtime,
1065 )
1066 .await
1067 }
1068
1069 #[allow(clippy::too_many_arguments)]
1071 async fn execute_powershell(
1072 &self,
1073 script: &str,
1074 working_directory: Option<&str>,
1075 fail_on_stderr: bool,
1076 step: &Step,
1077 step_index: usize,
1078 stage_name: &str,
1079 job_name: &str,
1080 runtime: &mut RuntimeContext,
1081 ) -> StepResult {
1082 let script = match runtime.substitute_variables(script) {
1083 Ok(s) => s,
1084 Err(e) => {
1085 return StepResult {
1086 step_name: step.name.clone(),
1087 display_name: step.display_name.clone(),
1088 status: StepStatus::Failed,
1089 output: String::new(),
1090 error: Some(format!("Variable substitution failed: {}", e)),
1091 duration: Duration::ZERO,
1092 exit_code: None,
1093 outputs: HashMap::new(),
1094 };
1095 }
1096 };
1097
1098 let (shell, args): (&str, &[&str]) = if cfg!(target_os = "windows") {
1100 ("powershell.exe", &["-Command"])
1101 } else {
1102 ("pwsh", &["-Command"])
1103 };
1104
1105 self.run_shell_command(
1106 &script,
1107 shell,
1108 args,
1109 working_directory,
1110 fail_on_stderr,
1111 step,
1112 step_index,
1113 stage_name,
1114 job_name,
1115 runtime,
1116 )
1117 .await
1118 }
1119
1120 #[allow(clippy::too_many_arguments)]
1122 async fn run_shell_command(
1123 &self,
1124 script: &str,
1125 shell: &str,
1126 shell_args: &[&str],
1127 working_directory: Option<&str>,
1128 fail_on_stderr: bool,
1129 step: &Step,
1130 step_index: usize,
1131 stage_name: &str,
1132 job_name: &str,
1133 runtime: &mut RuntimeContext,
1134 ) -> StepResult {
1135 use tokio::process::Command;
1136 let start = Instant::now();
1137
1138 let working_dir = working_directory
1139 .map(|d| d.to_string())
1140 .unwrap_or_else(|| runtime.base.working_dir.clone());
1141
1142 let mut env = runtime.env_as_strings();
1144 for (k, v) in &step.env {
1145 let value = runtime
1147 .substitute_variables(v)
1148 .unwrap_or_else(|_| v.clone());
1149 env.insert(k.clone(), value);
1150 }
1151
1152 let mut cmd = Command::new(shell);
1153 cmd.args(shell_args);
1154 cmd.arg(script);
1155 cmd.current_dir(&working_dir);
1156 cmd.envs(&env);
1157
1158 cmd.stdout(std::process::Stdio::piped());
1160 cmd.stderr(std::process::Stdio::piped());
1161
1162 let output = match cmd.output().await {
1163 Ok(output) => output,
1164 Err(e) => {
1165 return StepResult {
1166 step_name: step.name.clone(),
1167 display_name: step.display_name.clone(),
1168 status: StepStatus::Failed,
1169 output: String::new(),
1170 error: Some(format!("Failed to execute command: {}", e)),
1171 duration: start.elapsed(),
1172 exit_code: None,
1173 outputs: HashMap::new(),
1174 };
1175 }
1176 };
1177
1178 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1179 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1180
1181 if !stdout.is_empty() {
1183 self.event_tx.send_event(ExecutionEvent::step_output(
1184 stage_name,
1185 job_name,
1186 step.name.clone(),
1187 step_index,
1188 &stdout,
1189 false,
1190 ));
1191 }
1192
1193 if !stderr.is_empty() {
1194 self.event_tx.send_event(ExecutionEvent::step_output(
1195 stage_name,
1196 job_name,
1197 step.name.clone(),
1198 step_index,
1199 &stderr,
1200 true,
1201 ));
1202 }
1203
1204 let outputs = parse_logging_commands(&stdout, runtime);
1206
1207 let exit_code = output.status.code();
1209 let status = if !output.status.success() || (fail_on_stderr && !stderr.is_empty()) {
1210 StepStatus::Failed
1211 } else {
1212 StepStatus::Succeeded
1213 };
1214
1215 StepResult {
1216 step_name: step.name.clone(),
1217 display_name: step.display_name.clone(),
1218 status,
1219 output: stdout,
1220 error: if stderr.is_empty() {
1221 None
1222 } else {
1223 Some(stderr)
1224 },
1225 duration: start.elapsed(),
1226 exit_code,
1227 outputs,
1228 }
1229 }
1230}
1231
1232fn should_always_run(step: &Step) -> bool {
1234 step.condition
1235 .as_ref()
1236 .map(|c| c.contains("always()"))
1237 .unwrap_or(false)
1238}
1239
1240fn skipped_step_results(job: &Job) -> Vec<StepResult> {
1242 let steps: Vec<&Step> = if job.deployment.is_some() {
1244 let deployment = collect_deployment_steps(job);
1245 if !deployment.is_empty() {
1246 return deployment
1247 .iter()
1248 .map(|step| StepResult {
1249 step_name: step.name.clone(),
1250 display_name: step.display_name.clone(),
1251 status: StepStatus::Skipped,
1252 output: String::new(),
1253 error: None,
1254 duration: Duration::ZERO,
1255 exit_code: None,
1256 outputs: HashMap::new(),
1257 })
1258 .collect();
1259 }
1260 job.steps.iter().collect()
1261 } else {
1262 job.steps.iter().collect()
1263 };
1264
1265 steps
1266 .iter()
1267 .map(|step| StepResult {
1268 step_name: step.name.clone(),
1269 display_name: step.display_name.clone(),
1270 status: StepStatus::Skipped,
1271 output: String::new(),
1272 error: None,
1273 duration: Duration::ZERO,
1274 exit_code: None,
1275 outputs: HashMap::new(),
1276 })
1277 .collect()
1278}
1279
1280fn collect_deployment_steps(job: &Job) -> Vec<Step> {
1287 let mut steps = Vec::new();
1288
1289 if let Some(strategy) = &job.strategy {
1290 let extract_from_hooks =
1292 |hooks: &crate::parser::models::DeploymentHooks, steps: &mut Vec<Step>| {
1293 if let Some(hook) = &hooks.pre_deploy {
1294 steps.extend(hook.steps.clone());
1295 }
1296 if let Some(hook) = &hooks.deploy {
1297 steps.extend(hook.steps.clone());
1298 }
1299 if let Some(hook) = &hooks.route_traffic {
1300 steps.extend(hook.steps.clone());
1301 }
1302 if let Some(hook) = &hooks.post_route_traffic {
1303 steps.extend(hook.steps.clone());
1304 }
1305 };
1306
1307 if let Some(run_once) = &strategy.run_once {
1308 extract_from_hooks(run_once, &mut steps);
1309 }
1310 if let Some(rolling) = &strategy.rolling {
1311 extract_from_hooks(&rolling.hooks, &mut steps);
1312 }
1313 if let Some(canary) = &strategy.canary {
1314 extract_from_hooks(&canary.hooks, &mut steps);
1315 }
1316 }
1317
1318 steps
1319}
1320
1321fn skipped_job_results(stage_node: &StageNode) -> Vec<JobResult> {
1323 stage_node
1324 .jobs
1325 .iter()
1326 .map(|job_node| {
1327 let job = &job_node.job;
1328 let job_name = job.identifier().unwrap_or("unknown").to_string();
1329 JobResult {
1330 job_name,
1331 display_name: job.display_name.clone(),
1332 status: JobStatus::Skipped,
1333 steps: skipped_step_results(job),
1334 duration: Duration::ZERO,
1335 outputs: HashMap::new(),
1336 }
1337 })
1338 .collect()
1339}
1340
1341fn parse_logging_commands(output: &str, runtime: &mut RuntimeContext) -> HashMap<String, String> {
1343 let mut outputs = HashMap::new();
1344
1345 for line in output.lines() {
1346 if let Some(rest) = line.strip_prefix("##vso[task.setvariable") {
1348 if let Some((props, value)) = rest.split_once(']') {
1349 let mut var_name = None;
1350 let mut is_output = false;
1351 let mut is_secret = false;
1352
1353 for prop in props.split(';') {
1354 let prop = prop.trim();
1355 if let Some(name) = prop.strip_prefix("variable=") {
1356 var_name = Some(name.to_string());
1357 } else if prop.eq_ignore_ascii_case("isoutput=true") {
1358 is_output = true;
1359 } else if prop.eq_ignore_ascii_case("issecret=true") {
1360 is_secret = true;
1361 }
1362 }
1363
1364 if let Some(name) = var_name {
1365 let value = value.to_string();
1366 if is_output {
1367 outputs.insert(name.clone(), value.clone());
1368 }
1369 if !is_secret {
1370 runtime.set_variable(name, crate::parser::models::Value::String(value));
1371 }
1372 }
1373 }
1374 }
1375 }
1376
1377 outputs
1378}
1379
1380#[cfg(test)]
1381mod tests {
1382 use super::*;
1383 use crate::parser::models::{BoolOrExpression, DependsOn, Job, ScriptStep, Stage, Step};
1384
1385 fn make_simple_pipeline() -> Pipeline {
1386 Pipeline {
1387 name: Some("test-pipeline".to_string()),
1388 stages: vec![Stage {
1389 stage: Some("Build".to_string()),
1390 display_name: None,
1391 depends_on: DependsOn::None,
1392 condition: None,
1393 variables: Vec::new(),
1394 jobs: vec![Job {
1395 job: Some("BuildJob".to_string()),
1396 deployment: None,
1397 display_name: None,
1398 depends_on: DependsOn::None,
1399 condition: None,
1400 strategy: None,
1401 pool: None,
1402 container: None,
1403 services: HashMap::new(),
1404 variables: Vec::new(),
1405 steps: vec![Step {
1406 name: Some("echo".to_string()),
1407 display_name: Some("Echo Hello".to_string()),
1408 condition: None,
1409 continue_on_error: BoolOrExpression::default(),
1410 enabled: true,
1411 timeout_in_minutes: None,
1412 retry_count_on_task_failure: None,
1413 env: HashMap::new(),
1414 action: StepAction::Script(ScriptStep {
1415 script: "echo Hello".to_string(),
1416 working_directory: None,
1417 fail_on_stderr: false,
1418 }),
1419 }],
1420 timeout_in_minutes: None,
1421 cancel_timeout_in_minutes: None,
1422 continue_on_error: BoolOrExpression::default(),
1423 workspace: None,
1424 uses: None,
1425 template: None,
1426 parameters: HashMap::new(),
1427 environment: None,
1428 has_template_directives: false,
1429 }],
1430 lock_behavior: None,
1431 template: None,
1432 parameters: HashMap::new(),
1433 pool: None,
1434 has_template_directives: false,
1435 }],
1436 ..Default::default()
1437 }
1438 }
1439
1440 #[test]
1441 fn test_executor_creation() {
1442 let pipeline = make_simple_pipeline();
1443 let executor = PipelineExecutor::from_pipeline(&pipeline).unwrap();
1444
1445 assert_eq!(executor.graph.stages.len(), 1);
1446 }
1447
1448 #[tokio::test]
1449 async fn test_simple_execution() {
1450 let pipeline = make_simple_pipeline();
1451 let executor = PipelineExecutor::from_pipeline(&pipeline).unwrap();
1452
1453 let context = ExecutionContext::new(
1454 "test".to_string(),
1455 std::env::current_dir()
1456 .unwrap()
1457 .to_string_lossy()
1458 .to_string(),
1459 );
1460
1461 let result = executor.execute(context).await;
1462
1463 assert_eq!(result.stages.len(), 1);
1464 assert_eq!(result.stages[0].stage_name, "Build");
1465 assert_eq!(result.stages[0].status, StageStatus::Succeeded);
1466 }
1467
1468 #[test]
1469 fn test_parse_logging_commands() {
1470 let base = ExecutionContext::new("test".to_string(), "/work".to_string());
1471 let mut runtime = RuntimeContext::new(base);
1472
1473 let output = r#"
1474Hello
1475##vso[task.setvariable variable=version]1.0.0
1476##vso[task.setvariable variable=output;isoutput=true]result
1477World
1478"#;
1479
1480 let outputs = parse_logging_commands(output, &mut runtime);
1481
1482 assert_eq!(outputs.get("output"), Some(&"result".to_string()));
1483 assert_eq!(
1484 runtime.variables.get("version"),
1485 Some(&crate::parser::models::Value::String("1.0.0".to_string()))
1486 );
1487 }
1488
1489 #[test]
1490 fn test_should_always_run() {
1491 let step_with_always = Step {
1492 name: None,
1493 display_name: None,
1494 condition: Some("always()".to_string()),
1495 continue_on_error: BoolOrExpression::default(),
1496 enabled: true,
1497 timeout_in_minutes: None,
1498 retry_count_on_task_failure: None,
1499 env: HashMap::new(),
1500 action: StepAction::Script(ScriptStep {
1501 script: "echo".to_string(),
1502 working_directory: None,
1503 fail_on_stderr: false,
1504 }),
1505 };
1506
1507 let step_without_always = Step {
1508 name: None,
1509 display_name: None,
1510 condition: Some("succeeded()".to_string()),
1511 continue_on_error: BoolOrExpression::default(),
1512 enabled: true,
1513 timeout_in_minutes: None,
1514 retry_count_on_task_failure: None,
1515 env: HashMap::new(),
1516 action: StepAction::Script(ScriptStep {
1517 script: "echo".to_string(),
1518 working_directory: None,
1519 fail_on_stderr: false,
1520 }),
1521 };
1522
1523 assert!(should_always_run(&step_with_always));
1524 assert!(!should_always_run(&step_without_always));
1525 }
1526}