1use crate::compiler::{CompileContext, CompiledPipeline, JobInstance, JobVariantInfo};
2use crate::model::{
3 DependencySourceSpec, JobDependencySpec, JobSpec, ParallelConfigSpec, ParallelMatrixEntrySpec,
4 PipelineSpec,
5};
6use crate::pipeline::rules::{RuleEvaluation, apply_when_config, evaluate_rules, filters_allow};
7use anyhow::{Result, anyhow, bail};
8use std::collections::HashMap;
9use tracing::warn;
10
11#[derive(Clone)]
12struct LabelCombination {
13 ordered: Vec<(String, String)>,
14 lookup: HashMap<String, String>,
15}
16
17impl LabelCombination {
18 fn empty() -> Self {
19 Self {
20 ordered: Vec::new(),
21 lookup: HashMap::new(),
22 }
23 }
24
25 fn push(&self, key: String, value: String) -> Self {
26 let mut ordered = self.ordered.clone();
27 ordered.push((key.clone(), value.clone()));
28 let mut lookup = self.lookup.clone();
29 lookup.insert(key, value);
30 Self { ordered, lookup }
31 }
32}
33
34struct ExpandedVariant {
35 job: JobSpec,
36 labels: HashMap<String, String>,
37 base_name: String,
38 ordered_values: Vec<String>,
39}
40
41pub fn compile_pipeline(
42 pipeline: &PipelineSpec,
43 rule_ctx: Option<&CompileContext>,
44) -> Result<CompiledPipeline> {
45 let mut jobs = HashMap::new();
46 let mut ordered = Vec::new();
47 let mut expanded_jobs: HashMap<String, Vec<ExpandedVariant>> = HashMap::new();
48 let mut variant_lookup: HashMap<String, Vec<JobVariantInfo>> = HashMap::new();
49
50 for base_job in pipeline.jobs.values() {
51 let base_job = base_job.clone();
52 let variants = expand_job_variants(base_job.clone())?;
53 variant_lookup.insert(
54 base_job.name.clone(),
55 variants
56 .iter()
57 .map(|variant| JobVariantInfo {
58 name: variant.job.name.clone(),
59 labels: variant.labels.clone(),
60 ordered_values: variant.ordered_values.clone(),
61 })
62 .collect(),
63 );
64 expanded_jobs.insert(base_job.name.clone(), variants);
65 }
66
67 for (stage_idx, stage) in pipeline.stages.iter().enumerate() {
70 let default_deps: Vec<String> = if stage_idx == 0 {
71 Vec::new()
72 } else {
73 pipeline.stages[stage_idx - 1].jobs.clone()
74 };
75
76 for job_name in &stage.jobs {
77 let base_job = pipeline
78 .jobs
79 .get(job_name)
80 .cloned()
81 .ok_or_else(|| anyhow!("missing job '{}'", job_name))?;
82 let base_name = base_job.name.clone();
83 let variants = match expanded_jobs.remove(&base_name) {
84 Some(list) => list,
85 None => expand_job_variants(base_job.clone())?,
86 };
87 for mut expanded in variants {
88 let evaluation = if let Some(ctx) = rule_ctx {
89 if !filters_allow(&expanded.job, ctx) {
90 if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
91 entry.retain(|meta| meta.name != expanded.job.name);
92 }
93 continue;
94 }
95 evaluate_rules(&expanded.job, ctx)?
96 } else {
97 RuleEvaluation::default()
98 };
99 let mut evaluation = evaluation;
100 if expanded.job.rules.is_empty() {
101 apply_when_config(
102 &mut evaluation,
103 expanded.job.when.as_deref(),
104 None,
105 Some("manual job"),
106 );
107 }
108 if !evaluation.included {
109 if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
110 entry.retain(|meta| meta.name != expanded.job.name);
111 }
112 continue;
113 }
114 if !expanded.job.tags.is_empty() {
115 warn!(
116 job = %expanded.job.name,
117 tags = ?expanded.job.tags,
118 "job has runner tags, but Opal runs locally; ignoring tags"
119 );
120 }
121 if !evaluation.variables.is_empty() {
122 expanded.job.variables.extend(evaluation.variables.clone());
123 }
124 let resolved_deps = if expanded.job.explicit_needs {
125 resolve_parallel_dependencies(
126 &expanded.job.name,
127 &expanded.job.needs,
128 &variant_lookup,
129 )?
130 } else {
131 resolve_default_dependencies(&default_deps, &variant_lookup)
132 };
133 let job_timeout = expanded.job.timeout;
134 let job_retry = expanded.job.retry.clone();
135 let job_interruptible = expanded.job.interruptible;
136 let job_resource_group = expanded.job.resource_group.clone();
137 let job_name = expanded.job.name.clone();
138 let job_stage = stage.name.clone();
139 ordered.push(job_name.clone());
140 jobs.insert(
141 job_name.clone(),
142 JobInstance {
143 job: expanded.job,
144 stage_name: job_stage,
145 dependencies: resolved_deps,
146 rule: evaluation.clone(),
147 timeout: job_timeout,
148 retry: job_retry,
149 interruptible: job_interruptible,
150 resource_group: job_resource_group,
151 },
152 );
153 }
154 }
155 }
156
157 let mut order_index = HashMap::new();
158 for (idx, name) in ordered.iter().enumerate() {
159 order_index.insert(name.clone(), idx);
160 }
161
162 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
163 for (name, instance) in &jobs {
164 for dep in &instance.dependencies {
165 if !jobs.contains_key(dep) {
166 return Err(anyhow!("job '{}' depends on unknown job '{}'", name, dep));
167 }
168 dependents
169 .entry(dep.clone())
170 .or_default()
171 .push(name.clone());
172 }
173 }
174
175 for deps in dependents.values_mut() {
176 deps.sort_by_key(|name| order_index.get(name).copied().unwrap_or(usize::MAX));
177 }
178
179 Ok(CompiledPipeline {
180 ordered,
181 jobs,
182 dependents,
183 order_index,
184 variants: variant_lookup,
185 })
186}
187
188fn resolve_parallel_dependencies(
189 owner: &str,
190 deps: &[JobDependencySpec],
191 variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
192) -> Result<Vec<String>> {
193 let mut resolved = Vec::new();
194 for dep in deps {
195 if !matches!(dep.source, DependencySourceSpec::Local) {
196 continue;
197 }
198 let Some(variants) = variant_lookup.get(&dep.job) else {
199 if dep.optional {
200 continue;
201 } else {
202 return Err(anyhow!(
203 "job '{}' depends on unknown job '{}'",
204 owner,
205 dep.job
206 ));
207 }
208 };
209 let selected = select_variants(variants, dep);
210 if selected.is_empty() {
211 if dep.optional {
212 continue;
213 } else {
214 return Err(anyhow!(
215 "job '{}' depends on '{}', but no parallel variant matches the requested matrix",
216 owner,
217 dep.job
218 ));
219 }
220 }
221 resolved.extend(selected.into_iter().map(|variant| variant.name.clone()));
222 }
223 resolved.sort();
224 resolved.dedup();
225 Ok(resolved)
226}
227
228fn resolve_default_dependencies(
229 defaults: &[String],
230 variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
231) -> Vec<String> {
232 let mut deps = Vec::new();
233 for name in defaults {
234 if let Some(variants) = variant_lookup.get(name) {
235 deps.extend(variants.iter().map(|variant| variant.name.clone()));
236 }
237 }
238 deps.sort();
239 deps.dedup();
240 deps
241}
242
243fn select_variants<'a>(
244 variants: &'a [JobVariantInfo],
245 dep: &JobDependencySpec,
246) -> Vec<&'a JobVariantInfo> {
247 if let Some(filters) = &dep.parallel {
250 variants
251 .iter()
252 .filter(|variant| {
253 filters.iter().any(|filter| {
254 filter.iter().all(|(key, value)| {
255 variant
256 .labels
257 .get(key)
258 .map(|current| current == value)
259 .unwrap_or(false)
260 })
261 })
262 })
263 .collect()
264 } else if let Some(expected) = &dep.inline_variant {
265 variants
266 .iter()
267 .filter(|variant| &variant.ordered_values == expected)
268 .collect()
269 } else {
270 variants.iter().collect()
271 }
272}
273
274fn expand_job_variants(job: JobSpec) -> Result<Vec<ExpandedVariant>> {
275 let base_name = job.name.clone();
276 let mut variants = Vec::new();
277 match &job.parallel {
278 Some(ParallelConfigSpec::Count(count)) => {
279 let total = (*count).max(1);
280 for idx in 0..total {
281 let mut clone = job.clone();
282 clone.parallel = None;
283 clone.name = format!("{}: [{}]", base_name, idx + 1);
284 clone
285 .variables
286 .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
287 clone
288 .variables
289 .insert("CI_NODE_TOTAL".into(), total.to_string());
290 variants.push(ExpandedVariant {
291 job: clone,
292 labels: HashMap::new(),
293 base_name: base_name.clone(),
294 ordered_values: vec![(idx + 1).to_string()],
295 });
296 }
297 }
298 Some(ParallelConfigSpec::Matrix(entries)) => {
299 let combos = matrix_combinations(entries)?;
300 if combos.len() > 200 {
301 bail!(
302 "parallel matrix for '{}' produces {} combinations, exceeding the limit of 200",
303 base_name,
304 combos.len()
305 );
306 }
307 let total = combos.len();
308 for (idx, combo) in combos.into_iter().enumerate() {
310 let mut clone = job.clone();
311 clone.parallel = None;
312 let label_text = format_gitlab_variant_values(&combo.ordered);
313 clone.name = format!("{}: [{}]", base_name, label_text);
314 for (key, value) in &combo.ordered {
315 clone.variables.insert(key.clone(), value.clone());
316 }
317 clone
318 .variables
319 .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
320 clone
321 .variables
322 .insert("CI_NODE_TOTAL".into(), total.to_string());
323 let ordered_values = combo
324 .ordered
325 .iter()
326 .map(|(_, value)| value.clone())
327 .collect();
328 variants.push(ExpandedVariant {
329 job: clone,
330 labels: combo.lookup.clone(),
331 base_name: base_name.clone(),
332 ordered_values,
333 });
334 }
335 }
336 None => {
337 let mut clone = job.clone();
338 clone.parallel = None;
339 variants.push(ExpandedVariant {
340 job: clone,
341 labels: HashMap::new(),
342 base_name,
343 ordered_values: Vec::new(),
344 });
345 return Ok(variants);
346 }
347 }
348 Ok(variants)
349}
350
351fn matrix_combinations(entries: &[ParallelMatrixEntrySpec]) -> Result<Vec<LabelCombination>> {
352 if entries.is_empty() {
353 return Ok(vec![LabelCombination::empty()]);
354 }
355 let mut combos = Vec::new();
357 for entry in entries {
358 let mut entry_combos = vec![LabelCombination::empty()];
359 for var in &entry.variables {
360 let mut new_sets = Vec::new();
361 for combo in &entry_combos {
362 for value in &var.values {
363 new_sets.push(combo.push(var.name.clone(), value.clone()));
364 }
365 }
366 entry_combos = new_sets;
367 }
368 combos.extend(entry_combos);
369 }
370 Ok(combos)
371}
372
373fn format_gitlab_variant_values(labels: &[(String, String)]) -> String {
374 labels
375 .iter()
376 .map(|(_, value)| value.clone())
377 .collect::<Vec<_>>()
378 .join(", ")
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384 use crate::gitlab::rules::JobRule;
385 use crate::model::{
386 ArtifactSpec, PipelineDefaultsSpec, PipelineFilterSpec, PipelineSpec, RetryPolicySpec,
387 StageSpec,
388 };
389 use crate::pipeline::rules::RuleContext;
390 use std::collections::HashMap;
391 use std::path::Path;
392
393 #[test]
394 fn compile_pipeline_applies_rule_variables_and_excludes_unmatched_jobs() {
395 let included = JobSpec {
396 rules: vec![JobRule {
397 if_expr: Some("$CI_COMMIT_BRANCH == \"main\"".into()),
398 variables: HashMap::from([("FROM_RULE".into(), "1".into())]),
399 ..JobRule::default()
400 }],
401 ..job("lint", "test")
402 };
403 let excluded = JobSpec {
404 rules: vec![JobRule {
405 if_expr: Some("$CI_COMMIT_BRANCH == \"release\"".into()),
406 ..JobRule::default()
407 }],
408 ..job("publish", "deploy")
409 };
410 let pipeline = pipeline_spec(
411 vec![
412 StageSpec {
413 name: "test".into(),
414 jobs: vec!["lint".into()],
415 },
416 StageSpec {
417 name: "deploy".into(),
418 jobs: vec!["publish".into()],
419 },
420 ],
421 vec![included, excluded],
422 );
423 let temp = tempfile::tempdir().expect("tempdir");
424 let ctx = RuleContext::from_env(
425 temp.path(),
426 HashMap::from([
427 ("CI_PIPELINE_SOURCE".into(), "push".into()),
428 ("CI_COMMIT_BRANCH".into(), "main".into()),
429 ("CI_COMMIT_REF_NAME".into(), "main".into()),
430 ("CI_COMMIT_REF_SLUG".into(), "main".into()),
431 ]),
432 false,
433 );
434
435 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
436
437 assert!(compiled.jobs.contains_key("lint"));
438 assert!(!compiled.jobs.contains_key("publish"));
439 assert_eq!(
440 compiled.jobs["lint"].job.variables.get("FROM_RULE"),
441 Some(&"1".to_string())
442 );
443 assert!(
444 compiled
445 .variants
446 .get("publish")
447 .is_none_or(|variants| variants.is_empty())
448 );
449 }
450
451 #[test]
452 fn compile_pipeline_resolves_matrix_needs_to_variant_dependencies() {
453 let pipeline = PipelineSpec::from_path(Path::new(
454 "pipelines/tests/needs-and-artifacts.gitlab-ci.yml",
455 ))
456 .expect("pipeline loads");
457 let ctx = RuleContext::from_env(
458 Path::new("."),
459 HashMap::from([
460 ("CI_PIPELINE_SOURCE".into(), "push".into()),
461 ("CI_COMMIT_BRANCH".into(), "main".into()),
462 ("CI_COMMIT_REF_NAME".into(), "main".into()),
463 ("CI_COMMIT_REF_SLUG".into(), "main".into()),
464 ]),
465 false,
466 );
467
468 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
469
470 let package = compiled
471 .jobs
472 .get("package-linux")
473 .expect("package job exists");
474 assert!(compiled.jobs.contains_key("build-matrix: [linux, release]"));
475 assert!(
476 package
477 .dependencies
478 .iter()
479 .any(|dep| dep == "build-matrix: [linux, release]")
480 );
481
482 let matrix_need = package
483 .job
484 .needs
485 .iter()
486 .find(|need| need.job == "build-matrix")
487 .expect("matrix dependency present");
488 let variants = compiled.variants_for_dependency(matrix_need);
489 assert_eq!(variants, vec!["build-matrix: [linux, release]".to_string()]);
490 }
491
492 #[test]
493 fn compile_pipeline_applies_job_only_except_filters() {
494 let branch_only = JobSpec {
495 only: vec!["branches".into()],
496 ..job("branch-only", "build")
497 };
498 let tag_only = JobSpec {
499 only: vec!["tags".into()],
500 ..job("tag-only", "test")
501 };
502 let release_excluded = JobSpec {
503 only: vec!["branches".into()],
504 except: vec!["/^release\\/.*$/".into()],
505 ..job("no-release", "test")
506 };
507 let pipeline = pipeline_spec(
508 vec![
509 StageSpec {
510 name: "build".into(),
511 jobs: vec!["branch-only".into()],
512 },
513 StageSpec {
514 name: "test".into(),
515 jobs: vec!["tag-only".into(), "no-release".into()],
516 },
517 ],
518 vec![branch_only, tag_only, release_excluded],
519 );
520 let temp = tempfile::tempdir().expect("tempdir");
521 let branch_ctx = RuleContext::from_env(
522 temp.path(),
523 HashMap::from([
524 ("CI_PIPELINE_SOURCE".into(), "push".into()),
525 ("CI_COMMIT_BRANCH".into(), "main".into()),
526 ("CI_COMMIT_REF_NAME".into(), "main".into()),
527 ]),
528 false,
529 );
530
531 let branch_compiled =
532 compile_pipeline(&pipeline, Some(&branch_ctx)).expect("branch pipeline compiles");
533
534 assert!(branch_compiled.jobs.contains_key("branch-only"));
535 assert!(branch_compiled.jobs.contains_key("no-release"));
536 assert!(!branch_compiled.jobs.contains_key("tag-only"));
537
538 let tag_ctx = RuleContext::from_env(
539 Path::new("."),
540 HashMap::from([
541 ("CI_PIPELINE_SOURCE".into(), "push".into()),
542 ("CI_COMMIT_TAG".into(), "v1.2.0".into()),
543 ("CI_COMMIT_REF_NAME".into(), "v1.2.0".into()),
544 ]),
545 false,
546 );
547 let tag_compiled =
548 compile_pipeline(&pipeline, Some(&tag_ctx)).expect("tag pipeline compiles");
549
550 assert!(!tag_compiled.jobs.contains_key("branch-only"));
551 assert!(tag_compiled.jobs.contains_key("tag-only"));
552 }
553
554 #[test]
555 fn compile_pipeline_applies_job_level_manual_when() {
556 let pipeline = pipeline_spec(
557 vec![StageSpec {
558 name: "deploy".into(),
559 jobs: vec!["stop-review".into()],
560 }],
561 vec![JobSpec {
562 when: Some("manual".into()),
563 ..job("stop-review", "deploy")
564 }],
565 );
566
567 let temp = tempfile::tempdir().expect("tempdir");
568 let ctx = RuleContext::from_env(temp.path(), HashMap::new(), false);
569 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
570
571 let stop_review = compiled.jobs.get("stop-review").expect("job exists");
572 assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
573 assert_eq!(
574 stop_review.rule.manual_reason.as_deref(),
575 Some("manual job")
576 );
577 }
578
579 #[test]
580 fn compile_pipeline_preserves_job_level_manual_when_from_fixture() {
581 let pipeline =
582 PipelineSpec::from_path(Path::new("pipelines/tests/environments.gitlab-ci.yml"))
583 .expect("pipeline loads");
584 let ctx = RuleContext::from_env(
585 Path::new("."),
586 HashMap::from([
587 ("CI_PIPELINE_SOURCE".into(), "push".into()),
588 ("CI_COMMIT_BRANCH".into(), "main".into()),
589 ("CI_COMMIT_REF_NAME".into(), "main".into()),
590 ("CI_COMMIT_REF_SLUG".into(), "main".into()),
591 ]),
592 false,
593 );
594
595 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
596 let stop_review = compiled.jobs.get("stop-review").expect("job exists");
597
598 assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
599 }
600
601 fn pipeline_spec(stages: Vec<StageSpec>, jobs: Vec<JobSpec>) -> PipelineSpec {
602 PipelineSpec {
603 stages,
604 jobs: jobs
605 .into_iter()
606 .map(|job| (job.name.clone(), job))
607 .collect::<HashMap<_, _>>(),
608 defaults: PipelineDefaultsSpec::default(),
609 workflow: None,
610 filters: PipelineFilterSpec::default(),
611 }
612 }
613
614 fn job(name: &str, stage: &str) -> JobSpec {
615 JobSpec {
616 name: name.into(),
617 stage: stage.into(),
618 commands: vec!["true".into()],
619 needs: Vec::new(),
620 explicit_needs: false,
621 dependencies: Vec::new(),
622 before_script: None,
623 after_script: None,
624 inherit_default_before_script: true,
625 inherit_default_after_script: true,
626 inherit_default_image: true,
627 inherit_default_cache: true,
628 inherit_default_services: true,
629 inherit_default_timeout: true,
630 inherit_default_retry: true,
631 inherit_default_interruptible: true,
632 when: None,
633 rules: Vec::new(),
634 only: Vec::new(),
635 except: Vec::new(),
636 artifacts: ArtifactSpec::default(),
637 cache: Vec::new(),
638 image: None,
639 variables: HashMap::new(),
640 services: Vec::new(),
641 timeout: None,
642 retry: RetryPolicySpec::default(),
643 interruptible: false,
644 resource_group: None,
645 parallel: None,
646 tags: Vec::new(),
647 environment: None,
648 }
649 }
650}