Skip to main content

opal/pipeline/
mounts.rs

1use crate::execution_plan::ExecutionPlan;
2use crate::model::{ArtifactSourceOutcome, DependencySourceSpec, JobSpec, PipelineSpec};
3use crate::pipeline::{
4    ArtifactManager, CacheManager, CacheMountSpec, artifacts::ExternalArtifactsManager,
5};
6use anyhow::{Context, Result, anyhow};
7use globset::{Glob, GlobSet, GlobSetBuilder};
8use std::collections::HashMap;
9use std::ffi::OsString;
10use std::fs;
11use std::path::{Path, PathBuf};
12use tracing::warn;
13
/// A single host-to-container volume mount.
///
/// Rendered by [`VolumeMount::to_arg`] as `host:container`, with `:ro` appended
/// when `read_only` is set.
#[derive(Debug, Clone)]
pub struct VolumeMount {
    /// Path on the host side of the mount.
    pub host: PathBuf,
    /// Path at which the host path appears inside the container.
    pub container: PathBuf,
    /// When `true`, the rendered argument carries the `:ro` suffix.
    pub read_only: bool,
}
20
/// Borrowed inputs consumed by [`collect_volume_mounts`].
pub struct VolumeMountContext<'a> {
    /// Job whose mounts are being computed; its `needs`, `dependencies`, `cache`
    /// and `name` are read.
    pub job: &'a JobSpec,
    /// Execution plan, consulted for dependency variants and planned nodes.
    pub plan: &'a ExecutionPlan,
    /// Pipeline definition, used to look up dependency jobs by name.
    pub pipeline: &'a PipelineSpec,
    /// Forwarded to the cache manager when building cache mount specs.
    pub workspace_root: &'a Path,
    /// Resolves host paths for job artifacts and staged dependencies.
    pub artifacts: &'a ArtifactManager,
    /// Produces the cache mount specs for the job.
    pub cache: &'a CacheManager,
    /// Forwarded to the cache manager when building cache mount specs.
    pub cache_env: &'a HashMap<String, String>,
    /// Recorded outcome per job/variant name; used to decide whether a
    /// dependency's artifacts apply.
    pub completed_jobs: &'a HashMap<String, ArtifactSourceOutcome>,
    /// Forwarded to the cache manager when building cache mount specs.
    pub session_dir: &'a Path,
    /// Base directory against which relative container paths are resolved.
    pub container_root: &'a Path,
    /// Manager for cross-project artifacts. When `None`, required external
    /// dependencies are an error and optional ones are skipped with a warning.
    pub external: Option<&'a ExternalArtifactsManager>,
}
34
35fn mount_external_artifacts(root: &Path, collector: &mut MountCollector<'_>) -> Result<()> {
36    if !root.exists() {
37        return Err(anyhow!(
38            "external artifact directory {} does not exist",
39            root.display()
40        ));
41    }
42    for entry in fs::read_dir(root).with_context(|| format!("failed to read {}", root.display()))? {
43        let entry = entry?;
44        let path = entry.path();
45        let rel = match path.strip_prefix(root) {
46            Ok(rel) if !rel.as_os_str().is_empty() => rel.to_path_buf(),
47            _ => continue,
48        };
49        let container = collector.container_path(&rel);
50        collector.push(path, container, true);
51    }
52    Ok(())
53}
54
55// TODO: this is garbabe, does too much, nested for loop logic
56pub fn collect_volume_mounts(ctx: VolumeMountContext<'_>) -> Result<Vec<VolumeMount>> {
57    let VolumeMountContext {
58        job,
59        plan,
60        pipeline,
61        workspace_root,
62        artifacts,
63        cache,
64        cache_env,
65        completed_jobs,
66        session_dir,
67        container_root,
68        external,
69    } = ctx;
70    let mut collector = MountCollector::new(container_root);
71    let mut dependency_mounts = Vec::new();
72
73    for dependency in &job.needs {
74        if !dependency.needs_artifacts {
75            continue;
76        }
77        match &dependency.source {
78            DependencySourceSpec::Local => {
79                let dep_job = pipeline.jobs.get(&dependency.job).cloned();
80                let Some(dep_job) = dep_job else {
81                    if dependency.optional {
82                        continue;
83                    }
84                    return Err(anyhow!(
85                        "job '{}' depends on unknown job '{}'",
86                        job.name,
87                        dependency.job
88                    ));
89                };
90                let variant_names = plan.variants_for_dependency(dependency);
91                if variant_names.is_empty() {
92                    if dependency.optional {
93                        continue;
94                    }
95                    return Err(anyhow!(
96                        "job '{}' requires artifacts from '{}', but it did not run",
97                        job.name,
98                        dependency.job
99                    ));
100                }
101                for variant in variant_names {
102                    for (host, relative) in artifacts.dependency_mount_specs(
103                        &variant,
104                        Some(&dep_job),
105                        completed_jobs.get(&variant).copied(),
106                        dependency.optional,
107                    ) {
108                        dependency_mounts.push(DependencyMount {
109                            host,
110                            relative,
111                            exclude: dep_job.artifacts.exclude.clone(),
112                        });
113                    }
114                }
115            }
116            DependencySourceSpec::External(ext) => {
117                let Some(manager) = external else {
118                    if dependency.optional {
119                        warn!(
120                            job = job.name,
121                            dependency = %dependency.job,
122                            "skipping cross-project dependency (GitLab credentials not configured)"
123                        );
124                        continue;
125                    } else {
126                        return Err(anyhow!(
127                            "job '{}' requires artifacts from project '{}' but no GitLab token is configured",
128                            job.name,
129                            ext.project
130                        ));
131                    }
132                };
133                match manager.ensure_artifacts(&ext.project, &dependency.job, &ext.reference) {
134                    Ok(root) => {
135                        mount_external_artifacts(&root, &mut collector)?;
136                    }
137                    Err(err) => {
138                        if dependency.optional {
139                            warn!(
140                                job = job.name,
141                                dependency = %dependency.job,
142                                project = %ext.project,
143                                "failed to download dependency artifacts: {err}"
144                            );
145                            continue;
146                        } else {
147                            return Err(err.context(format!(
148                                "failed to download artifacts for '{}' from project '{}'",
149                                dependency.job, ext.project
150                            )));
151                        }
152                    }
153                }
154            }
155        }
156    }
157
158    for dep_name in &job.dependencies {
159        if let Some(dep_planned) = plan.nodes.get(dep_name) {
160            if !dep_planned
161                .instance
162                .job
163                .artifacts
164                .when
165                .includes(completed_jobs.get(dep_name).copied())
166            {
167                continue;
168            }
169            for relative in &dep_planned.instance.job.artifacts.paths {
170                let host =
171                    artifacts.job_artifact_host_path(&dep_planned.instance.job.name, relative);
172                if !host.exists() {
173                    warn!(job = dep_planned.instance.job.name, path = %relative.display(), "artifact missing");
174                    continue;
175                }
176                dependency_mounts.push(DependencyMount {
177                    host,
178                    relative: relative.clone(),
179                    exclude: dep_planned.instance.job.artifacts.exclude.clone(),
180                });
181            }
182            continue;
183        }
184        let dep_job = pipeline.jobs.get(dep_name);
185        let Some(dep_job) = dep_job else {
186            warn!(job = dep_name, "dependency not present in pipeline graph");
187            continue;
188        };
189        if !dep_job
190            .artifacts
191            .when
192            .includes(completed_jobs.get(dep_name).copied())
193        {
194            continue;
195        }
196        for relative in &dep_job.artifacts.paths {
197            let host = artifacts.job_artifact_host_path(&dep_job.name, relative);
198            if !host.exists() {
199                warn!(job = dep_name, path = %relative.display(), "artifact missing");
200                continue;
201            }
202            dependency_mounts.push(DependencyMount {
203                host,
204                relative: relative.clone(),
205                exclude: dep_job.artifacts.exclude.clone(),
206            });
207        }
208    }
209
210    add_dependency_mounts(job, artifacts, &mut collector, dependency_mounts)?;
211
212    let cache_specs = cache.mount_specs(
213        &job.name,
214        session_dir,
215        &job.cache,
216        workspace_root,
217        cache_env,
218    )?;
219    for CacheMountSpec {
220        host,
221        relative,
222        read_only,
223    } in cache_specs
224    {
225        let container = collector.container_path(&relative);
226        collector.push(host, container, read_only);
227    }
228
229    Ok(collector.into_mounts())
230}
231
232fn add_dependency_mounts(
233    job: &JobSpec,
234    artifacts: &ArtifactManager,
235    collector: &mut MountCollector<'_>,
236    mounts: Vec<DependencyMount>,
237) -> Result<()> {
238    let mut grouped: HashMap<PathBuf, Vec<DependencyMount>> = HashMap::new();
239    for mount in mounts {
240        grouped
241            .entry(mount.relative.clone())
242            .or_default()
243            .push(mount);
244    }
245
246    for (relative, mounts) in grouped {
247        let container = collector.container_path(&relative);
248        let file_only = mounts.iter().all(|mount| mount.host.is_file());
249        let must_stage =
250            file_only || mounts.len() > 1 || mounts.iter().any(|mount| !mount.exclude.is_empty());
251        if !must_stage {
252            let Some(mount) = mounts.into_iter().next() else {
253                return Err(anyhow!(
254                    "internal error: no dependency mounts available for {}",
255                    relative.display()
256                ));
257            };
258            collector.push(mount.host, container, true);
259            continue;
260        }
261
262        let staged = stage_dependency_mount(artifacts, &job.name, &relative, &mounts)?;
263        if staged.exists() {
264            if file_only {
265                let container_parent = container.parent().unwrap_or(&container).to_path_buf();
266                let host_parent = staged.parent().unwrap_or(&staged).to_path_buf();
267                collector.push(host_parent, container_parent, true);
268            } else {
269                collector.push(staged, container, true);
270            }
271        }
272    }
273
274    Ok(())
275}
276
277fn stage_dependency_mount(
278    artifacts: &ArtifactManager,
279    job_name: &str,
280    relative: &Path,
281    mounts: &[DependencyMount],
282) -> Result<PathBuf> {
283    let staged = artifacts.job_dependency_host_path(job_name, relative);
284    if staged.exists() {
285        remove_path(&staged)
286            .with_context(|| format!("failed to clear staged dependency {}", staged.display()))?;
287    }
288
289    let any_dir = mounts.iter().any(|mount| mount.host.is_dir());
290    // TODO: why's there a random fs::create_dir_all inside here?
291    if any_dir {
292        fs::create_dir_all(&staged)
293            .with_context(|| format!("failed to create {}", staged.display()))?;
294        for mount in mounts {
295            let exclude = build_exclude_matcher(&mount.exclude)?;
296            if mount.host.is_dir() {
297                copy_dir_contents(&mount.host, &staged, relative, exclude.as_ref())?;
298            } else {
299                copy_dependency_file(
300                    &mount.host,
301                    &staged.join(file_name_or_default(&mount.host)),
302                    relative,
303                    exclude.as_ref(),
304                )?;
305            }
306        }
307    } else {
308        if let Some(parent) = staged.parent() {
309            fs::create_dir_all(parent)
310                .with_context(|| format!("failed to create {}", parent.display()))?;
311        }
312        for mount in mounts {
313            let exclude = build_exclude_matcher(&mount.exclude)?;
314            copy_dependency_file(&mount.host, &staged, relative, exclude.as_ref())?;
315        }
316    }
317
318    Ok(staged)
319}
320
321fn copy_dir_contents(
322    src: &Path,
323    dest: &Path,
324    base_relative: &Path,
325    exclude: Option<&GlobSet>,
326) -> Result<()> {
327    for entry in fs::read_dir(src).with_context(|| format!("failed to read {}", src.display()))? {
328        let entry = entry?;
329        let child_src = entry.path();
330        let child_dest = dest.join(entry.file_name());
331        let child_relative = base_relative.join(entry.file_name());
332        copy_path(&child_src, &child_dest, &child_relative, exclude)?;
333    }
334    Ok(())
335}
336
337fn copy_dependency_file(
338    src: &Path,
339    dest: &Path,
340    relative: &Path,
341    exclude: Option<&GlobSet>,
342) -> Result<()> {
343    if should_exclude_artifact(relative, exclude) {
344        return Ok(());
345    }
346    copy_path(src, dest, relative, exclude)
347}
348
349fn copy_path(src: &Path, dest: &Path, relative: &Path, exclude: Option<&GlobSet>) -> Result<()> {
350    let metadata =
351        fs::symlink_metadata(src).with_context(|| format!("failed to stat {}", src.display()))?;
352    if metadata.is_dir() {
353        fs::create_dir_all(dest).with_context(|| format!("failed to create {}", dest.display()))?;
354        copy_dir_contents(src, dest, relative, exclude)?;
355        return Ok(());
356    }
357    if should_exclude_artifact(relative, exclude) {
358        return Ok(());
359    }
360
361    // TODO: random fs::create_dir_all, refactor, separate concern here so you can test the functions
362
363    if let Some(parent) = dest.parent() {
364        fs::create_dir_all(parent)
365            .with_context(|| format!("failed to create {}", parent.display()))?;
366    }
367    fs::copy(src, dest)
368        .with_context(|| format!("failed to copy {} to {}", src.display(), dest.display()))?;
369    Ok(())
370}
371
372fn build_exclude_matcher(patterns: &[String]) -> Result<Option<GlobSet>> {
373    if patterns.is_empty() {
374        return Ok(None);
375    }
376
377    let mut builder = GlobSetBuilder::new();
378    for pattern in patterns {
379        builder.add(
380            Glob::new(pattern)
381                .with_context(|| format!("invalid artifacts.exclude pattern '{pattern}'"))?,
382        );
383    }
384    Ok(Some(builder.build()?))
385}
386
387fn should_exclude_artifact(path: &Path, exclude: Option<&GlobSet>) -> bool {
388    exclude.is_some_and(|glob| glob.is_match(path))
389}
390
391fn remove_path(path: &Path) -> Result<()> {
392    if path.is_dir() {
393        fs::remove_dir_all(path).with_context(|| format!("failed to remove {}", path.display()))
394    } else {
395        fs::remove_file(path).with_context(|| format!("failed to remove {}", path.display()))
396    }
397}
398
/// Returns the final component of `path`, or `"artifact"` when it has none
/// (for example the root, an empty path, or a path ending in `..`).
fn file_name_or_default(path: &Path) -> OsString {
    match path.file_name() {
        Some(name) => name.to_os_string(),
        None => OsString::from("artifact"),
    }
}
404
/// One artifact source queued for mounting into the job container.
#[derive(Debug, Clone)]
struct DependencyMount {
    /// Host path of the artifact (a file or a directory).
    host: PathBuf,
    /// Target path in the container; resolved against the container root when relative.
    /// Mounts sharing the same value are merged into one staged mount.
    relative: PathBuf,
    /// Exclude glob patterns of the producing job, applied when the mount is staged.
    exclude: Vec<String>,
}
411
412impl VolumeMount {
413    pub fn to_arg(&self) -> OsString {
414        let mut arg = OsString::new();
415        arg.push(self.host.as_os_str());
416        arg.push(":");
417        arg.push(self.container.as_os_str());
418        if self.read_only {
419            arg.push(":ro");
420        }
421        arg
422    }
423}
424
425struct MountCollector<'a> {
426    container_root: &'a Path,
427    mounts: Vec<VolumeMount>,
428}
429
430impl<'a> MountCollector<'a> {
431    fn new(container_root: &'a Path) -> Self {
432        Self {
433            container_root,
434            mounts: Vec::new(),
435        }
436    }
437
438    fn push(&mut self, host: PathBuf, container: PathBuf, read_only: bool) {
439        if self
440            .mounts
441            .iter()
442            .any(|existing| existing.container == container)
443        {
444            return;
445        }
446        self.mounts.push(VolumeMount {
447            host,
448            container,
449            read_only,
450        });
451    }
452
453    fn container_path(&self, relative: &Path) -> PathBuf {
454        if relative.is_absolute() {
455            relative.to_path_buf()
456        } else {
457            self.container_root.join(relative)
458        }
459    }
460
461    fn into_mounts(self) -> Vec<VolumeMount> {
462        self.mounts
463    }
464}
465
// Filesystem-backed tests for dependency mount staging. Each test works inside its own
// directory under the system temp dir and removes it at the end.
#[cfg(test)]
mod tests {
    use super::{DependencyMount, MountCollector, add_dependency_mounts};
    use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
    use crate::pipeline::ArtifactManager;
    use std::collections::HashMap;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Two directory sources that share one target must end up in a single staged mount
    // containing the files of both.
    #[test]
    fn add_dependency_mounts_merges_directory_artifacts_for_same_target() {
        let root = temp_path("dependency-merge");
        let artifacts = ArtifactManager::new(root.clone());
        let job = job("package-linux");
        let first = root.join("first");
        let second = root.join("second");
        fs::create_dir_all(&first).expect("create first");
        fs::create_dir_all(&second).expect("create second");
        fs::write(first.join("linux-debug.txt"), "debug").expect("write debug");
        fs::write(second.join("linux-release.txt"), "release").expect("write release");

        let mut collector = MountCollector::new(Path::new("/builds/opal"));
        add_dependency_mounts(
            &job,
            &artifacts,
            &mut collector,
            vec![
                DependencyMount {
                    host: first,
                    relative: PathBuf::from("tests-temp/build"),
                    exclude: Vec::new(),
                },
                DependencyMount {
                    host: second,
                    relative: PathBuf::from("tests-temp/build"),
                    exclude: Vec::new(),
                },
            ],
        )
        .expect("merge dependency mounts");

        let mounts = collector.into_mounts();
        assert_eq!(mounts.len(), 1);
        assert_eq!(
            mounts[0].container,
            PathBuf::from("/builds/opal/tests-temp/build")
        );
        assert!(mounts[0].host.join("linux-debug.txt").exists());
        assert!(mounts[0].host.join("linux-release.txt").exists());

        // Best-effort cleanup; a failure to remove the temp dir is ignored.
        let _ = fs::remove_dir_all(root);
    }

    // A single directory source with exclude patterns must be staged, and the staged copy
    // must omit both the directly named file and the nested glob match.
    #[test]
    fn add_dependency_mounts_applies_artifact_excludes_when_staging() {
        let root = temp_path("dependency-exclude");
        let artifacts = ArtifactManager::new(root.clone());
        let job = job("package-linux");
        let source = root.join("source");
        fs::create_dir_all(source.join("sub")).expect("create source dir");
        fs::write(source.join("include.txt"), "keep").expect("write include");
        fs::write(source.join("ignore.txt"), "skip").expect("write ignore");
        fs::write(source.join("sub").join("trace.log"), "skip").expect("write nested ignore");

        let mut collector = MountCollector::new(Path::new("/builds/opal"));
        add_dependency_mounts(
            &job,
            &artifacts,
            &mut collector,
            vec![DependencyMount {
                host: source,
                relative: PathBuf::from("tests-temp/filtered"),
                // Patterns are written against the container-relative target path.
                exclude: vec![
                    "tests-temp/filtered/ignore.txt".into(),
                    "tests-temp/filtered/**/*.log".into(),
                ],
            }],
        )
        .expect("stage filtered dependency mount");

        let mount = collector
            .into_mounts()
            .into_iter()
            .next()
            .expect("staged mount");
        assert!(mount.host.join("include.txt").exists());
        assert!(!mount.host.join("ignore.txt").exists());
        assert!(!mount.host.join("sub").join("trace.log").exists());

        // Best-effort cleanup; a failure to remove the temp dir is ignored.
        let _ = fs::remove_dir_all(root);
    }

    // Builds a minimal job spec with the given name; every other field is a fixed
    // placeholder value.
    fn job(name: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: "test".into(),
            commands: vec!["echo ok".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }

    // Returns a path under the system temp dir named from `prefix` and the current
    // time in nanoseconds. The directory itself is not created here.
    fn temp_path(prefix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time before epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("opal-{prefix}-{nanos}"))
    }
}