1use crate::compiler::{CompileContext, CompiledPipeline, JobInstance, JobVariantInfo};
2use crate::model::{
3 DependencySourceSpec, JobDependencySpec, JobSpec, ParallelConfigSpec, ParallelMatrixEntrySpec,
4 PipelineSpec,
5};
6use crate::pipeline::rules::{RuleEvaluation, apply_when_config, evaluate_rules, filters_allow};
7use anyhow::{Result, anyhow, bail};
8use std::collections::HashMap;
9use tracing::warn;
10
11#[derive(Clone)]
12struct LabelCombination {
13 ordered: Vec<(String, String)>,
14 lookup: HashMap<String, String>,
15}
16
17impl LabelCombination {
18 fn empty() -> Self {
19 Self {
20 ordered: Vec::new(),
21 lookup: HashMap::new(),
22 }
23 }
24
25 fn push(&self, key: String, value: String) -> Self {
26 let mut ordered = self.ordered.clone();
27 ordered.push((key.clone(), value.clone()));
28 let mut lookup = self.lookup.clone();
29 lookup.insert(key, value);
30 Self { ordered, lookup }
31 }
32}
33
34struct ExpandedVariant {
35 job: JobSpec,
36 labels: HashMap<String, String>,
37 base_name: String,
38 ordered_values: Vec<String>,
39}
40
41pub fn compile_pipeline(
42 pipeline: &PipelineSpec,
43 rule_ctx: Option<&CompileContext>,
44) -> Result<CompiledPipeline> {
45 let mut jobs = HashMap::new();
46 let mut ordered = Vec::new();
47 let mut expanded_jobs: HashMap<String, Vec<ExpandedVariant>> = HashMap::new();
48 let mut variant_lookup: HashMap<String, Vec<JobVariantInfo>> = HashMap::new();
49
50 for base_job in pipeline.jobs.values() {
51 let base_job = base_job.clone();
52 let variants = expand_job_variants(base_job.clone())?;
53 variant_lookup.insert(
54 base_job.name.clone(),
55 variants
56 .iter()
57 .map(|variant| JobVariantInfo {
58 name: variant.job.name.clone(),
59 labels: variant.labels.clone(),
60 ordered_values: variant.ordered_values.clone(),
61 })
62 .collect(),
63 );
64 expanded_jobs.insert(base_job.name.clone(), variants);
65 }
66
67 for (stage_idx, stage) in pipeline.stages.iter().enumerate() {
70 let default_deps: Vec<String> = if stage_idx == 0 {
71 Vec::new()
72 } else {
73 pipeline.stages[stage_idx - 1].jobs.clone()
74 };
75
76 for job_name in &stage.jobs {
77 let base_job = pipeline
78 .jobs
79 .get(job_name)
80 .cloned()
81 .ok_or_else(|| anyhow!("missing job '{}'", job_name))?;
82 let base_name = base_job.name.clone();
83 let variants = match expanded_jobs.remove(&base_name) {
84 Some(list) => list,
85 None => expand_job_variants(base_job.clone())?,
86 };
87 for mut expanded in variants {
88 let evaluation = if let Some(ctx) = rule_ctx {
89 if !filters_allow(&expanded.job, ctx) {
90 if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
91 entry.retain(|meta| meta.name != expanded.job.name);
92 }
93 continue;
94 }
95 evaluate_rules(&expanded.job, ctx)?
96 } else {
97 RuleEvaluation::default()
98 };
99 let mut evaluation = evaluation;
100 if expanded.job.rules.is_empty() {
101 apply_when_config(
102 &mut evaluation,
103 expanded.job.when.as_deref(),
104 None,
105 Some("manual job"),
106 );
107 }
108 if !evaluation.included {
109 if let Some(entry) = variant_lookup.get_mut(&expanded.base_name) {
110 entry.retain(|meta| meta.name != expanded.job.name);
111 }
112 continue;
113 }
114 if !expanded.job.tags.is_empty() {
115 warn!(
116 job = %expanded.job.name,
117 tags = ?expanded.job.tags,
118 "job has runner tags, but Opal runs locally; ignoring tags"
119 );
120 }
121 if !evaluation.variables.is_empty() {
122 expanded.job.variables.extend(evaluation.variables.clone());
123 }
124 let resolved_deps = if expanded.job.explicit_needs {
125 resolve_parallel_dependencies(
126 &expanded.job.name,
127 &expanded.job.needs,
128 &variant_lookup,
129 )?
130 } else {
131 resolve_default_dependencies(&default_deps, &variant_lookup)
132 };
133 let job_timeout = expanded.job.timeout;
134 let job_retry = expanded.job.retry.clone();
135 let job_interruptible = expanded.job.interruptible;
136 let job_resource_group = expanded.job.resource_group.clone();
137 let job_name = expanded.job.name.clone();
138 let job_stage = stage.name.clone();
139 ordered.push(job_name.clone());
140 jobs.insert(
141 job_name.clone(),
142 JobInstance {
143 job: expanded.job,
144 stage_name: job_stage,
145 dependencies: resolved_deps,
146 rule: evaluation.clone(),
147 timeout: job_timeout,
148 retry: job_retry,
149 interruptible: job_interruptible,
150 resource_group: job_resource_group,
151 },
152 );
153 }
154 }
155 }
156
157 let mut order_index = HashMap::new();
158 for (idx, name) in ordered.iter().enumerate() {
159 order_index.insert(name.clone(), idx);
160 }
161
162 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
163 for (name, instance) in &jobs {
164 for dep in &instance.dependencies {
165 if !jobs.contains_key(dep) {
166 return Err(anyhow!("job '{}' depends on unknown job '{}'", name, dep));
167 }
168 dependents
169 .entry(dep.clone())
170 .or_default()
171 .push(name.clone());
172 }
173 }
174
175 for deps in dependents.values_mut() {
176 deps.sort_by_key(|name| order_index.get(name).copied().unwrap_or(usize::MAX));
177 }
178
179 Ok(CompiledPipeline {
180 ordered,
181 jobs,
182 dependents,
183 order_index,
184 variants: variant_lookup,
185 })
186}
187
188fn resolve_parallel_dependencies(
189 owner: &str,
190 deps: &[JobDependencySpec],
191 variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
192) -> Result<Vec<String>> {
193 let mut resolved = Vec::new();
194 for dep in deps {
195 if !matches!(dep.source, DependencySourceSpec::Local) {
196 continue;
197 }
198 let Some(variants) = variant_lookup.get(&dep.job) else {
199 if dep.optional {
200 continue;
201 } else {
202 return Err(anyhow!(
203 "job '{}' depends on unknown job '{}'",
204 owner,
205 dep.job
206 ));
207 }
208 };
209 let selected = select_variants(variants, dep);
210 if selected.is_empty() {
211 if dep.optional {
212 continue;
213 } else {
214 return Err(anyhow!(
215 "job '{}' depends on '{}', but no parallel variant matches the requested matrix",
216 owner,
217 dep.job
218 ));
219 }
220 }
221 resolved.extend(selected.into_iter().map(|variant| variant.name.clone()));
222 }
223 resolved.sort();
224 resolved.dedup();
225 Ok(resolved)
226}
227
228fn resolve_default_dependencies(
229 defaults: &[String],
230 variant_lookup: &HashMap<String, Vec<JobVariantInfo>>,
231) -> Vec<String> {
232 let mut deps = Vec::new();
233 for name in defaults {
234 if let Some(variants) = variant_lookup.get(name) {
235 deps.extend(variants.iter().map(|variant| variant.name.clone()));
236 }
237 }
238 deps.sort();
239 deps.dedup();
240 deps
241}
242
243fn select_variants<'a>(
244 variants: &'a [JobVariantInfo],
245 dep: &JobDependencySpec,
246) -> Vec<&'a JobVariantInfo> {
247 if let Some(filters) = &dep.parallel {
250 variants
251 .iter()
252 .filter(|variant| {
253 filters.iter().any(|filter| {
254 filter.iter().all(|(key, value)| {
255 variant
256 .labels
257 .get(key)
258 .map(|current| current == value)
259 .unwrap_or(false)
260 })
261 })
262 })
263 .collect()
264 } else if let Some(expected) = &dep.inline_variant {
265 variants
266 .iter()
267 .filter(|variant| &variant.ordered_values == expected)
268 .collect()
269 } else {
270 variants.iter().collect()
271 }
272}
273
274fn expand_job_variants(job: JobSpec) -> Result<Vec<ExpandedVariant>> {
275 let base_name = job.name.clone();
276 let mut variants = Vec::new();
277 match &job.parallel {
278 Some(ParallelConfigSpec::Count(count)) => {
279 let total = (*count).max(1);
280 for idx in 0..total {
281 let mut clone = job.clone();
282 clone.parallel = None;
283 clone.name = format!("{}: [{}]", base_name, idx + 1);
284 clone
285 .variables
286 .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
287 clone
288 .variables
289 .insert("CI_NODE_TOTAL".into(), total.to_string());
290 variants.push(ExpandedVariant {
291 job: clone,
292 labels: HashMap::new(),
293 base_name: base_name.clone(),
294 ordered_values: vec![(idx + 1).to_string()],
295 });
296 }
297 }
298 Some(ParallelConfigSpec::Matrix(entries)) => {
299 let combos = matrix_combinations(entries)?;
300 if combos.len() > 200 {
301 bail!(
302 "parallel matrix for '{}' produces {} combinations, exceeding the limit of 200",
303 base_name,
304 combos.len()
305 );
306 }
307 let total = combos.len();
308 for (idx, combo) in combos.into_iter().enumerate() {
310 let mut clone = job.clone();
311 clone.parallel = None;
312 let label_text = format_gitlab_variant_values(&combo.ordered);
313 clone.name = format!("{}: [{}]", base_name, label_text);
314 for (key, value) in &combo.ordered {
315 clone.variables.insert(key.clone(), value.clone());
316 }
317 clone
318 .variables
319 .insert("CI_NODE_INDEX".into(), (idx + 1).to_string());
320 clone
321 .variables
322 .insert("CI_NODE_TOTAL".into(), total.to_string());
323 let ordered_values = combo
324 .ordered
325 .iter()
326 .map(|(_, value)| value.clone())
327 .collect();
328 variants.push(ExpandedVariant {
329 job: clone,
330 labels: combo.lookup.clone(),
331 base_name: base_name.clone(),
332 ordered_values,
333 });
334 }
335 }
336 None => {
337 let mut clone = job.clone();
338 clone.parallel = None;
339 variants.push(ExpandedVariant {
340 job: clone,
341 labels: HashMap::new(),
342 base_name,
343 ordered_values: Vec::new(),
344 });
345 return Ok(variants);
346 }
347 }
348 Ok(variants)
349}
350
351fn matrix_combinations(entries: &[ParallelMatrixEntrySpec]) -> Result<Vec<LabelCombination>> {
352 if entries.is_empty() {
353 return Ok(vec![LabelCombination::empty()]);
354 }
355 let mut combos = Vec::new();
357 for entry in entries {
358 let mut entry_combos = vec![LabelCombination::empty()];
359 for var in &entry.variables {
360 let mut new_sets = Vec::new();
361 for combo in &entry_combos {
362 for value in &var.values {
363 new_sets.push(combo.push(var.name.clone(), value.clone()));
364 }
365 }
366 entry_combos = new_sets;
367 }
368 combos.extend(entry_combos);
369 }
370 Ok(combos)
371}
372
373fn format_gitlab_variant_values(labels: &[(String, String)]) -> String {
374 labels
375 .iter()
376 .map(|(_, value)| value.clone())
377 .collect::<Vec<_>>()
378 .join(", ")
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384 use crate::gitlab::rules::JobRule;
385 use crate::model::{
386 ArtifactSpec, PipelineDefaultsSpec, PipelineFilterSpec, PipelineSpec, RetryPolicySpec,
387 StageSpec,
388 };
389 use crate::pipeline::rules::RuleContext;
390 use std::path::Path;
391
392 #[test]
393 fn compile_pipeline_applies_rule_variables_and_excludes_unmatched_jobs() {
394 let included = JobSpec {
395 rules: vec![JobRule {
396 if_expr: Some("$CI_COMMIT_BRANCH == \"main\"".into()),
397 variables: HashMap::from([("FROM_RULE".into(), "1".into())]),
398 ..JobRule::default()
399 }],
400 ..job("lint", "test")
401 };
402 let excluded = JobSpec {
403 rules: vec![JobRule {
404 if_expr: Some("$CI_COMMIT_BRANCH == \"release\"".into()),
405 ..JobRule::default()
406 }],
407 ..job("publish", "deploy")
408 };
409 let pipeline = pipeline_spec(
410 vec![
411 StageSpec {
412 name: "test".into(),
413 jobs: vec!["lint".into()],
414 },
415 StageSpec {
416 name: "deploy".into(),
417 jobs: vec!["publish".into()],
418 },
419 ],
420 vec![included, excluded],
421 );
422 let ctx = RuleContext::new(Path::new("."));
423
424 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
425
426 assert!(compiled.jobs.contains_key("lint"));
427 assert!(!compiled.jobs.contains_key("publish"));
428 assert_eq!(
429 compiled.jobs["lint"].job.variables.get("FROM_RULE"),
430 Some(&"1".to_string())
431 );
432 assert!(
433 compiled
434 .variants
435 .get("publish")
436 .is_none_or(|variants| variants.is_empty())
437 );
438 }
439
440 #[test]
441 fn compile_pipeline_resolves_matrix_needs_to_variant_dependencies() {
442 let pipeline = PipelineSpec::from_path(Path::new(
443 "pipelines/tests/needs-and-artifacts.gitlab-ci.yml",
444 ))
445 .expect("pipeline loads");
446 let ctx = RuleContext::new(Path::new("."));
447
448 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
449
450 let package = compiled
451 .jobs
452 .get("package-linux")
453 .expect("package job exists");
454 assert!(compiled.jobs.contains_key("build-matrix: [linux, release]"));
455 assert!(
456 package
457 .dependencies
458 .iter()
459 .any(|dep| dep == "build-matrix: [linux, release]")
460 );
461
462 let matrix_need = package
463 .job
464 .needs
465 .iter()
466 .find(|need| need.job == "build-matrix")
467 .expect("matrix dependency present");
468 let variants = compiled.variants_for_dependency(matrix_need);
469 assert_eq!(variants, vec!["build-matrix: [linux, release]".to_string()]);
470 }
471
472 #[test]
473 fn compile_pipeline_applies_job_only_except_filters() {
474 let branch_only = JobSpec {
475 only: vec!["branches".into()],
476 ..job("branch-only", "build")
477 };
478 let tag_only = JobSpec {
479 only: vec!["tags".into()],
480 ..job("tag-only", "test")
481 };
482 let release_excluded = JobSpec {
483 only: vec!["branches".into()],
484 except: vec!["/^release\\/.*$/".into()],
485 ..job("no-release", "test")
486 };
487 let pipeline = pipeline_spec(
488 vec![
489 StageSpec {
490 name: "build".into(),
491 jobs: vec!["branch-only".into()],
492 },
493 StageSpec {
494 name: "test".into(),
495 jobs: vec!["tag-only".into(), "no-release".into()],
496 },
497 ],
498 vec![branch_only, tag_only, release_excluded],
499 );
500 let temp = tempfile::tempdir().expect("tempdir");
501 let branch_ctx = RuleContext::from_env(
502 temp.path(),
503 HashMap::from([
504 ("CI_PIPELINE_SOURCE".into(), "push".into()),
505 ("CI_COMMIT_BRANCH".into(), "main".into()),
506 ("CI_COMMIT_REF_NAME".into(), "main".into()),
507 ]),
508 false,
509 );
510
511 let branch_compiled =
512 compile_pipeline(&pipeline, Some(&branch_ctx)).expect("branch pipeline compiles");
513
514 assert!(branch_compiled.jobs.contains_key("branch-only"));
515 assert!(branch_compiled.jobs.contains_key("no-release"));
516 assert!(!branch_compiled.jobs.contains_key("tag-only"));
517
518 let tag_ctx = RuleContext::from_env(
519 Path::new("."),
520 HashMap::from([
521 ("CI_PIPELINE_SOURCE".into(), "push".into()),
522 ("CI_COMMIT_TAG".into(), "v1.2.0".into()),
523 ("CI_COMMIT_REF_NAME".into(), "v1.2.0".into()),
524 ]),
525 false,
526 );
527 let tag_compiled =
528 compile_pipeline(&pipeline, Some(&tag_ctx)).expect("tag pipeline compiles");
529
530 assert!(!tag_compiled.jobs.contains_key("branch-only"));
531 assert!(tag_compiled.jobs.contains_key("tag-only"));
532 }
533
534 #[test]
535 fn compile_pipeline_applies_job_level_manual_when() {
536 let pipeline = pipeline_spec(
537 vec![StageSpec {
538 name: "deploy".into(),
539 jobs: vec!["stop-review".into()],
540 }],
541 vec![JobSpec {
542 when: Some("manual".into()),
543 ..job("stop-review", "deploy")
544 }],
545 );
546
547 let compiled = compile_pipeline(&pipeline, Some(&RuleContext::new(Path::new("."))))
548 .expect("pipeline compiles");
549
550 let stop_review = compiled.jobs.get("stop-review").expect("job exists");
551 assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
552 assert_eq!(
553 stop_review.rule.manual_reason.as_deref(),
554 Some("manual job")
555 );
556 }
557
558 #[test]
559 fn compile_pipeline_preserves_job_level_manual_when_from_fixture() {
560 let pipeline =
561 PipelineSpec::from_path(Path::new("pipelines/tests/environments.gitlab-ci.yml"))
562 .expect("pipeline loads");
563 let ctx = RuleContext::from_env(
564 Path::new("."),
565 HashMap::from([
566 ("CI_PIPELINE_SOURCE".into(), "push".into()),
567 ("CI_COMMIT_BRANCH".into(), "main".into()),
568 ("CI_COMMIT_REF_NAME".into(), "main".into()),
569 ("CI_COMMIT_REF_SLUG".into(), "main".into()),
570 ]),
571 false,
572 );
573
574 let compiled = compile_pipeline(&pipeline, Some(&ctx)).expect("pipeline compiles");
575 let stop_review = compiled.jobs.get("stop-review").expect("job exists");
576
577 assert_eq!(stop_review.rule.when, crate::pipeline::RuleWhen::Manual);
578 }
579
580 fn pipeline_spec(stages: Vec<StageSpec>, jobs: Vec<JobSpec>) -> PipelineSpec {
581 PipelineSpec {
582 stages,
583 jobs: jobs
584 .into_iter()
585 .map(|job| (job.name.clone(), job))
586 .collect::<HashMap<_, _>>(),
587 defaults: PipelineDefaultsSpec::default(),
588 workflow: None,
589 filters: PipelineFilterSpec::default(),
590 }
591 }
592
593 fn job(name: &str, stage: &str) -> JobSpec {
594 JobSpec {
595 name: name.into(),
596 stage: stage.into(),
597 commands: vec!["true".into()],
598 needs: Vec::new(),
599 explicit_needs: false,
600 dependencies: Vec::new(),
601 before_script: None,
602 after_script: None,
603 inherit_default_before_script: true,
604 inherit_default_after_script: true,
605 inherit_default_image: true,
606 inherit_default_cache: true,
607 inherit_default_services: true,
608 inherit_default_timeout: true,
609 inherit_default_retry: true,
610 inherit_default_interruptible: true,
611 when: None,
612 rules: Vec::new(),
613 only: Vec::new(),
614 except: Vec::new(),
615 artifacts: ArtifactSpec::default(),
616 cache: Vec::new(),
617 image: None,
618 variables: HashMap::new(),
619 services: Vec::new(),
620 timeout: None,
621 retry: RetryPolicySpec::default(),
622 interruptible: false,
623 resource_group: None,
624 parallel: None,
625 tags: Vec::new(),
626 environment: None,
627 }
628 }
629}