Skip to main content

pipeline_service/execution/
executor.rs

1// Pipeline Executor
2// Orchestrates pipeline execution with DAG-based scheduling
3
4use crate::execution::context::RuntimeContext;
5use crate::execution::events::{EventSender, ExecutionEvent, ProgressSender};
6use crate::execution::graph::{ExecutionGraph, GraphError, JobNode, StageNode};
7use crate::execution::matrix::MatrixExpander;
8use crate::parser::models::{
9    ExecutionContext, Job, JobResult, JobStatus, Pipeline, StageResult, StageStatus, Step,
10    StepAction, StepResult, StepStatus,
11};
12use crate::runners::container::ContainerRunner;
13use crate::runners::task::TaskRunner;
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::time::{Duration, Instant};
18
19/// Result of pipeline execution
20#[derive(Debug, Clone)]
21pub struct ExecutionResult {
22    /// All stage results
23    pub stages: Vec<StageResult>,
24    /// Total duration
25    pub duration: Duration,
26    /// Overall success
27    pub success: bool,
28    /// Final variables state
29    pub variables: HashMap<String, String>,
30}
31
32/// Configuration for pipeline execution
33#[derive(Debug, Clone)]
34pub struct ExecutorConfig {
35    /// Maximum parallel stages (0 = unlimited)
36    pub max_parallel_stages: usize,
37    /// Maximum parallel jobs within a stage (0 = unlimited)
38    pub max_parallel_jobs: usize,
39    /// Default timeout for steps (in minutes)
40    pub default_step_timeout: u32,
41    /// Whether to continue on error at pipeline level
42    pub continue_on_error: bool,
43    /// Task cache directory
44    pub task_cache_dir: Option<PathBuf>,
45    /// Whether to enable container support
46    pub enable_containers: bool,
47}
48
49impl Default for ExecutorConfig {
50    fn default() -> Self {
51        Self {
52            max_parallel_stages: 0,
53            max_parallel_jobs: 0,
54            default_step_timeout: 60,
55            continue_on_error: false,
56            task_cache_dir: None,
57            enable_containers: false,
58        }
59    }
60}
61
62/// Pipeline executor
63pub struct PipelineExecutor {
64    /// Execution graph
65    graph: ExecutionGraph,
66    /// Configuration
67    config: ExecutorConfig,
68    /// Progress event sender
69    event_tx: Option<ProgressSender>,
70    /// Task runner for Azure DevOps tasks
71    task_runner: Option<TaskRunner>,
72    /// Container runner for Docker-based jobs
73    container_runner: Option<ContainerRunner>,
74}
75
76impl PipelineExecutor {
77    /// Create a new executor from a pipeline
78    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, GraphError> {
79        let graph = ExecutionGraph::from_pipeline(pipeline)?;
80        Ok(Self {
81            graph,
82            config: ExecutorConfig::default(),
83            event_tx: None,
84            task_runner: None,
85            container_runner: None,
86        })
87    }
88
89    /// Create a new executor from an execution graph
90    pub fn new(graph: ExecutionGraph) -> Self {
91        Self {
92            graph,
93            config: ExecutorConfig::default(),
94            event_tx: None,
95            task_runner: None,
96            container_runner: None,
97        }
98    }
99
100    /// Set executor configuration
101    pub fn with_config(mut self, config: ExecutorConfig) -> Self {
102        // Set up task runner if cache dir is specified
103        if let Some(cache_dir) = &config.task_cache_dir {
104            self.task_runner = Some(TaskRunner::new(cache_dir.clone()));
105        }
106
107        // Set up container runner if enabled
108        if config.enable_containers {
109            self.container_runner = Some(ContainerRunner::new());
110        }
111
112        self.config = config;
113        self
114    }
115
116    /// Set progress event sender
117    pub fn with_progress(mut self, tx: ProgressSender) -> Self {
118        self.event_tx = Some(tx);
119        self
120    }
121
122    /// Enable task execution with the specified cache directory
123    pub fn with_task_runner(mut self, cache_dir: PathBuf) -> Self {
124        self.task_runner = Some(TaskRunner::new(cache_dir));
125        self
126    }
127
128    /// Enable container execution
129    pub fn with_container_runner(mut self) -> Self {
130        self.container_runner = Some(ContainerRunner::new());
131        self
132    }
133
134    /// Execute the pipeline
135    pub async fn execute(&self, context: ExecutionContext) -> ExecutionResult {
136        let start = Instant::now();
137        let mut runtime = RuntimeContext::new(context);
138
139        // Merge pipeline-level variables (test-provided variables from ExecutionContext
140        // were already loaded in RuntimeContext::new, so save them, merge pipeline vars,
141        // then re-apply test vars so they take precedence)
142        let test_vars = runtime.variables.clone();
143        runtime.merge_pipeline_variables(&self.graph.variables);
144        for (k, v) in test_vars {
145            runtime.variables.insert(k, v);
146        }
147
148        let mut stage_results = Vec::new();
149        let mut overall_success = true;
150
151        // Send pipeline started event
152        self.event_tx.send_event(ExecutionEvent::pipeline_started(
153            &runtime.base.pipeline_name,
154            self.graph.stages.len(),
155        ));
156
157        // Execute stages in topological order, respecting parallelism
158        let parallel_stages = self.graph.parallel_stages();
159
160        for stage_level in parallel_stages {
161            // Execute stages at this level (potentially in parallel)
162            let level_results = self.execute_stage_level(&stage_level, &mut runtime).await;
163
164            for result in level_results {
165                let failed = result.status == StageStatus::Failed;
166                stage_results.push(result);
167
168                if failed {
169                    overall_success = false;
170                    if !self.config.continue_on_error {
171                        break;
172                    }
173                }
174            }
175
176            if !overall_success && !self.config.continue_on_error {
177                break;
178            }
179        }
180
181        let duration = start.elapsed();
182
183        // Send pipeline completed event
184        self.event_tx.send_event(ExecutionEvent::pipeline_completed(
185            &runtime.base.pipeline_name,
186            overall_success,
187            duration,
188        ));
189
190        ExecutionResult {
191            stages: stage_results,
192            duration,
193            success: overall_success,
194            variables: runtime
195                .variables
196                .iter()
197                .map(|(k, v)| (k.clone(), v.as_string()))
198                .collect(),
199        }
200    }
201
202    /// Execute a level of stages (can run in parallel)
203    async fn execute_stage_level(
204        &self,
205        stages: &[&StageNode],
206        runtime: &mut RuntimeContext,
207    ) -> Vec<StageResult> {
208        // For now, execute sequentially
209        // TODO: Add parallel execution with semaphore limiting
210        let mut results = Vec::new();
211
212        for stage in stages {
213            let result = self.execute_stage(stage, runtime).await;
214            results.push(result);
215        }
216
217        results
218    }
219
220    /// Execute a single stage
221    async fn execute_stage(
222        &self,
223        stage_node: &StageNode,
224        runtime: &mut RuntimeContext,
225    ) -> StageResult {
226        let start = Instant::now();
227        let stage = &stage_node.stage;
228        let stage_name = stage.stage.clone().unwrap_or_default();
229
230        // Check dependencies
231        if !stage_node.dependencies.is_empty()
232            && !runtime.dependencies_succeeded(&stage_node.dependencies, true)
233        {
234            self.event_tx.send_event(ExecutionEvent::StageSkipped {
235                stage_name: stage_name.clone(),
236                reason: "Dependencies failed".to_string(),
237            });
238
239            return StageResult {
240                stage_name: stage_name.clone(),
241                display_name: stage.display_name.clone(),
242                status: StageStatus::Skipped,
243                jobs: skipped_job_results(stage_node),
244                duration: start.elapsed(),
245            };
246        }
247
248        // Evaluate condition
249        if let Some(condition) = &stage.condition {
250            match runtime.evaluate_condition(condition) {
251                Ok(true) => {} // Continue
252                Ok(false) => {
253                    self.event_tx.send_event(ExecutionEvent::StageSkipped {
254                        stage_name: stage_name.clone(),
255                        reason: format!("Condition '{}' evaluated to false", condition),
256                    });
257
258                    return StageResult {
259                        stage_name: stage_name.clone(),
260                        display_name: stage.display_name.clone(),
261                        status: StageStatus::Skipped,
262                        jobs: skipped_job_results(stage_node),
263                        duration: start.elapsed(),
264                    };
265                }
266                Err(e) => {
267                    self.event_tx.send_event(ExecutionEvent::error(
268                        format!("Condition evaluation failed: {}", e),
269                        Some(stage_name.clone()),
270                        None,
271                    ));
272
273                    return StageResult {
274                        stage_name: stage_name.clone(),
275                        display_name: stage.display_name.clone(),
276                        status: StageStatus::Failed,
277                        jobs: skipped_job_results(stage_node),
278                        duration: start.elapsed(),
279                    };
280                }
281            }
282        }
283
284        // Enter stage
285        runtime.enter_stage(stage);
286
287        self.event_tx.send_event(ExecutionEvent::stage_started(
288            &stage_name,
289            stage.display_name.clone(),
290            stage_node.jobs.len(),
291        ));
292
293        // Execute jobs
294        let mut job_results = Vec::new();
295        let mut stage_status = StageStatus::Succeeded;
296
297        // Get jobs in topological order with parallel levels
298        let parallel_jobs = self.graph.parallel_jobs(stage_node);
299
300        for job_level in parallel_jobs {
301            // Execute jobs at this level
302            let level_results = self
303                .execute_job_level(&job_level, &stage_name, runtime)
304                .await;
305
306            for result in level_results {
307                if result.status == JobStatus::Failed {
308                    stage_status = StageStatus::Failed;
309                } else if result.status == JobStatus::SucceededWithIssues
310                    && stage_status == StageStatus::Succeeded
311                {
312                    stage_status = StageStatus::SucceededWithIssues;
313                }
314                job_results.push(result);
315            }
316
317            if stage_status == StageStatus::Failed && !self.config.continue_on_error {
318                break;
319            }
320        }
321
322        let duration = start.elapsed();
323
324        // Exit stage
325        let result = StageResult {
326            stage_name: stage_name.clone(),
327            display_name: stage.display_name.clone(),
328            status: stage_status.clone(),
329            jobs: job_results,
330            duration,
331        };
332
333        runtime.exit_stage(result.clone());
334
335        self.event_tx.send_event(ExecutionEvent::stage_completed(
336            &stage_name,
337            stage_status,
338            duration,
339        ));
340
341        result
342    }
343
344    /// Execute a level of jobs (can run in parallel)
345    async fn execute_job_level(
346        &self,
347        jobs: &[&JobNode],
348        stage_name: &str,
349        runtime: &mut RuntimeContext,
350    ) -> Vec<JobResult> {
351        // For now, execute sequentially
352        // TODO: Add parallel execution with semaphore and max_parallel
353        let mut results = Vec::new();
354
355        for job_node in jobs {
356            let result = self.execute_job(job_node, stage_name, runtime).await;
357            results.push(result);
358        }
359
360        results
361    }
362
363    /// Execute a single job (potentially with matrix expansion)
364    async fn execute_job(
365        &self,
366        job_node: &JobNode,
367        stage_name: &str,
368        runtime: &mut RuntimeContext,
369    ) -> JobResult {
370        let job = &job_node.job;
371        let job_name = job.identifier().unwrap_or("unknown").to_string();
372        let start = Instant::now();
373
374        // Check dependencies
375        if !job_node.dependencies.is_empty()
376            && !runtime.dependencies_succeeded(&job_node.dependencies, false)
377        {
378            self.event_tx.send_event(ExecutionEvent::JobSkipped {
379                stage_name: stage_name.to_string(),
380                job_name: job_name.clone(),
381                reason: "Dependencies failed".to_string(),
382            });
383
384            return JobResult {
385                job_name,
386                display_name: job.display_name.clone(),
387                status: JobStatus::Skipped,
388                steps: skipped_step_results(job),
389                duration: start.elapsed(),
390                outputs: HashMap::new(),
391            };
392        }
393
394        // Evaluate condition
395        if let Some(condition) = &job.condition {
396            match runtime.evaluate_condition(condition) {
397                Ok(true) => {}
398                Ok(false) => {
399                    self.event_tx.send_event(ExecutionEvent::JobSkipped {
400                        stage_name: stage_name.to_string(),
401                        job_name: job_name.clone(),
402                        reason: format!("Condition '{}' evaluated to false", condition),
403                    });
404
405                    return JobResult {
406                        job_name,
407                        display_name: job.display_name.clone(),
408                        status: JobStatus::Skipped,
409                        steps: skipped_step_results(job),
410                        duration: start.elapsed(),
411                        outputs: HashMap::new(),
412                    };
413                }
414                Err(e) => {
415                    self.event_tx.send_event(ExecutionEvent::error(
416                        format!("Condition evaluation failed: {}", e),
417                        Some(stage_name.to_string()),
418                        Some(job_name.clone()),
419                    ));
420
421                    return JobResult {
422                        job_name,
423                        display_name: job.display_name.clone(),
424                        status: JobStatus::Failed,
425                        steps: skipped_step_results(job),
426                        duration: start.elapsed(),
427                        outputs: HashMap::new(),
428                    };
429                }
430            }
431        }
432
433        // Handle matrix expansion
434        if let Some(strategy) = &job.strategy {
435            let instances = MatrixExpander::expand(strategy);
436            if !instances.is_empty() {
437                // Execute matrix instances
438                return self
439                    .execute_matrix_job(job_node, stage_name, &instances, runtime)
440                    .await;
441            }
442        }
443
444        // Execute single job instance
445        self.execute_job_instance(job, stage_name, &job_name, None, runtime)
446            .await
447    }
448
449    /// Execute a job with matrix expansion
450    async fn execute_matrix_job(
451        &self,
452        job_node: &JobNode,
453        stage_name: &str,
454        instances: &[super::matrix::MatrixInstance],
455        runtime: &mut RuntimeContext,
456    ) -> JobResult {
457        let job = &job_node.job;
458        let job_name = job.identifier().unwrap_or("unknown").to_string();
459        let start = Instant::now();
460
461        let _max_parallel = job
462            .strategy
463            .as_ref()
464            .and_then(|s| s.max_parallel)
465            .unwrap_or(instances.len() as u32);
466
467        // TODO: Use max_parallel with a semaphore for parallel matrix instance execution
468        let mut all_steps = Vec::new();
469        let mut overall_status = JobStatus::Succeeded;
470
471        // Execute each matrix instance
472        for instance in instances {
473            // Apply matrix variables to runtime
474            for (var_name, var_value) in &instance.variables {
475                runtime.set_variable(var_name.clone(), var_value.clone());
476            }
477
478            let instance_result = self
479                .execute_job_instance(job, stage_name, &job_name, Some(&instance.name), runtime)
480                .await;
481
482            all_steps.extend(instance_result.steps);
483
484            if instance_result.status == JobStatus::Failed {
485                overall_status = JobStatus::Failed;
486                if !job.continue_on_error.as_bool() {
487                    break;
488                }
489            } else if instance_result.status == JobStatus::SucceededWithIssues
490                && overall_status == JobStatus::Succeeded
491            {
492                overall_status = JobStatus::SucceededWithIssues;
493            }
494        }
495
496        JobResult {
497            job_name,
498            display_name: job.display_name.clone(),
499            status: overall_status,
500            steps: all_steps,
501            duration: start.elapsed(),
502            outputs: runtime
503                .step_outputs
504                .iter()
505                .flat_map(|(step_name, m)| {
506                    m.iter()
507                        .map(move |(k, v)| (format!("{}.{}", step_name, k), v.as_string()))
508                })
509                .collect(),
510        }
511    }
512
513    /// Execute a single job instance
514    async fn execute_job_instance(
515        &self,
516        job: &Job,
517        stage_name: &str,
518        job_name: &str,
519        matrix_instance: Option<&str>,
520        runtime: &mut RuntimeContext,
521    ) -> JobResult {
522        let start = Instant::now();
523
524        runtime.enter_job(job);
525
526        // For deployment jobs, collect steps from strategy hooks
527        let deployment_steps = if job.deployment.is_some() {
528            collect_deployment_steps(job)
529        } else {
530            Vec::new()
531        };
532
533        let effective_steps: Vec<&Step> = if !deployment_steps.is_empty() {
534            deployment_steps.iter().collect()
535        } else {
536            job.steps.iter().collect()
537        };
538
539        self.event_tx.send_event(ExecutionEvent::job_started(
540            stage_name,
541            job_name,
542            job.display_name.clone(),
543            matrix_instance.map(String::from),
544            effective_steps.len(),
545        ));
546
547        let mut step_results = Vec::new();
548        let mut job_status = JobStatus::Succeeded;
549        let mut should_run = true;
550
551        for (step_index, step) in effective_steps.iter().enumerate() {
552            if !should_run && !should_always_run(step) {
553                // Skip remaining steps if a previous step failed
554                let resolved_display = step.display_name.as_ref().and_then(|dn| {
555                    runtime
556                        .substitute_variables(dn)
557                        .ok()
558                        .or_else(|| Some(dn.clone()))
559                });
560                let skipped = StepResult {
561                    step_name: step.name.clone(),
562                    display_name: resolved_display,
563                    status: StepStatus::Skipped,
564                    output: String::new(),
565                    error: None,
566                    duration: Duration::ZERO,
567                    exit_code: None,
568                    outputs: HashMap::new(),
569                };
570                step_results.push(skipped);
571                continue;
572            }
573
574            let result = self
575                .execute_step(step, step_index, stage_name, job_name, runtime)
576                .await;
577
578            runtime.record_step_result(result.clone());
579
580            match result.status {
581                StepStatus::Failed => {
582                    if !step.continue_on_error.as_bool() {
583                        should_run = false;
584                        job_status = JobStatus::Failed;
585                    } else {
586                        job_status = JobStatus::SucceededWithIssues;
587                    }
588                }
589                StepStatus::SucceededWithIssues => {
590                    if job_status == JobStatus::Succeeded {
591                        job_status = JobStatus::SucceededWithIssues;
592                    }
593                }
594                _ => {}
595            }
596
597            step_results.push(result);
598        }
599
600        let duration = start.elapsed();
601
602        let result = JobResult {
603            job_name: job_name.to_string(),
604            display_name: job.display_name.clone(),
605            status: job_status.clone(),
606            steps: step_results,
607            duration,
608            outputs: runtime
609                .step_outputs
610                .iter()
611                .flat_map(|(step_name, m)| {
612                    m.iter()
613                        .map(move |(k, v)| (format!("{}.{}", step_name, k), v.as_string()))
614                })
615                .collect(),
616        };
617
618        runtime.exit_job(result.clone());
619
620        self.event_tx.send_event(ExecutionEvent::job_completed(
621            stage_name,
622            job_name,
623            matrix_instance.map(String::from),
624            job_status,
625            duration,
626        ));
627
628        result
629    }
630
631    /// Execute a single step
632    async fn execute_step(
633        &self,
634        step: &Step,
635        step_index: usize,
636        stage_name: &str,
637        job_name: &str,
638        runtime: &mut RuntimeContext,
639    ) -> StepResult {
640        let start = Instant::now();
641        let step_name = step.name.clone();
642
643        // Resolve display name by substituting variables (e.g., "Build for $(targetTriple)")
644        let display_name = step.display_name.as_ref().and_then(|dn| {
645            runtime
646                .substitute_variables(dn)
647                .ok()
648                .or_else(|| Some(dn.clone()))
649        });
650
651        // Check if step is enabled
652        if !step.enabled {
653            self.event_tx.send_event(ExecutionEvent::StepSkipped {
654                stage_name: stage_name.to_string(),
655                job_name: job_name.to_string(),
656                step_name: step_name.clone(),
657                step_index,
658                reason: "Step is disabled".to_string(),
659            });
660
661            return StepResult {
662                step_name,
663                display_name: display_name.clone(),
664                status: StepStatus::Skipped,
665                output: String::new(),
666                error: None,
667                duration: start.elapsed(),
668                exit_code: None,
669                outputs: HashMap::new(),
670            };
671        }
672
673        // Evaluate condition
674        if let Some(condition) = &step.condition {
675            match runtime.evaluate_condition(condition) {
676                Ok(true) => {}
677                Ok(false) => {
678                    self.event_tx.send_event(ExecutionEvent::StepSkipped {
679                        stage_name: stage_name.to_string(),
680                        job_name: job_name.to_string(),
681                        step_name: step_name.clone(),
682                        step_index,
683                        reason: format!("Condition '{}' evaluated to false", condition),
684                    });
685
686                    return StepResult {
687                        step_name,
688                        display_name: display_name.clone(),
689                        status: StepStatus::Skipped,
690                        output: String::new(),
691                        error: None,
692                        duration: start.elapsed(),
693                        exit_code: None,
694                        outputs: HashMap::new(),
695                    };
696                }
697                Err(e) => {
698                    return StepResult {
699                        step_name,
700                        display_name: display_name.clone(),
701                        status: StepStatus::Failed,
702                        output: String::new(),
703                        error: Some(format!("Condition evaluation failed: {}", e)),
704                        duration: start.elapsed(),
705                        exit_code: None,
706                        outputs: HashMap::new(),
707                    };
708                }
709            }
710        }
711
712        // Send step started event
713        self.event_tx.send_event(ExecutionEvent::step_started(
714            stage_name,
715            job_name,
716            step_name.clone(),
717            display_name.clone(),
718            step_index,
719        ));
720
721        // Execute the step based on its action type
722        let mut result = self
723            .execute_step_action(
724                &step.action,
725                step,
726                step_index,
727                stage_name,
728                job_name,
729                runtime,
730            )
731            .await;
732
733        // Override display_name with variable-substituted version
734        result.display_name = display_name;
735
736        // Send step completed event
737        self.event_tx.send_event(ExecutionEvent::step_completed(
738            stage_name,
739            job_name,
740            step_name.clone(),
741            step_index,
742            result.status.clone(),
743            result.duration,
744            result.exit_code,
745        ));
746
747        result
748    }
749
750    /// Execute a step action
751    async fn execute_step_action(
752        &self,
753        action: &StepAction,
754        step: &Step,
755        step_index: usize,
756        stage_name: &str,
757        job_name: &str,
758        runtime: &mut RuntimeContext,
759    ) -> StepResult {
760        let start = Instant::now();
761        let step_name = step.name.clone();
762
763        match action {
764            StepAction::Script(script_step) => {
765                self.execute_script(
766                    &script_step.script,
767                    script_step.working_directory.as_deref(),
768                    script_step.fail_on_stderr,
769                    step,
770                    step_index,
771                    stage_name,
772                    job_name,
773                    runtime,
774                )
775                .await
776            }
777            StepAction::Bash(bash_step) => {
778                self.execute_bash(
779                    &bash_step.bash,
780                    bash_step.working_directory.as_deref(),
781                    bash_step.fail_on_stderr,
782                    step,
783                    step_index,
784                    stage_name,
785                    job_name,
786                    runtime,
787                )
788                .await
789            }
790            StepAction::Pwsh(pwsh_step) => {
791                self.execute_pwsh(
792                    &pwsh_step.pwsh,
793                    pwsh_step.working_directory.as_deref(),
794                    pwsh_step.fail_on_stderr,
795                    step,
796                    step_index,
797                    stage_name,
798                    job_name,
799                    runtime,
800                )
801                .await
802            }
803            StepAction::PowerShell(ps_step) => {
804                self.execute_powershell(
805                    &ps_step.powershell,
806                    ps_step.working_directory.as_deref(),
807                    ps_step.fail_on_stderr,
808                    step,
809                    step_index,
810                    stage_name,
811                    job_name,
812                    runtime,
813                )
814                .await
815            }
816            StepAction::Task(task_step) => {
817                // Execute task using TaskRunner
818                if let Some(task_runner) = &self.task_runner {
819                    let working_dir = std::path::PathBuf::from(&runtime.base.working_dir);
820                    let env = runtime.env_as_strings();
821
822                    match task_runner
823                        .execute_task(&task_step.task, &task_step.inputs, &env, &working_dir)
824                        .await
825                    {
826                        Ok(mut result) => {
827                            result.step_name = step_name;
828                            result.display_name = step.display_name.clone();
829
830                            // Send output event
831                            if !result.output.is_empty() {
832                                self.event_tx.send_event(ExecutionEvent::step_output(
833                                    stage_name,
834                                    job_name,
835                                    result.step_name.clone(),
836                                    step_index,
837                                    &result.output,
838                                    false,
839                                ));
840                            }
841
842                            result
843                        }
844                        Err(e) => StepResult {
845                            step_name,
846                            display_name: step.display_name.clone(),
847                            status: StepStatus::Failed,
848                            output: String::new(),
849                            error: Some(format!("Task execution failed: {}", e)),
850                            duration: start.elapsed(),
851                            exit_code: None,
852                            outputs: HashMap::new(),
853                        },
854                    }
855                } else {
856                    // Task runner not configured - log a warning
857                    self.event_tx.send_event(ExecutionEvent::step_output(
858                        stage_name,
859                        job_name,
860                        step_name.clone(),
861                        step_index,
862                        format!("Task runner not configured. Task: {}", task_step.task),
863                        true,
864                    ));
865
866                    StepResult {
867                        step_name,
868                        display_name: step.display_name.clone(),
869                        status: StepStatus::Skipped,
870                        output: format!(
871                            "Task: {} (skipped - task runner not configured)",
872                            task_step.task
873                        ),
874                        error: None,
875                        duration: start.elapsed(),
876                        exit_code: None,
877                        outputs: HashMap::new(),
878                    }
879                }
880            }
881            StepAction::Checkout(_) => {
882                // Checkout - for now, assume already checked out
883                StepResult {
884                    step_name,
885                    display_name: step.display_name.clone(),
886                    status: StepStatus::Succeeded,
887                    output: "Checkout: Using existing working directory".to_string(),
888                    error: None,
889                    duration: start.elapsed(),
890                    exit_code: Some(0),
891                    outputs: HashMap::new(),
892                }
893            }
894            StepAction::Template(_) => {
895                // Templates should be expanded earlier
896                StepResult {
897                    step_name,
898                    display_name: step.display_name.clone(),
899                    status: StepStatus::Skipped,
900                    output: "Template step (should be expanded)".to_string(),
901                    error: None,
902                    duration: start.elapsed(),
903                    exit_code: None,
904                    outputs: HashMap::new(),
905                }
906            }
907            StepAction::Download(_) | StepAction::Publish(_) => {
908                // Download/Publish - placeholder for now
909                StepResult {
910                    step_name,
911                    display_name: step.display_name.clone(),
912                    status: StepStatus::Succeeded,
913                    output: "Artifact operation (placeholder)".to_string(),
914                    error: None,
915                    duration: start.elapsed(),
916                    exit_code: Some(0),
917                    outputs: HashMap::new(),
918                }
919            }
920            StepAction::GetPackage(_) | StepAction::ReviewApp(_) => {
921                // Other steps - placeholder
922                StepResult {
923                    step_name,
924                    display_name: step.display_name.clone(),
925                    status: StepStatus::Skipped,
926                    output: "Step type not implemented".to_string(),
927                    error: None,
928                    duration: start.elapsed(),
929                    exit_code: None,
930                    outputs: HashMap::new(),
931                }
932            }
933        }
934    }
935
936    /// Execute a script step
937    #[allow(clippy::too_many_arguments)]
938    async fn execute_script(
939        &self,
940        script: &str,
941        working_directory: Option<&str>,
942        fail_on_stderr: bool,
943        step: &Step,
944        step_index: usize,
945        stage_name: &str,
946        job_name: &str,
947        runtime: &mut RuntimeContext,
948    ) -> StepResult {
949        // Substitute variables in script
950        let script = match runtime.substitute_variables(script) {
951            Ok(s) => s,
952            Err(e) => {
953                return StepResult {
954                    step_name: step.name.clone(),
955                    display_name: step.display_name.clone(),
956                    status: StepStatus::Failed,
957                    output: String::new(),
958                    error: Some(format!("Variable substitution failed: {}", e)),
959                    duration: Duration::ZERO,
960                    exit_code: None,
961                    outputs: HashMap::new(),
962                };
963            }
964        };
965
966        self.run_shell_command(
967            &script,
968            "sh",
969            &["-c"],
970            working_directory,
971            fail_on_stderr,
972            step,
973            step_index,
974            stage_name,
975            job_name,
976            runtime,
977        )
978        .await
979    }
980
981    /// Execute a bash step
982    #[allow(clippy::too_many_arguments)]
983    async fn execute_bash(
984        &self,
985        script: &str,
986        working_directory: Option<&str>,
987        fail_on_stderr: bool,
988        step: &Step,
989        step_index: usize,
990        stage_name: &str,
991        job_name: &str,
992        runtime: &mut RuntimeContext,
993    ) -> StepResult {
994        let script = match runtime.substitute_variables(script) {
995            Ok(s) => s,
996            Err(e) => {
997                return StepResult {
998                    step_name: step.name.clone(),
999                    display_name: step.display_name.clone(),
1000                    status: StepStatus::Failed,
1001                    output: String::new(),
1002                    error: Some(format!("Variable substitution failed: {}", e)),
1003                    duration: Duration::ZERO,
1004                    exit_code: None,
1005                    outputs: HashMap::new(),
1006                };
1007            }
1008        };
1009
1010        self.run_shell_command(
1011            &script,
1012            "bash",
1013            &["-c"],
1014            working_directory,
1015            fail_on_stderr,
1016            step,
1017            step_index,
1018            stage_name,
1019            job_name,
1020            runtime,
1021        )
1022        .await
1023    }
1024
1025    /// Execute a pwsh (PowerShell Core) step
1026    #[allow(clippy::too_many_arguments)]
1027    async fn execute_pwsh(
1028        &self,
1029        script: &str,
1030        working_directory: Option<&str>,
1031        fail_on_stderr: bool,
1032        step: &Step,
1033        step_index: usize,
1034        stage_name: &str,
1035        job_name: &str,
1036        runtime: &mut RuntimeContext,
1037    ) -> StepResult {
1038        let script = match runtime.substitute_variables(script) {
1039            Ok(s) => s,
1040            Err(e) => {
1041                return StepResult {
1042                    step_name: step.name.clone(),
1043                    display_name: step.display_name.clone(),
1044                    status: StepStatus::Failed,
1045                    output: String::new(),
1046                    error: Some(format!("Variable substitution failed: {}", e)),
1047                    duration: Duration::ZERO,
1048                    exit_code: None,
1049                    outputs: HashMap::new(),
1050                };
1051            }
1052        };
1053
1054        self.run_shell_command(
1055            &script,
1056            "pwsh",
1057            &["-Command"],
1058            working_directory,
1059            fail_on_stderr,
1060            step,
1061            step_index,
1062            stage_name,
1063            job_name,
1064            runtime,
1065        )
1066        .await
1067    }
1068
1069    /// Execute a PowerShell (Windows) step
1070    #[allow(clippy::too_many_arguments)]
1071    async fn execute_powershell(
1072        &self,
1073        script: &str,
1074        working_directory: Option<&str>,
1075        fail_on_stderr: bool,
1076        step: &Step,
1077        step_index: usize,
1078        stage_name: &str,
1079        job_name: &str,
1080        runtime: &mut RuntimeContext,
1081    ) -> StepResult {
1082        let script = match runtime.substitute_variables(script) {
1083            Ok(s) => s,
1084            Err(e) => {
1085                return StepResult {
1086                    step_name: step.name.clone(),
1087                    display_name: step.display_name.clone(),
1088                    status: StepStatus::Failed,
1089                    output: String::new(),
1090                    error: Some(format!("Variable substitution failed: {}", e)),
1091                    duration: Duration::ZERO,
1092                    exit_code: None,
1093                    outputs: HashMap::new(),
1094                };
1095            }
1096        };
1097
1098        // On Windows, use powershell.exe; on other platforms, fall back to pwsh
1099        let (shell, args): (&str, &[&str]) = if cfg!(target_os = "windows") {
1100            ("powershell.exe", &["-Command"])
1101        } else {
1102            ("pwsh", &["-Command"])
1103        };
1104
1105        self.run_shell_command(
1106            &script,
1107            shell,
1108            args,
1109            working_directory,
1110            fail_on_stderr,
1111            step,
1112            step_index,
1113            stage_name,
1114            job_name,
1115            runtime,
1116        )
1117        .await
1118    }
1119
1120    /// Run a shell command
1121    #[allow(clippy::too_many_arguments)]
1122    async fn run_shell_command(
1123        &self,
1124        script: &str,
1125        shell: &str,
1126        shell_args: &[&str],
1127        working_directory: Option<&str>,
1128        fail_on_stderr: bool,
1129        step: &Step,
1130        step_index: usize,
1131        stage_name: &str,
1132        job_name: &str,
1133        runtime: &mut RuntimeContext,
1134    ) -> StepResult {
1135        use tokio::process::Command;
1136        let start = Instant::now();
1137
1138        let working_dir = working_directory
1139            .map(|d| d.to_string())
1140            .unwrap_or_else(|| runtime.base.working_dir.clone());
1141
1142        // Build environment
1143        let mut env = runtime.env_as_strings();
1144        for (k, v) in &step.env {
1145            // Substitute variables in env values
1146            let value = runtime
1147                .substitute_variables(v)
1148                .unwrap_or_else(|_| v.clone());
1149            env.insert(k.clone(), value);
1150        }
1151
1152        let mut cmd = Command::new(shell);
1153        cmd.args(shell_args);
1154        cmd.arg(script);
1155        cmd.current_dir(&working_dir);
1156        cmd.envs(&env);
1157
1158        // Capture output
1159        cmd.stdout(std::process::Stdio::piped());
1160        cmd.stderr(std::process::Stdio::piped());
1161
1162        let output = match cmd.output().await {
1163            Ok(output) => output,
1164            Err(e) => {
1165                return StepResult {
1166                    step_name: step.name.clone(),
1167                    display_name: step.display_name.clone(),
1168                    status: StepStatus::Failed,
1169                    output: String::new(),
1170                    error: Some(format!("Failed to execute command: {}", e)),
1171                    duration: start.elapsed(),
1172                    exit_code: None,
1173                    outputs: HashMap::new(),
1174                };
1175            }
1176        };
1177
1178        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1179        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1180
1181        // Send output events
1182        if !stdout.is_empty() {
1183            self.event_tx.send_event(ExecutionEvent::step_output(
1184                stage_name,
1185                job_name,
1186                step.name.clone(),
1187                step_index,
1188                &stdout,
1189                false,
1190            ));
1191        }
1192
1193        if !stderr.is_empty() {
1194            self.event_tx.send_event(ExecutionEvent::step_output(
1195                stage_name,
1196                job_name,
1197                step.name.clone(),
1198                step_index,
1199                &stderr,
1200                true,
1201            ));
1202        }
1203
1204        // Parse output for Azure DevOps logging commands
1205        let outputs = parse_logging_commands(&stdout, runtime);
1206
1207        // Determine status
1208        let exit_code = output.status.code();
1209        let status = if !output.status.success() || (fail_on_stderr && !stderr.is_empty()) {
1210            StepStatus::Failed
1211        } else {
1212            StepStatus::Succeeded
1213        };
1214
1215        StepResult {
1216            step_name: step.name.clone(),
1217            display_name: step.display_name.clone(),
1218            status,
1219            output: stdout,
1220            error: if stderr.is_empty() {
1221                None
1222            } else {
1223                Some(stderr)
1224            },
1225            duration: start.elapsed(),
1226            exit_code,
1227            outputs,
1228        }
1229    }
1230}
1231
1232/// Check if a step should always run (has always() condition)
1233fn should_always_run(step: &Step) -> bool {
1234    step.condition
1235        .as_ref()
1236        .map(|c| c.contains("always()"))
1237        .unwrap_or(false)
1238}
1239
1240/// Build synthetic skipped step results for all steps in a job
1241fn skipped_step_results(job: &Job) -> Vec<StepResult> {
1242    // Use deployment steps if this is a deployment job
1243    let steps: Vec<&Step> = if job.deployment.is_some() {
1244        let deployment = collect_deployment_steps(job);
1245        if !deployment.is_empty() {
1246            return deployment
1247                .iter()
1248                .map(|step| StepResult {
1249                    step_name: step.name.clone(),
1250                    display_name: step.display_name.clone(),
1251                    status: StepStatus::Skipped,
1252                    output: String::new(),
1253                    error: None,
1254                    duration: Duration::ZERO,
1255                    exit_code: None,
1256                    outputs: HashMap::new(),
1257                })
1258                .collect();
1259        }
1260        job.steps.iter().collect()
1261    } else {
1262        job.steps.iter().collect()
1263    };
1264
1265    steps
1266        .iter()
1267        .map(|step| StepResult {
1268            step_name: step.name.clone(),
1269            display_name: step.display_name.clone(),
1270            status: StepStatus::Skipped,
1271            output: String::new(),
1272            error: None,
1273            duration: Duration::ZERO,
1274            exit_code: None,
1275            outputs: HashMap::new(),
1276        })
1277        .collect()
1278}
1279
1280/// Collect steps from deployment strategy hooks in execution order.
1281///
1282/// Deployment jobs define steps inside strategy hooks (runOnce, rolling, canary)
1283/// rather than in the top-level `steps` field. This function extracts steps from
1284/// the hooks in the correct Azure DevOps execution order:
1285/// preDeploy → deploy → routeTraffic → postRouteTraffic
1286fn collect_deployment_steps(job: &Job) -> Vec<Step> {
1287    let mut steps = Vec::new();
1288
1289    if let Some(strategy) = &job.strategy {
1290        // Helper to extract steps from DeploymentHooks
1291        let extract_from_hooks =
1292            |hooks: &crate::parser::models::DeploymentHooks, steps: &mut Vec<Step>| {
1293                if let Some(hook) = &hooks.pre_deploy {
1294                    steps.extend(hook.steps.clone());
1295                }
1296                if let Some(hook) = &hooks.deploy {
1297                    steps.extend(hook.steps.clone());
1298                }
1299                if let Some(hook) = &hooks.route_traffic {
1300                    steps.extend(hook.steps.clone());
1301                }
1302                if let Some(hook) = &hooks.post_route_traffic {
1303                    steps.extend(hook.steps.clone());
1304                }
1305            };
1306
1307        if let Some(run_once) = &strategy.run_once {
1308            extract_from_hooks(run_once, &mut steps);
1309        }
1310        if let Some(rolling) = &strategy.rolling {
1311            extract_from_hooks(&rolling.hooks, &mut steps);
1312        }
1313        if let Some(canary) = &strategy.canary {
1314            extract_from_hooks(&canary.hooks, &mut steps);
1315        }
1316    }
1317
1318    steps
1319}
1320
1321/// Build synthetic skipped job results for all jobs in a stage node
1322fn skipped_job_results(stage_node: &StageNode) -> Vec<JobResult> {
1323    stage_node
1324        .jobs
1325        .iter()
1326        .map(|job_node| {
1327            let job = &job_node.job;
1328            let job_name = job.identifier().unwrap_or("unknown").to_string();
1329            JobResult {
1330                job_name,
1331                display_name: job.display_name.clone(),
1332                status: JobStatus::Skipped,
1333                steps: skipped_step_results(job),
1334                duration: Duration::ZERO,
1335                outputs: HashMap::new(),
1336            }
1337        })
1338        .collect()
1339}
1340
1341/// Parse Azure DevOps logging commands from output
1342fn parse_logging_commands(output: &str, runtime: &mut RuntimeContext) -> HashMap<String, String> {
1343    let mut outputs = HashMap::new();
1344
1345    for line in output.lines() {
1346        // ##vso[task.setvariable variable=name]value
1347        if let Some(rest) = line.strip_prefix("##vso[task.setvariable") {
1348            if let Some((props, value)) = rest.split_once(']') {
1349                let mut var_name = None;
1350                let mut is_output = false;
1351                let mut is_secret = false;
1352
1353                for prop in props.split(';') {
1354                    let prop = prop.trim();
1355                    if let Some(name) = prop.strip_prefix("variable=") {
1356                        var_name = Some(name.to_string());
1357                    } else if prop.eq_ignore_ascii_case("isoutput=true") {
1358                        is_output = true;
1359                    } else if prop.eq_ignore_ascii_case("issecret=true") {
1360                        is_secret = true;
1361                    }
1362                }
1363
1364                if let Some(name) = var_name {
1365                    let value = value.to_string();
1366                    if is_output {
1367                        outputs.insert(name.clone(), value.clone());
1368                    }
1369                    if !is_secret {
1370                        runtime.set_variable(name, crate::parser::models::Value::String(value));
1371                    }
1372                }
1373            }
1374        }
1375    }
1376
1377    outputs
1378}
1379
1380#[cfg(test)]
1381mod tests {
1382    use super::*;
1383    use crate::parser::models::{BoolOrExpression, DependsOn, Job, ScriptStep, Stage, Step};
1384
1385    fn make_simple_pipeline() -> Pipeline {
1386        Pipeline {
1387            name: Some("test-pipeline".to_string()),
1388            stages: vec![Stage {
1389                stage: Some("Build".to_string()),
1390                display_name: None,
1391                depends_on: DependsOn::None,
1392                condition: None,
1393                variables: Vec::new(),
1394                jobs: vec![Job {
1395                    job: Some("BuildJob".to_string()),
1396                    deployment: None,
1397                    display_name: None,
1398                    depends_on: DependsOn::None,
1399                    condition: None,
1400                    strategy: None,
1401                    pool: None,
1402                    container: None,
1403                    services: HashMap::new(),
1404                    variables: Vec::new(),
1405                    steps: vec![Step {
1406                        name: Some("echo".to_string()),
1407                        display_name: Some("Echo Hello".to_string()),
1408                        condition: None,
1409                        continue_on_error: BoolOrExpression::default(),
1410                        enabled: true,
1411                        timeout_in_minutes: None,
1412                        retry_count_on_task_failure: None,
1413                        env: HashMap::new(),
1414                        action: StepAction::Script(ScriptStep {
1415                            script: "echo Hello".to_string(),
1416                            working_directory: None,
1417                            fail_on_stderr: false,
1418                        }),
1419                    }],
1420                    timeout_in_minutes: None,
1421                    cancel_timeout_in_minutes: None,
1422                    continue_on_error: BoolOrExpression::default(),
1423                    workspace: None,
1424                    uses: None,
1425                    template: None,
1426                    parameters: HashMap::new(),
1427                    environment: None,
1428                    has_template_directives: false,
1429                }],
1430                lock_behavior: None,
1431                template: None,
1432                parameters: HashMap::new(),
1433                pool: None,
1434                has_template_directives: false,
1435            }],
1436            ..Default::default()
1437        }
1438    }
1439
1440    #[test]
1441    fn test_executor_creation() {
1442        let pipeline = make_simple_pipeline();
1443        let executor = PipelineExecutor::from_pipeline(&pipeline).unwrap();
1444
1445        assert_eq!(executor.graph.stages.len(), 1);
1446    }
1447
1448    #[tokio::test]
1449    async fn test_simple_execution() {
1450        let pipeline = make_simple_pipeline();
1451        let executor = PipelineExecutor::from_pipeline(&pipeline).unwrap();
1452
1453        let context = ExecutionContext::new(
1454            "test".to_string(),
1455            std::env::current_dir()
1456                .unwrap()
1457                .to_string_lossy()
1458                .to_string(),
1459        );
1460
1461        let result = executor.execute(context).await;
1462
1463        assert_eq!(result.stages.len(), 1);
1464        assert_eq!(result.stages[0].stage_name, "Build");
1465        assert_eq!(result.stages[0].status, StageStatus::Succeeded);
1466    }
1467
1468    #[test]
1469    fn test_parse_logging_commands() {
1470        let base = ExecutionContext::new("test".to_string(), "/work".to_string());
1471        let mut runtime = RuntimeContext::new(base);
1472
1473        let output = r#"
1474Hello
1475##vso[task.setvariable variable=version]1.0.0
1476##vso[task.setvariable variable=output;isoutput=true]result
1477World
1478"#;
1479
1480        let outputs = parse_logging_commands(output, &mut runtime);
1481
1482        assert_eq!(outputs.get("output"), Some(&"result".to_string()));
1483        assert_eq!(
1484            runtime.variables.get("version"),
1485            Some(&crate::parser::models::Value::String("1.0.0".to_string()))
1486        );
1487    }
1488
1489    #[test]
1490    fn test_should_always_run() {
1491        let step_with_always = Step {
1492            name: None,
1493            display_name: None,
1494            condition: Some("always()".to_string()),
1495            continue_on_error: BoolOrExpression::default(),
1496            enabled: true,
1497            timeout_in_minutes: None,
1498            retry_count_on_task_failure: None,
1499            env: HashMap::new(),
1500            action: StepAction::Script(ScriptStep {
1501                script: "echo".to_string(),
1502                working_directory: None,
1503                fail_on_stderr: false,
1504            }),
1505        };
1506
1507        let step_without_always = Step {
1508            name: None,
1509            display_name: None,
1510            condition: Some("succeeded()".to_string()),
1511            continue_on_error: BoolOrExpression::default(),
1512            enabled: true,
1513            timeout_in_minutes: None,
1514            retry_count_on_task_failure: None,
1515            env: HashMap::new(),
1516            action: StepAction::Script(ScriptStep {
1517                script: "echo".to_string(),
1518                working_directory: None,
1519                fail_on_stderr: false,
1520            }),
1521        };
1522
1523        assert!(should_always_run(&step_with_always));
1524        assert!(!should_always_run(&step_without_always));
1525    }
1526}