//! opal/compiler/compile.rs
//!
//! Pipeline compilation: expands `parallel` jobs into variants, evaluates
//! rules and filters, and resolves per-variant dependencies.
1use crate::compiler::{CompileContext, CompiledPipeline, JobInstance, JobVariantInfo};
2use crate::model::{
3    DependencySourceSpec, JobDependencySpec, JobSpec, ParallelConfigSpec, ParallelMatrixEntrySpec,
4    PipelineSpec,
5};
6use crate::pipeline::rules::{RuleEvaluation, apply_when_config, evaluate_rules, filters_allow};
7use anyhow::{Result, anyhow, bail};
8use std::collections::HashMap;
9use tracing::warn;
10
/// A single assignment of matrix variables, kept in two shapes: declaration
/// order (used to build display names) and a keyed map (used to match
/// dependency filters).
#[derive(Clone)]
struct LabelCombination {
    // Key/value pairs in the order the matrix declared them.
    ordered: Vec<(String, String)>,
    // The same pairs keyed by variable name for direct lookup.
    lookup: HashMap<String, String>,
}
16
17impl LabelCombination {
18    fn empty() -> Self {
19        Self {
20            ordered: Vec::new(),
21            lookup: HashMap::new(),
22        }
23    }
24
25    fn push(&self, key: String, value: String) -> Self {
26        let mut ordered = self.ordered.clone();
27        ordered.push((key.clone(), value.clone()));
28        let mut lookup = self.lookup.clone();
29        lookup.insert(key, value);
30        Self { ordered, lookup }
31    }
32}
33
/// One concrete variant produced by expanding a job's `parallel` config.
struct ExpandedVariant {
    // The cloned job spec with the variant-specific name and variables applied.
    job: JobSpec,
    // Matrix labels for this variant; empty for count-based or plain jobs.
    labels: HashMap<String, String>,
    // Name of the job this variant was expanded from.
    base_name: String,
    // Matrix values in declaration order (or the 1-based index for `parallel: N`).
    ordered_values: Vec<String>,
}
40
41pub fn compile_pipeline(
42    pipeline: &PipelineSpec,
43    rule_ctx: Option<&CompileContext>,
44) -> Result<CompiledPipeline> {
45    let mut jobs = HashMap::new();
46    let mut ordered = Vec::new();
47    let mut expanded_jobs: HashMap<String, Vec<ExpandedVariant>> = HashMap::new();
48    let mut variant_lookup: HashMap<String, Vec<JobVariantInfo>> = HashMap::new();
49
50    for base_job in pipeline.jobs.values() {
51        let base_job = base_job.clone();
52        let variants = expand_job_variants(base_job.clone())?;
53        variant_lookup.insert(
54            base_job.name.clone(),
55            variants
56                .iter()
57                .map(|variant| JobVariantInfo {
58                    name: variant.job.name.clone(),
59                    labels: variant.labels.clone(),
60                    ordered_values: variant.ordered_values.clone(),
61                })
62                .collect(),
63        );
64        expanded_jobs.insert(base_job.name.clone(), variants);
65    }
66
67    // TODO: fox inside a for inside a for, this needs to be split up and structured properly
68
69    for (stage_idx, stage) in pipeline.stages.iter().enumerate() {
70        let default_deps: Vec<String> = if stage_idx == 0 {
71            Vec::new()
72        } else {
73            pipeline.stages[stage_idx - 1].jobs.clone()
74        };
75
76        for job_name in &stage.jobs {
77            let base_job = pipeline
78                .jobs
79                .get(job_name)
80                .cloned()
81                .ok_or_else(|| anyhow!("missing job '{}'", job_name))?;
82            let base_name = base_job.name.clone();
83            let variants = match expanded_jobs.remove(&base_name) {
84                Some(list) => list,
85                None => expand_job_variants(base_job.clone())?,
86            };
87            for mut expanded in variants {
88                let evaluation = if let Some(ctx) = rule_ctx {
89                    if !filters_allow(&expanded.job, ctx) {
90                        if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
91                            entry.retain(|meta| meta.name != expanded.job.name);
92                        }
93                        continue;
94                    }
95                    evaluate_rules(&expanded.job, ctx)?
96                } else {
97                    RuleEvaluation::default()
98                };
99                let mut evaluation = evaluation;
100                if expanded.job.rules.is_empty() {
101                    apply_when_config(
102                        &mut evaluation,
103                        expanded.job.when.as_deref(),
104                        None,
105                        Some("manual job"),
106                    );
107                }
108                if !evaluation.included {
109                    if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
110                        entry.retain(|meta| meta.name != expanded.job.name);
111                    }
112                    continue;
113                }
114                if !expanded.job.tags.is_empty() {
115                    warn!(
116                        job = %expanded.job.name,
117                        tags = ?expanded.job.tags,
118                        "job has runner tags, but Opal runs locally; ignoring tags"
119                    );
120                }
121                if !evaluation.variables.is_empty() {
122                    expanded.job.variables.extend(evaluation.variables.clone());
123                }
124                let resolved_deps = if expanded.job.explicit_needs {
125                    resolve_parallel_dependencies(
126                        &expanded.job.name,
127                        &expanded.job.needs,
128                        &variant_lookup,
129                    )?
130                } else {
131                    resolve_default_dependencies(&default_deps, &variant_lookup)
132                };
133                let job_timeout = expanded.job.timeout;
134                let job_retry = expanded.job.retry.clone();
135                let job_interruptible = expanded.job.interruptible;
136                let job_resource_group = expanded.job.resource_group.clone();
137                let job_name = expanded.job.name.clone();
138                let job_stage = stage.name.clone();
139                ordered.push(job_name.clone());
140                jobs.insert(
141                    job_name.clone(),
142                    JobInstance {
143                        job: expanded.job,
144                        stage_name: job_stage,
145                        dependencies: resolved_deps,
146                        rule: evaluation.clone(),
147                        timeout: job_timeout,
148                        retry: job_retry,
149                        interruptible: job_interruptible,
150                        resource_group: job_resource_group,
151                    },
152                );
153            }
154        }
155    }
156
157    let mut order_index = HashMap::new();
158    for (idx, name) in ordered.iter().enumerate() {
159        order_index.insert(name.clone(), idx);
160    }
161
162    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
163    for (name, instance) in &jobs {
164        for dep in &instance.dependencies {
165            if !jobs.contains_key(dep) {
166                return Err(anyhow!("job '{}' depends on unknown job '{}'", name, dep));
167            }
168            dependents
169                .entry(dep.clone())
170                .or_default()
171                .push(name.clone());
172        }
173    }
174
175    for deps in dependents.values_mut() {
176        deps.sort_by_key(|name| order_index.get(name).copied().unwrap_or(usize::MAX));
177    }
178
179    Ok(CompiledPipeline {
180        ordered,
181        jobs,
182        dependents,
183        order_index,
184        variants: variant_lookup,
185    })
186}
187
188fn resolve_parallel_dependencies(
189    owner: &str,
190    deps: &[JobDependencySpec],
191    variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
192) -> Result<Vec<String>> {
193    let mut resolved = Vec::new();
194    for dep in deps {
195        if !matches!(dep.source, DependencySourceSpec::Local) {
196            continue;
197        }
198        let Some(variants) = variant_lookup.get(&dep.job) else {
199            if dep.optional {
200                continue;
201            } else {
202                return Err(anyhow!(
203                    "job '{}' depends on unknown job '{}'",
204                    owner,
205                    dep.job
206                ));
207            }
208        };
209        let selected = select_variants(variants, dep);
210        if selected.is_empty() {
211            if dep.optional {
212                continue;
213            } else {
214                return Err(anyhow!(
215                    "job '{}' depends on '{}', but no parallel variant matches the requested matrix",
216                    owner,
217                    dep.job
218                ));
219            }
220        }
221        resolved.extend(selected.into_iter().map(|variant| variant.name.clone()));
222    }
223    resolved.sort();
224    resolved.dedup();
225    Ok(resolved)
226}
227
228fn resolve_default_dependencies(
229    defaults: &[String],
230    variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
231) -> Vec<String> {
232    let mut deps = Vec::new();
233    for name in defaults {
234        if let Some(variants) = variant_lookup.get(name) {
235            deps.extend(variants.iter().map(|variant| variant.name.clone()));
236        }
237    }
238    deps.sort();
239    deps.dedup();
240    deps
241}
242
243fn select_variants<'a>(
244    variants: &'a [JobVariantInfo],
245    dep: &JobDependencySpec,
246) -> Vec<&'a JobVariantInfo> {
247    // TODO: this logic is very nested and seems brittle. evaluate if this should be split
248    // lots of if else if
249    if let Some(filters) = &dep.parallel {
250        variants
251            .iter()
252            .filter(|variant| {
253                filters.iter().any(|filter| {
254                    filter.iter().all(|(key, value)| {
255                        variant
256                            .labels
257                            .get(key)
258                            .map(|current| current == value)
259                            .unwrap_or(false)
260                    })
261                })
262            })
263            .collect()
264    } else if let Some(expected) = &dep.inline_variant {
265        variants
266            .iter()
267            .filter(|variant| &variant.ordered_values == expected)
268            .collect()
269    } else {
270        variants.iter().collect()
271    }
272}
273
274fn expand_job_variants(job: JobSpec) -> Result<Vec<ExpandedVariant>> {
275    let base_name = job.name.clone();
276    let mut variants = Vec::new();
277    match &job.parallel {
278        Some(ParallelConfigSpec::Count(count)) => {
279            let total = (*count).max(1);
280            for idx in 0..total {
281                let mut clone = job.clone();
282                clone.parallel = None;
283                clone.name = format!("{}: [{}]", base_name, idx + 1);
284                clone
285                    .variables
286                    .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
287                clone
288                    .variables
289                    .insert("CI_NODE_TOTAL".into(), total.to_string());
290                variants.push(ExpandedVariant {
291                    job: clone,
292                    labels: HashMap::new(),
293                    base_name: base_name.clone(),
294                    ordered_values: vec![(idx + 1).to_string()],
295                });
296            }
297        }
298        Some(ParallelConfigSpec::Matrix(entries)) => {
299            let combos = matrix_combinations(entries)?;
300            if combos.len() > 200 {
301                bail!(
302                    "parallel matrix for '{}' produces {} combinations, exceeding the limit of 200",
303                    base_name,
304                    combos.len()
305                );
306            }
307            let total = combos.len();
308            // TODO: for inside a for inside a for loop. garbage, must be refactored and properly structured
309            for (idx, combo) in combos.into_iter().enumerate() {
310                let mut clone = job.clone();
311                clone.parallel = None;
312                let label_text = format_gitlab_variant_values(&combo.ordered);
313                clone.name = format!("{}: [{}]", base_name, label_text);
314                for (key, value) in &combo.ordered {
315                    clone.variables.insert(key.clone(), value.clone());
316                }
317                clone
318                    .variables
319                    .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
320                clone
321                    .variables
322                    .insert("CI_NODE_TOTAL".into(), total.to_string());
323                let ordered_values = combo
324                    .ordered
325                    .iter()
326                    .map(|(_, value)| value.clone())
327                    .collect();
328                variants.push(ExpandedVariant {
329                    job: clone,
330                    labels: combo.lookup.clone(),
331                    base_name: base_name.clone(),
332                    ordered_values,
333                });
334            }
335        }
336        None => {
337            let mut clone = job.clone();
338            clone.parallel = None;
339            variants.push(ExpandedVariant {
340                job: clone,
341                labels: HashMap::new(),
342                base_name,
343                ordered_values: Vec::new(),
344            });
345            return Ok(variants);
346        }
347    }
348    Ok(variants)
349}
350
351fn matrix_combinations(entries: &[ParallelMatrixEntrySpec]) -> Result<Vec<LabelCombination>> {
352    if entries.is_empty() {
353        return Ok(vec![LabelCombination::empty()]);
354    }
355    // TODO: for inside a for inside a for, needs to be restructured and refactored
356    let mut combos = Vec::new();
357    for entry in entries {
358        let mut entry_combos = vec![LabelCombination::empty()];
359        for var in &entry.variables {
360            let mut new_sets = Vec::new();
361            for combo in &entry_combos {
362                for value in &var.values {
363                    new_sets.push(combo.push(var.name.clone(), value.clone()));
364                }
365            }
366            entry_combos = new_sets;
367        }
368        combos.extend(entry_combos);
369    }
370    Ok(combos)
371}
372
/// Joins the values of ordered matrix labels into GitLab's display form,
/// e.g. `[("OS", "linux"), ("MODE", "release")]` → `"linux, release"`.
///
/// Borrows each value instead of cloning it; only the joined `String`
/// allocates.
fn format_gitlab_variant_values(labels: &[(String, String)]) -> String {
    labels
        .iter()
        .map(|(_, value)| value.as_str())
        .collect::<Vec<_>>()
        .join(", ")
}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gitlab::rules::JobRule;
    use crate::model::{
        ArtifactSpec, PipelineDefaultsSpec, PipelineFilterSpec, PipelineSpec, RetryPolicySpec,
        StageSpec,
    };
    use crate::pipeline::rules::RuleContext;
    use std::collections::HashMap;
    use std::path::Path;

    // Rule-matched jobs are kept and receive rule variables; jobs whose rules
    // never match are excluded and their variant metadata is cleared.
    #[test]
    fn compile_pipeline_applies_rule_variables_and_excludes_unmatched_jobs() {
        let included = JobSpec {
            rules: vec![JobRule {
                if_expr: Some("$CI_COMMIT_BRANCH == \"main\"".into()),
                variables: HashMap::from([("FROM_RULE".into(), "1".into())]),
                ..JobRule::default()
            }],
            ..job("lint", "test")
        };
        let excluded = JobSpec {
            rules: vec![JobRule {
                if_expr: Some("$CI_COMMIT_BRANCH == \"release\"".into()),
                ..JobRule::default()
            }],
            ..job("publish", "deploy")
        };
        let pipeline = pipeline_spec(
            vec![
                StageSpec {
                    name: "test".into(),
                    jobs: vec!["lint".into()],
                },
                StageSpec {
                    name: "deploy".into(),
                    jobs: vec!["publish".into()],
                },
            ],
            vec![included, excluded],
        );
        let temp = tempfile::tempdir().expect("tempdir");
        let ctx = RuleContext::from_env(
            temp.path(),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
                ("CI_COMMIT_REF_SLUG".into(), "main".into()),
            ]),
            false,
        );

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");

        assert!(compiled.jobs.contains_key("lint"));
        assert!(!compiled.jobs.contains_key("publish"));
        // The matching rule's variables must be merged into the job.
        assert_eq!(
            compiled.jobs["lint"].job.variables.get("FROM_RULE"),
            Some(&"1".to_string())
        );
        // Excluded jobs must leave no variant entries behind.
        assert!(
            compiled
                .variants
                .get("publish")
                .is_none_or(|variants| variants.is_empty())
        );
    }

    // A `needs` entry on a matrix job resolves to the concrete variant name.
    // NOTE(review): depends on the on-disk fixture
    // pipelines/tests/needs-and-artifacts.gitlab-ci.yml.
    #[test]
    fn compile_pipeline_resolves_matrix_needs_to_variant_dependencies() {
        let pipeline = PipelineSpec::from_path(Path::new(
            "pipelines/tests/needs-and-artifacts.gitlab-ci.yml",
        ))
        .expect("pipeline loads");
        let ctx = RuleContext::from_env(
            Path::new("."),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
                ("CI_COMMIT_REF_SLUG".into(), "main".into()),
            ]),
            false,
        );

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");

        let package = compiled
            .jobs
            .get("package-linux")
            .expect("package job exists");
        assert!(compiled.jobs.contains_key("build-matrix: [linux, release]"));
        assert!(
            package
                .dependencies
                .iter()
                .any(|dep| dep == "build-matrix: [linux, release]")
        );

        // The same resolution must be reachable through the public
        // variants_for_dependency helper.
        let matrix_need = package
            .job
            .needs
            .iter()
            .find(|need| need.job == "build-matrix")
            .expect("matrix dependency present");
        let variants = compiled.variants_for_dependency(matrix_need);
        assert_eq!(variants, vec!["build-matrix: [linux, release]".to_string()]);
    }

    // Legacy only:/except: filters gate jobs by branch vs. tag context.
    #[test]
    fn compile_pipeline_applies_job_only_except_filters() {
        let branch_only = JobSpec {
            only: vec!["branches".into()],
            ..job("branch-only", "build")
        };
        let tag_only = JobSpec {
            only: vec!["tags".into()],
            ..job("tag-only", "test")
        };
        let release_excluded = JobSpec {
            only: vec!["branches".into()],
            except: vec!["/^release\\/.*$/".into()],
            ..job("no-release", "test")
        };
        let pipeline = pipeline_spec(
            vec![
                StageSpec {
                    name: "build".into(),
                    jobs: vec!["branch-only".into()],
                },
                StageSpec {
                    name: "test".into(),
                    jobs: vec!["tag-only".into(), "no-release".into()],
                },
            ],
            vec![branch_only, tag_only, release_excluded],
        );
        let temp = tempfile::tempdir().expect("tempdir");
        let branch_ctx = RuleContext::from_env(
            temp.path(),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
            ]),
            false,
        );

        let branch_compiled =
            compile_pipeline(&pipeline, Some(&branch_ctx)).expect("branch pipeline compiles");

        // On a branch: branch-gated jobs compile, tag-gated ones don't, and
        // "main" does not match the release except pattern.
        assert!(branch_compiled.jobs.contains_key("branch-only"));
        assert!(branch_compiled.jobs.contains_key("no-release"));
        assert!(!branch_compiled.jobs.contains_key("tag-only"));

        let tag_ctx = RuleContext::from_env(
            Path::new("."),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_TAG".into(), "v1.2.0".into()),
                ("CI_COMMIT_REF_NAME".into(), "v1.2.0".into()),
            ]),
            false,
        );
        let tag_compiled =
            compile_pipeline(&pipeline, Some(&tag_ctx)).expect("tag pipeline compiles");

        // On a tag, the gating flips.
        assert!(!tag_compiled.jobs.contains_key("branch-only"));
        assert!(tag_compiled.jobs.contains_key("tag-only"));
    }

    // A rule-less job with `when: manual` is compiled as manual with the
    // "manual job" reason attached.
    #[test]
    fn compile_pipeline_applies_job_level_manual_when() {
        let pipeline = pipeline_spec(
            vec![StageSpec {
                name: "deploy".into(),
                jobs: vec!["stop-review".into()],
            }],
            vec![JobSpec {
                when: Some("manual".into()),
                ..job("stop-review", "deploy")
            }],
        );

        let temp = tempfile::tempdir().expect("tempdir");
        let ctx = RuleContext::from_env(temp.path(), HashMap::new(), false);
        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");

        let stop_review = compiled.jobs.get("stop-review").expect("job exists");
        assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
        assert_eq!(
            stop_review.rule.manual_reason.as_deref(),
            Some("manual job")
        );
    }

    // Same manual-when behavior, but loaded from a YAML fixture instead of a
    // hand-built spec.
    // NOTE(review): depends on pipelines/tests/environments.gitlab-ci.yml.
    #[test]
    fn compile_pipeline_preserves_job_level_manual_when_from_fixture() {
        let pipeline =
            PipelineSpec::from_path(Path::new("pipelines/tests/environments.gitlab-ci.yml"))
                .expect("pipeline loads");
        let ctx = RuleContext::from_env(
            Path::new("."),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
                ("CI_COMMIT_REF_SLUG".into(), "main".into()),
            ]),
            false,
        );

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
        let stop_review = compiled.jobs.get("stop-review").expect("job exists");

        assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
    }

    // Builds a minimal PipelineSpec from stages plus a job list keyed by name.
    fn pipeline_spec(stages: Vec<StageSpec>, jobs: Vec<JobSpec>) -> PipelineSpec {
        PipelineSpec {
            stages,
            jobs: jobs
                .into_iter()
                .map(|job| (job.name.clone(), job))
                .collect::<HashMap<_, _>>(),
            defaults: PipelineDefaultsSpec::default(),
            workflow: None,
            filters: PipelineFilterSpec::default(),
        }
    }

    // Builds a minimal JobSpec that runs `true` in the given stage; tests
    // override individual fields with struct-update syntax.
    fn job(name: &str, stage: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: stage.into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }
}