1use crate::execution_plan::ExecutionPlan;
2use crate::model::{ArtifactSourceOutcome, DependencySourceSpec, JobSpec, PipelineSpec};
3use crate::pipeline::{
4 ArtifactManager, CacheManager, CacheMountSpec, artifacts::ExternalArtifactsManager,
5};
6use anyhow::{Context, Result, anyhow};
7use globset::{Glob, GlobSet, GlobSetBuilder};
8use std::collections::HashMap;
9use std::ffi::OsString;
10use std::fs;
11use std::path::{Path, PathBuf};
12use tracing::warn;
13
14#[derive(Debug, Clone)]
15pub struct VolumeMount {
16 pub host: PathBuf,
17 pub container: PathBuf,
18 pub read_only: bool,
19}
20
21pub struct VolumeMountContext<'a> {
22 pub job: &'a JobSpec,
23 pub plan: &'a ExecutionPlan,
24 pub pipeline: &'a PipelineSpec,
25 pub workspace_root: &'a Path,
26 pub artifacts: &'a ArtifactManager,
27 pub cache: &'a CacheManager,
28 pub cache_env: &'a HashMap<String, String>,
29 pub completed_jobs: &'a HashMap<String, ArtifactSourceOutcome>,
30 pub session_dir: &'a Path,
31 pub container_root: &'a Path,
32 pub external: Option<&'a ExternalArtifactsManager>,
33}
34
35fn mount_external_artifacts(root: &Path, collector: &mut MountCollector<'_>) -> Result<()> {
36 if !root.exists() {
37 return Err(anyhow!(
38 "external artifact directory {} does not exist",
39 root.display()
40 ));
41 }
42 for entry in fs::read_dir(root).with_context(|| format!("failed to read {}", root.display()))? {
43 let entry = entry?;
44 let path = entry.path();
45 let rel = match path.strip_prefix(root) {
46 Ok(rel) if !rel.as_os_str().is_empty() => rel.to_path_buf(),
47 _ => continue,
48 };
49 let container = collector.container_path(&rel);
50 collector.push(path, container, true);
51 }
52 Ok(())
53}
54
55pub fn collect_volume_mounts(ctx: VolumeMountContext<'_>) -> Result<Vec<VolumeMount>> {
57 let VolumeMountContext {
58 job,
59 plan,
60 pipeline,
61 workspace_root,
62 artifacts,
63 cache,
64 cache_env,
65 completed_jobs,
66 session_dir,
67 container_root,
68 external,
69 } = ctx;
70 let mut collector = MountCollector::new(container_root);
71 let mut dependency_mounts = Vec::new();
72
73 for dependency in &job.needs {
74 if !dependency.needs_artifacts {
75 continue;
76 }
77 match &dependency.source {
78 DependencySourceSpec::Local => {
79 let dep_job = pipeline.jobs.get(&dependency.job).cloned();
80 let Some(dep_job) = dep_job else {
81 if dependency.optional {
82 continue;
83 }
84 return Err(anyhow!(
85 "job '{}' depends on unknown job '{}'",
86 job.name,
87 dependency.job
88 ));
89 };
90 let variant_names = plan.variants_for_dependency(dependency);
91 if variant_names.is_empty() {
92 if dependency.optional {
93 continue;
94 }
95 return Err(anyhow!(
96 "job '{}' requires artifacts from '{}', but it did not run",
97 job.name,
98 dependency.job
99 ));
100 }
101 for variant in variant_names {
102 for (host, relative) in artifacts.dependency_mount_specs(
103 &variant,
104 Some(&dep_job),
105 completed_jobs.get(&variant).copied(),
106 dependency.optional,
107 ) {
108 dependency_mounts.push(DependencyMount {
109 host,
110 relative,
111 exclude: dep_job.artifacts.exclude.clone(),
112 });
113 }
114 }
115 }
116 DependencySourceSpec::External(ext) => {
117 let Some(manager) = external else {
118 if dependency.optional {
119 warn!(
120 job = job.name,
121 dependency = %dependency.job,
122 "skipping cross-project dependency (GitLab credentials not configured)"
123 );
124 continue;
125 } else {
126 return Err(anyhow!(
127 "job '{}' requires artifacts from project '{}' but no GitLab token is configured",
128 job.name,
129 ext.project
130 ));
131 }
132 };
133 match manager.ensure_artifacts(&ext.project, &dependency.job, &ext.reference) {
134 Ok(root) => {
135 mount_external_artifacts(&root, &mut collector)?;
136 }
137 Err(err) => {
138 if dependency.optional {
139 warn!(
140 job = job.name,
141 dependency = %dependency.job,
142 project = %ext.project,
143 "failed to download dependency artifacts: {err}"
144 );
145 continue;
146 } else {
147 return Err(err.context(format!(
148 "failed to download artifacts for '{}' from project '{}'",
149 dependency.job, ext.project
150 )));
151 }
152 }
153 }
154 }
155 }
156 }
157
158 for dep_name in &job.dependencies {
159 if let Some(dep_planned) = plan.nodes.get(dep_name) {
160 if !dep_planned
161 .instance
162 .job
163 .artifacts
164 .when
165 .includes(completed_jobs.get(dep_name).copied())
166 {
167 continue;
168 }
169 for relative in &dep_planned.instance.job.artifacts.paths {
170 let host =
171 artifacts.job_artifact_host_path(&dep_planned.instance.job.name, relative);
172 if !host.exists() {
173 warn!(job = dep_planned.instance.job.name, path = %relative.display(), "artifact missing");
174 continue;
175 }
176 dependency_mounts.push(DependencyMount {
177 host,
178 relative: relative.clone(),
179 exclude: dep_planned.instance.job.artifacts.exclude.clone(),
180 });
181 }
182 continue;
183 }
184 let dep_job = pipeline.jobs.get(dep_name);
185 let Some(dep_job) = dep_job else {
186 warn!(job = dep_name, "dependency not present in pipeline graph");
187 continue;
188 };
189 if !dep_job
190 .artifacts
191 .when
192 .includes(completed_jobs.get(dep_name).copied())
193 {
194 continue;
195 }
196 for relative in &dep_job.artifacts.paths {
197 let host = artifacts.job_artifact_host_path(&dep_job.name, relative);
198 if !host.exists() {
199 warn!(job = dep_name, path = %relative.display(), "artifact missing");
200 continue;
201 }
202 dependency_mounts.push(DependencyMount {
203 host,
204 relative: relative.clone(),
205 exclude: dep_job.artifacts.exclude.clone(),
206 });
207 }
208 }
209
210 add_dependency_mounts(job, artifacts, &mut collector, dependency_mounts)?;
211
212 let cache_specs = cache.mount_specs(
213 &job.name,
214 session_dir,
215 &job.cache,
216 workspace_root,
217 cache_env,
218 )?;
219 for CacheMountSpec {
220 host,
221 relative,
222 read_only,
223 } in cache_specs
224 {
225 let container = collector.container_path(&relative);
226 collector.push(host, container, read_only);
227 }
228
229 Ok(collector.into_mounts())
230}
231
232fn add_dependency_mounts(
233 job: &JobSpec,
234 artifacts: &ArtifactManager,
235 collector: &mut MountCollector<'_>,
236 mounts: Vec<DependencyMount>,
237) -> Result<()> {
238 let mut grouped: HashMap<PathBuf, Vec<DependencyMount>> = HashMap::new();
239 for mount in mounts {
240 grouped
241 .entry(mount.relative.clone())
242 .or_default()
243 .push(mount);
244 }
245
246 for (relative, mounts) in grouped {
247 let container = collector.container_path(&relative);
248 let file_only = mounts.iter().all(|mount| mount.host.is_file());
249 let must_stage =
250 file_only || mounts.len() > 1 || mounts.iter().any(|mount| !mount.exclude.is_empty());
251 if !must_stage {
252 let Some(mount) = mounts.into_iter().next() else {
253 return Err(anyhow!(
254 "internal error: no dependency mounts available for {}",
255 relative.display()
256 ));
257 };
258 collector.push(mount.host, container, true);
259 continue;
260 }
261
262 let staged = stage_dependency_mount(artifacts, &job.name, &relative, &mounts)?;
263 if staged.exists() {
264 if file_only {
265 let container_parent = container.parent().unwrap_or(&container).to_path_buf();
266 let host_parent = staged.parent().unwrap_or(&staged).to_path_buf();
267 collector.push(host_parent, container_parent, true);
268 } else {
269 collector.push(staged, container, true);
270 }
271 }
272 }
273
274 Ok(())
275}
276
277fn stage_dependency_mount(
278 artifacts: &ArtifactManager,
279 job_name: &str,
280 relative: &Path,
281 mounts: &[DependencyMount],
282) -> Result<PathBuf> {
283 let staged = artifacts.job_dependency_host_path(job_name, relative);
284 if staged.exists() {
285 remove_path(&staged)
286 .with_context(|| format!("failed to clear staged dependency {}", staged.display()))?;
287 }
288
289 let any_dir = mounts.iter().any(|mount| mount.host.is_dir());
290 if any_dir {
292 fs::create_dir_all(&staged)
293 .with_context(|| format!("failed to create {}", staged.display()))?;
294 for mount in mounts {
295 let exclude = build_exclude_matcher(&mount.exclude)?;
296 if mount.host.is_dir() {
297 copy_dir_contents(&mount.host, &staged, relative, exclude.as_ref())?;
298 } else {
299 copy_dependency_file(
300 &mount.host,
301 &staged.join(file_name_or_default(&mount.host)),
302 relative,
303 exclude.as_ref(),
304 )?;
305 }
306 }
307 } else {
308 if let Some(parent) = staged.parent() {
309 fs::create_dir_all(parent)
310 .with_context(|| format!("failed to create {}", parent.display()))?;
311 }
312 for mount in mounts {
313 let exclude = build_exclude_matcher(&mount.exclude)?;
314 copy_dependency_file(&mount.host, &staged, relative, exclude.as_ref())?;
315 }
316 }
317
318 Ok(staged)
319}
320
321fn copy_dir_contents(
322 src: &Path,
323 dest: &Path,
324 base_relative: &Path,
325 exclude: Option<&GlobSet>,
326) -> Result<()> {
327 for entry in fs::read_dir(src).with_context(|| format!("failed to read {}", src.display()))? {
328 let entry = entry?;
329 let child_src = entry.path();
330 let child_dest = dest.join(entry.file_name());
331 let child_relative = base_relative.join(entry.file_name());
332 copy_path(&child_src, &child_dest, &child_relative, exclude)?;
333 }
334 Ok(())
335}
336
337fn copy_dependency_file(
338 src: &Path,
339 dest: &Path,
340 relative: &Path,
341 exclude: Option<&GlobSet>,
342) -> Result<()> {
343 if should_exclude_artifact(relative, exclude) {
344 return Ok(());
345 }
346 copy_path(src, dest, relative, exclude)
347}
348
349fn copy_path(src: &Path, dest: &Path, relative: &Path, exclude: Option<&GlobSet>) -> Result<()> {
350 let metadata =
351 fs::symlink_metadata(src).with_context(|| format!("failed to stat {}", src.display()))?;
352 if metadata.is_dir() {
353 fs::create_dir_all(dest).with_context(|| format!("failed to create {}", dest.display()))?;
354 copy_dir_contents(src, dest, relative, exclude)?;
355 return Ok(());
356 }
357 if should_exclude_artifact(relative, exclude) {
358 return Ok(());
359 }
360
361 if let Some(parent) = dest.parent() {
364 fs::create_dir_all(parent)
365 .with_context(|| format!("failed to create {}", parent.display()))?;
366 }
367 fs::copy(src, dest)
368 .with_context(|| format!("failed to copy {} to {}", src.display(), dest.display()))?;
369 Ok(())
370}
371
372fn build_exclude_matcher(patterns: &[String]) -> Result<Option<GlobSet>> {
373 if patterns.is_empty() {
374 return Ok(None);
375 }
376
377 let mut builder = GlobSetBuilder::new();
378 for pattern in patterns {
379 builder.add(
380 Glob::new(pattern)
381 .with_context(|| format!("invalid artifacts.exclude pattern '{pattern}'"))?,
382 );
383 }
384 Ok(Some(builder.build()?))
385}
386
387fn should_exclude_artifact(path: &Path, exclude: Option<&GlobSet>) -> bool {
388 exclude.is_some_and(|glob| glob.is_match(path))
389}
390
391fn remove_path(path: &Path) -> Result<()> {
392 if path.is_dir() {
393 fs::remove_dir_all(path).with_context(|| format!("failed to remove {}", path.display()))
394 } else {
395 fs::remove_file(path).with_context(|| format!("failed to remove {}", path.display()))
396 }
397}
398
399fn file_name_or_default(path: &Path) -> OsString {
400 path.file_name()
401 .map(|name| name.to_os_string())
402 .unwrap_or_else(|| OsString::from("artifact"))
403}
404
405#[derive(Debug, Clone)]
406struct DependencyMount {
407 host: PathBuf,
408 relative: PathBuf,
409 exclude: Vec<String>,
410}
411
412impl VolumeMount {
413 pub fn to_arg(&self) -> OsString {
414 let mut arg = OsString::new();
415 arg.push(self.host.as_os_str());
416 arg.push(":");
417 arg.push(self.container.as_os_str());
418 if self.read_only {
419 arg.push(":ro");
420 }
421 arg
422 }
423}
424
425struct MountCollector<'a> {
426 container_root: &'a Path,
427 mounts: Vec<VolumeMount>,
428}
429
430impl<'a> MountCollector<'a> {
431 fn new(container_root: &'a Path) -> Self {
432 Self {
433 container_root,
434 mounts: Vec::new(),
435 }
436 }
437
438 fn push(&mut self, host: PathBuf, container: PathBuf, read_only: bool) {
439 if self
440 .mounts
441 .iter()
442 .any(|existing| existing.container == container)
443 {
444 return;
445 }
446 self.mounts.push(VolumeMount {
447 host,
448 container,
449 read_only,
450 });
451 }
452
453 fn container_path(&self, relative: &Path) -> PathBuf {
454 if relative.is_absolute() {
455 relative.to_path_buf()
456 } else {
457 self.container_root.join(relative)
458 }
459 }
460
461 fn into_mounts(self) -> Vec<VolumeMount> {
462 self.mounts
463 }
464}
465
466#[cfg(test)]
467mod tests {
468 use super::{DependencyMount, MountCollector, add_dependency_mounts};
469 use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
470 use crate::pipeline::ArtifactManager;
471 use std::collections::HashMap;
472 use std::fs;
473 use std::path::{Path, PathBuf};
474 use std::time::{SystemTime, UNIX_EPOCH};
475
476 #[test]
477 fn add_dependency_mounts_merges_directory_artifacts_for_same_target() {
478 let root = temp_path("dependency-merge");
479 let artifacts = ArtifactManager::new(root.clone());
480 let job = job("package-linux");
481 let first = root.join("first");
482 let second = root.join("second");
483 fs::create_dir_all(&first).expect("create first");
484 fs::create_dir_all(&second).expect("create second");
485 fs::write(first.join("linux-debug.txt"), "debug").expect("write debug");
486 fs::write(second.join("linux-release.txt"), "release").expect("write release");
487
488 let mut collector = MountCollector::new(Path::new("/builds/opal"));
489 add_dependency_mounts(
490 &job,
491 &artifacts,
492 &mut collector,
493 vec![
494 DependencyMount {
495 host: first,
496 relative: PathBuf::from("tests-temp/build"),
497 exclude: Vec::new(),
498 },
499 DependencyMount {
500 host: second,
501 relative: PathBuf::from("tests-temp/build"),
502 exclude: Vec::new(),
503 },
504 ],
505 )
506 .expect("merge dependency mounts");
507
508 let mounts = collector.into_mounts();
509 assert_eq!(mounts.len(), 1);
510 assert_eq!(
511 mounts[0].container,
512 PathBuf::from("/builds/opal/tests-temp/build")
513 );
514 assert!(mounts[0].host.join("linux-debug.txt").exists());
515 assert!(mounts[0].host.join("linux-release.txt").exists());
516
517 let _ = fs::remove_dir_all(root);
518 }
519
520 #[test]
521 fn add_dependency_mounts_applies_artifact_excludes_when_staging() {
522 let root = temp_path("dependency-exclude");
523 let artifacts = ArtifactManager::new(root.clone());
524 let job = job("package-linux");
525 let source = root.join("source");
526 fs::create_dir_all(source.join("sub")).expect("create source dir");
527 fs::write(source.join("include.txt"), "keep").expect("write include");
528 fs::write(source.join("ignore.txt"), "skip").expect("write ignore");
529 fs::write(source.join("sub").join("trace.log"), "skip").expect("write nested ignore");
530
531 let mut collector = MountCollector::new(Path::new("/builds/opal"));
532 add_dependency_mounts(
533 &job,
534 &artifacts,
535 &mut collector,
536 vec![DependencyMount {
537 host: source,
538 relative: PathBuf::from("tests-temp/filtered"),
539 exclude: vec![
540 "tests-temp/filtered/ignore.txt".into(),
541 "tests-temp/filtered/**/*.log".into(),
542 ],
543 }],
544 )
545 .expect("stage filtered dependency mount");
546
547 let mount = collector
548 .into_mounts()
549 .into_iter()
550 .next()
551 .expect("staged mount");
552 assert!(mount.host.join("include.txt").exists());
553 assert!(!mount.host.join("ignore.txt").exists());
554 assert!(!mount.host.join("sub").join("trace.log").exists());
555
556 let _ = fs::remove_dir_all(root);
557 }
558
559 fn job(name: &str) -> JobSpec {
560 JobSpec {
561 name: name.into(),
562 stage: "test".into(),
563 commands: vec!["echo ok".into()],
564 needs: Vec::new(),
565 explicit_needs: false,
566 dependencies: Vec::new(),
567 before_script: None,
568 after_script: None,
569 inherit_default_before_script: true,
570 inherit_default_after_script: true,
571 inherit_default_image: true,
572 inherit_default_cache: true,
573 inherit_default_services: true,
574 inherit_default_timeout: true,
575 inherit_default_retry: true,
576 inherit_default_interruptible: true,
577 when: None,
578 rules: Vec::new(),
579 only: Vec::new(),
580 except: Vec::new(),
581 artifacts: ArtifactSpec::default(),
582 cache: Vec::new(),
583 image: None,
584 variables: HashMap::new(),
585 services: Vec::new(),
586 timeout: None,
587 retry: RetryPolicySpec::default(),
588 interruptible: false,
589 resource_group: None,
590 parallel: None,
591 tags: Vec::new(),
592 environment: None,
593 }
594 }
595
596 fn temp_path(prefix: &str) -> PathBuf {
597 let nanos = SystemTime::now()
598 .duration_since(UNIX_EPOCH)
599 .expect("system time before epoch")
600 .as_nanos();
601 std::env::temp_dir().join(format!("opal-{prefix}-{nanos}"))
602 }
603}