// opal/compiler/compile.rs
1use crate::compiler::{CompileContext, CompiledPipeline, JobInstance, JobVariantInfo};
2use crate::model::{
3    DependencySourceSpec, JobDependencySpec, JobSpec, ParallelConfigSpec, ParallelMatrixEntrySpec,
4    PipelineSpec,
5};
6use crate::pipeline::rules::{RuleEvaluation, apply_when_config, evaluate_rules, filters_allow};
7use anyhow::{Result, anyhow, bail};
8use std::collections::HashMap;
9use tracing::warn;
10
/// One concrete assignment of matrix labels, kept both in insertion order
/// (for stable variant naming) and as a map (for keyed lookup).
#[derive(Clone)]
struct LabelCombination {
    // Label pairs in the order they were pushed; drives variant display order.
    ordered: Vec<(String, String)>,
    // The same pairs keyed by label name for O(1) lookup.
    lookup: HashMap<String, String>,
}
16
17impl LabelCombination {
18    fn empty() -> Self {
19        Self {
20            ordered: Vec::new(),
21            lookup: HashMap::new(),
22        }
23    }
24
25    fn push(&self, key: String, value: String) -> Self {
26        let mut ordered = self.ordered.clone();
27        ordered.push((key.clone(), value.clone()));
28        let mut lookup = self.lookup.clone();
29        lookup.insert(key, value);
30        Self { ordered, lookup }
31    }
32}
33
/// A single expanded variant of a (possibly parallel) job.
struct ExpandedVariant {
    // The concrete job spec for this variant (its `parallel` field is cleared
    // during expansion).
    job: JobSpec,
    // Matrix labels for this variant; empty for count-based or non-parallel jobs.
    labels: HashMap<String, String>,
    // Name of the base job this variant was expanded from.
    base_name: String,
    // Matrix values in declaration order; used to match inline variant needs.
    ordered_values: Vec<String>,
}
40
41pub fn compile_pipeline(
42    pipeline: &PipelineSpec,
43    rule_ctx: Option<&CompileContext>,
44) -> Result<CompiledPipeline> {
45    let mut jobs = HashMap::new();
46    let mut ordered = Vec::new();
47    let mut expanded_jobs: HashMap<String, Vec<ExpandedVariant>> = HashMap::new();
48    let mut variant_lookup: HashMap<String, Vec<JobVariantInfo>> = HashMap::new();
49
50    for base_job in pipeline.jobs.values() {
51        let base_job = base_job.clone();
52        let variants = expand_job_variants(base_job.clone())?;
53        variant_lookup.insert(
54            base_job.name.clone(),
55            variants
56                .iter()
57                .map(|variant| JobVariantInfo {
58                    name: variant.job.name.clone(),
59                    labels: variant.labels.clone(),
60                    ordered_values: variant.ordered_values.clone(),
61                })
62                .collect(),
63        );
64        expanded_jobs.insert(base_job.name.clone(), variants);
65    }
66
67    // TODO: fox inside a for inside a for, this needs to be split up and structured properly
68
69    for (stage_idx, stage) in pipeline.stages.iter().enumerate() {
70        let default_deps: Vec<String> = if stage_idx == 0 {
71            Vec::new()
72        } else {
73            pipeline.stages[stage_idx - 1].jobs.clone()
74        };
75
76        for job_name in &stage.jobs {
77            let base_job = pipeline
78                .jobs
79                .get(job_name)
80                .cloned()
81                .ok_or_else(|| anyhow!("missing job '{}'", job_name))?;
82            let base_name = base_job.name.clone();
83            let variants = match expanded_jobs.remove(&base_name) {
84                Some(list) => list,
85                None => expand_job_variants(base_job.clone())?,
86            };
87            for mut expanded in variants {
88                let evaluation = if let Some(ctx) = rule_ctx {
89                    if !filters_allow(&expanded.job, ctx) {
90                        if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
91                            entry.retain(|meta| meta.name != expanded.job.name);
92                        }
93                        continue;
94                    }
95                    evaluate_rules(&expanded.job, ctx)?
96                } else {
97                    RuleEvaluation::default()
98                };
99                let mut evaluation = evaluation;
100                if expanded.job.rules.is_empty() {
101                    apply_when_config(
102                        &mut evaluation,
103                        expanded.job.when.as_deref(),
104                        None,
105                        Some("manual job"),
106                    );
107                }
108                if !evaluation.included {
109                    if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
110                        entry.retain(|meta| meta.name != expanded.job.name);
111                    }
112                    continue;
113                }
114                if !expanded.job.tags.is_empty() {
115                    warn!(
116                        job = %expanded.job.name,
117                        tags = ?expanded.job.tags,
118                        "job has runner tags, but Opal runs locally; ignoring tags"
119                    );
120                }
121                if !evaluation.variables.is_empty() {
122                    expanded.job.variables.extend(evaluation.variables.clone());
123                }
124                let resolved_deps = if expanded.job.explicit_needs {
125                    resolve_parallel_dependencies(
126                        &expanded.job.name,
127                        &expanded.job.needs,
128                        &variant_lookup,
129                    )?
130                } else {
131                    resolve_default_dependencies(&default_deps, &variant_lookup)
132                };
133                let job_timeout = expanded.job.timeout;
134                let job_retry = expanded.job.retry.clone();
135                let job_interruptible = expanded.job.interruptible;
136                let job_resource_group = expanded.job.resource_group.clone();
137                let job_name = expanded.job.name.clone();
138                let job_stage = stage.name.clone();
139                ordered.push(job_name.clone());
140                jobs.insert(
141                    job_name.clone(),
142                    JobInstance {
143                        job: expanded.job,
144                        stage_name: job_stage,
145                        dependencies: resolved_deps,
146                        rule: evaluation.clone(),
147                        timeout: job_timeout,
148                        retry: job_retry,
149                        interruptible: job_interruptible,
150                        resource_group: job_resource_group,
151                    },
152                );
153            }
154        }
155    }
156
157    let mut order_index = HashMap::new();
158    for (idx, name) in ordered.iter().enumerate() {
159        order_index.insert(name.clone(), idx);
160    }
161
162    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
163    for (name, instance) in &jobs {
164        for dep in &instance.dependencies {
165            if !jobs.contains_key(dep) {
166                return Err(anyhow!("job '{}' depends on unknown job '{}'", name, dep));
167            }
168            dependents
169                .entry(dep.clone())
170                .or_default()
171                .push(name.clone());
172        }
173    }
174
175    for deps in dependents.values_mut() {
176        deps.sort_by_key(|name| order_index.get(name).copied().unwrap_or(usize::MAX));
177    }
178
179    Ok(CompiledPipeline {
180        ordered,
181        jobs,
182        dependents,
183        order_index,
184        variants: variant_lookup,
185    })
186}
187
188fn resolve_parallel_dependencies(
189    owner: &str,
190    deps: &[JobDependencySpec],
191    variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
192) -> Result<Vec<String>> {
193    let mut resolved = Vec::new();
194    for dep in deps {
195        if !matches!(dep.source, DependencySourceSpec::Local) {
196            continue;
197        }
198        let Some(variants) = variant_lookup.get(&dep.job) else {
199            if dep.optional {
200                continue;
201            } else {
202                return Err(anyhow!(
203                    "job '{}' depends on unknown job '{}'",
204                    owner,
205                    dep.job
206                ));
207            }
208        };
209        let selected = select_variants(variants, dep);
210        if selected.is_empty() {
211            if dep.optional {
212                continue;
213            } else {
214                return Err(anyhow!(
215                    "job '{}' depends on '{}', but no parallel variant matches the requested matrix",
216                    owner,
217                    dep.job
218                ));
219            }
220        }
221        resolved.extend(selected.into_iter().map(|variant| variant.name.clone()));
222    }
223    resolved.sort();
224    resolved.dedup();
225    Ok(resolved)
226}
227
228fn resolve_default_dependencies(
229    defaults: &[String],
230    variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
231) -> Vec<String> {
232    let mut deps = Vec::new();
233    for name in defaults {
234        if let Some(variants) = variant_lookup.get(name) {
235            deps.extend(variants.iter().map(|variant| variant.name.clone()));
236        }
237    }
238    deps.sort();
239    deps.dedup();
240    deps
241}
242
243fn select_variants<'a>(
244    variants: &'a [JobVariantInfo],
245    dep: &JobDependencySpec,
246) -> Vec<&'a JobVariantInfo> {
247    // TODO: this logic is very nested and seems brittle. evaluate if this should be split
248    // lots of if else if
249    if let Some(filters) = &dep.parallel {
250        variants
251            .iter()
252            .filter(|variant| {
253                filters.iter().any(|filter| {
254                    filter.iter().all(|(key, value)| {
255                        variant
256                            .labels
257                            .get(key)
258                            .map(|current| current == value)
259                            .unwrap_or(false)
260                    })
261                })
262            })
263            .collect()
264    } else if let Some(expected) = &dep.inline_variant {
265        variants
266            .iter()
267            .filter(|variant| &variant.ordered_values == expected)
268            .collect()
269    } else {
270        variants.iter().collect()
271    }
272}
273
274fn expand_job_variants(job: JobSpec) -> Result<Vec<ExpandedVariant>> {
275    let base_name = job.name.clone();
276    let mut variants = Vec::new();
277    match &job.parallel {
278        Some(ParallelConfigSpec::Count(count)) => {
279            let total = (*count).max(1);
280            for idx in 0..total {
281                let mut clone = job.clone();
282                clone.parallel = None;
283                clone.name = format!("{}: [{}]", base_name, idx + 1);
284                clone
285                    .variables
286                    .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
287                clone
288                    .variables
289                    .insert("CI_NODE_TOTAL".into(), total.to_string());
290                variants.push(ExpandedVariant {
291                    job: clone,
292                    labels: HashMap::new(),
293                    base_name: base_name.clone(),
294                    ordered_values: vec![(idx + 1).to_string()],
295                });
296            }
297        }
298        Some(ParallelConfigSpec::Matrix(entries)) => {
299            let combos = matrix_combinations(entries)?;
300            if combos.len() > 200 {
301                bail!(
302                    "parallel matrix for '{}' produces {} combinations, exceeding the limit of 200",
303                    base_name,
304                    combos.len()
305                );
306            }
307            let total = combos.len();
308            // TODO: for inside a for inside a for loop. garbage, must be refactored and properly structured
309            for (idx, combo) in combos.into_iter().enumerate() {
310                let mut clone = job.clone();
311                clone.parallel = None;
312                let label_text = format_gitlab_variant_values(&combo.ordered);
313                clone.name = format!("{}: [{}]", base_name, label_text);
314                for (key, value) in &combo.ordered {
315                    clone.variables.insert(key.clone(), value.clone());
316                }
317                clone
318                    .variables
319                    .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
320                clone
321                    .variables
322                    .insert("CI_NODE_TOTAL".into(), total.to_string());
323                let ordered_values = combo
324                    .ordered
325                    .iter()
326                    .map(|(_, value)| value.clone())
327                    .collect();
328                variants.push(ExpandedVariant {
329                    job: clone,
330                    labels: combo.lookup.clone(),
331                    base_name: base_name.clone(),
332                    ordered_values,
333                });
334            }
335        }
336        None => {
337            let mut clone = job.clone();
338            clone.parallel = None;
339            variants.push(ExpandedVariant {
340                job: clone,
341                labels: HashMap::new(),
342                base_name,
343                ordered_values: Vec::new(),
344            });
345            return Ok(variants);
346        }
347    }
348    Ok(variants)
349}
350
351fn matrix_combinations(entries: &[ParallelMatrixEntrySpec]) -> Result<Vec<LabelCombination>> {
352    if entries.is_empty() {
353        return Ok(vec![LabelCombination::empty()]);
354    }
355    // TODO: for inside a for inside a for, needs to be restructured and refactored
356    let mut combos = Vec::new();
357    for entry in entries {
358        let mut entry_combos = vec![LabelCombination::empty()];
359        for var in &entry.variables {
360            let mut new_sets = Vec::new();
361            for combo in &entry_combos {
362                for value in &var.values {
363                    new_sets.push(combo.push(var.name.clone(), value.clone()));
364                }
365            }
366            entry_combos = new_sets;
367        }
368        combos.extend(entry_combos);
369    }
370    Ok(combos)
371}
372
/// Render matrix label values as GitLab does inside variant names:
/// values only (keys dropped), comma-separated.
fn format_gitlab_variant_values(labels: &[(String, String)]) -> String {
    let mut text = String::new();
    for (idx, (_, value)) in labels.iter().enumerate() {
        if idx > 0 {
            text.push_str(", ");
        }
        text.push_str(value);
    }
    text
}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gitlab::rules::JobRule;
    use crate::model::{
        ArtifactSpec, PipelineDefaultsSpec, PipelineFilterSpec, PipelineSpec, RetryPolicySpec,
        StageSpec,
    };
    use crate::pipeline::rules::RuleContext;
    use std::path::Path;

    // Rule variables flow into the job; jobs whose rules never match are
    // dropped from both `jobs` and the variant lookup.
    #[test]
    fn compile_pipeline_applies_rule_variables_and_excludes_unmatched_jobs() {
        let included = JobSpec {
            rules: vec![JobRule {
                if_expr: Some("$CI_COMMIT_BRANCH == \"main\"".into()),
                variables: HashMap::from([("FROM_RULE".into(), "1".into())]),
                ..JobRule::default()
            }],
            ..job("lint", "test")
        };
        let excluded = JobSpec {
            rules: vec![JobRule {
                if_expr: Some("$CI_COMMIT_BRANCH == \"release\"".into()),
                ..JobRule::default()
            }],
            ..job("publish", "deploy")
        };
        let pipeline = pipeline_spec(
            vec![
                StageSpec {
                    name: "test".into(),
                    jobs: vec!["lint".into()],
                },
                StageSpec {
                    name: "deploy".into(),
                    jobs: vec!["publish".into()],
                },
            ],
            vec![included, excluded],
        );
        let ctx = RuleContext::new(Path::new("."));

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");

        assert!(compiled.jobs.contains_key("lint"));
        assert!(!compiled.jobs.contains_key("publish"));
        assert_eq!(
            compiled.jobs["lint"].job.variables.get("FROM_RULE"),
            Some(&"1".to_string())
        );
        // Excluded jobs must leave no stale entries behind for dependents.
        assert!(
            compiled
                .variants
                .get("publish")
                .is_none_or(|variants| variants.is_empty())
        );
    }

    // A `needs` entry with a matrix filter resolves to the matching expanded
    // variant name. Relies on the fixture pipeline file.
    #[test]
    fn compile_pipeline_resolves_matrix_needs_to_variant_dependencies() {
        let pipeline = PipelineSpec::from_path(Path::new(
            "pipelines/tests/needs-and-artifacts.gitlab-ci.yml",
        ))
        .expect("pipeline loads");
        let ctx = RuleContext::new(Path::new("."));

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");

        let package = compiled
            .jobs
            .get("package-linux")
            .expect("package job exists");
        assert!(compiled.jobs.contains_key("build-matrix: [linux, release]"));
        assert!(
            package
                .dependencies
                .iter()
                .any(|dep| dep == "build-matrix: [linux, release]")
        );

        let matrix_need = package
            .job
            .needs
            .iter()
            .find(|need| need.job == "build-matrix")
            .expect("matrix dependency present");
        let variants = compiled.variants_for_dependency(matrix_need);
        assert_eq!(variants, vec!["build-matrix: [linux, release]".to_string()]);
    }

    // Legacy `only`/`except` filters: branch vs tag contexts include or
    // exclude jobs accordingly.
    #[test]
    fn compile_pipeline_applies_job_only_except_filters() {
        let branch_only = JobSpec {
            only: vec!["branches".into()],
            ..job("branch-only", "build")
        };
        let tag_only = JobSpec {
            only: vec!["tags".into()],
            ..job("tag-only", "test")
        };
        let release_excluded = JobSpec {
            only: vec!["branches".into()],
            except: vec!["/^release\\/.*$/".into()],
            ..job("no-release", "test")
        };
        let pipeline = pipeline_spec(
            vec![
                StageSpec {
                    name: "build".into(),
                    jobs: vec!["branch-only".into()],
                },
                StageSpec {
                    name: "test".into(),
                    jobs: vec!["tag-only".into(), "no-release".into()],
                },
            ],
            vec![branch_only, tag_only, release_excluded],
        );
        let temp = tempfile::tempdir().expect("tempdir");
        // Simulate a push to branch `main`.
        let branch_ctx = RuleContext::from_env(
            temp.path(),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
            ]),
            false,
        );

        let branch_compiled =
            compile_pipeline(&pipeline, Some(&branch_ctx)).expect("branch pipeline compiles");

        assert!(branch_compiled.jobs.contains_key("branch-only"));
        assert!(branch_compiled.jobs.contains_key("no-release"));
        assert!(!branch_compiled.jobs.contains_key("tag-only"));

        // Simulate a tag pipeline for `v1.2.0`.
        let tag_ctx = RuleContext::from_env(
            Path::new("."),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_TAG".into(), "v1.2.0".into()),
                ("CI_COMMIT_REF_NAME".into(), "v1.2.0".into()),
            ]),
            false,
        );
        let tag_compiled =
            compile_pipeline(&pipeline, Some(&tag_ctx)).expect("tag pipeline compiles");

        assert!(!tag_compiled.jobs.contains_key("branch-only"));
        assert!(tag_compiled.jobs.contains_key("tag-only"));
    }

    // A job-level `when: manual` (without rules) marks the job manual with
    // the "manual job" reason.
    #[test]
    fn compile_pipeline_applies_job_level_manual_when() {
        let pipeline = pipeline_spec(
            vec![StageSpec {
                name: "deploy".into(),
                jobs: vec!["stop-review".into()],
            }],
            vec![JobSpec {
                when: Some("manual".into()),
                ..job("stop-review", "deploy")
            }],
        );

        let compiled = compile_pipeline(&pipeline, Some(&RuleContext::new(Path::new("."))))
            .expect("pipeline compiles");

        let stop_review = compiled.jobs.get("stop-review").expect("job exists");
        assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
        assert_eq!(
            stop_review.rule.manual_reason.as_deref(),
            Some("manual job")
        );
    }

    // Same manual-when behavior, but loaded from a fixture pipeline file.
    #[test]
    fn compile_pipeline_preserves_job_level_manual_when_from_fixture() {
        let pipeline =
            PipelineSpec::from_path(Path::new("pipelines/tests/environments.gitlab-ci.yml"))
                .expect("pipeline loads");
        let ctx = RuleContext::from_env(
            Path::new("."),
            HashMap::from([
                ("CI_PIPELINE_SOURCE".into(), "push".into()),
                ("CI_COMMIT_BRANCH".into(), "main".into()),
                ("CI_COMMIT_REF_NAME".into(), "main".into()),
                ("CI_COMMIT_REF_SLUG".into(), "main".into()),
            ]),
            false,
        );

        let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
        let stop_review = compiled.jobs.get("stop-review").expect("job exists");

        assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
    }

    // Build a minimal PipelineSpec from stages and jobs (keyed by job name).
    fn pipeline_spec(stages: Vec<StageSpec>, jobs: Vec<JobSpec>) -> PipelineSpec {
        PipelineSpec {
            stages,
            jobs: jobs
                .into_iter()
                .map(|job| (job.name.clone(), job))
                .collect::<HashMap<_, _>>(),
            defaults: PipelineDefaultsSpec::default(),
            workflow: None,
            filters: PipelineFilterSpec::default(),
        }
    }

    // Minimal valid JobSpec fixture; tests override fields via struct update.
    fn job(name: &str, stage: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: stage.into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }
}