Skip to main content

opal/pipeline/
cache.rs

1use crate::model::{CacheKeySpec, CachePolicySpec, CacheSpec};
2use crate::naming::job_name_slug;
3use anyhow::{Context, Result};
4use globset::Glob;
5use sha2::{Digest, Sha256};
6use std::collections::HashMap;
7use std::fs;
8use std::path::{Path, PathBuf};
9use walkdir::WalkDir;
10
/// Owns the on-disk cache root; each resolved cache key maps to one subdirectory of it.
#[derive(Debug, Clone)]
pub struct CacheManager {
    // Directory under which every cache entry directory is created (see `entry_root`).
    root: PathBuf,
}
15
/// A host directory prepared for one cache path of a job, as produced by
/// `CacheManager::mount_specs`.
#[derive(Debug, Clone)]
pub struct CacheMountSpec {
    /// Host-side directory backing the cache path: a job-local staged copy for pull-only
    /// caches, otherwise the cache entry directory itself.
    pub host: PathBuf,
    /// The cache path exactly as configured in the cache spec (not sanitized).
    /// NOTE(review): presumably used as the mount target inside the job — confirm with caller.
    pub relative: PathBuf,
    /// Whether the mount is read-only; `mount_specs` currently always sets this to `false`.
    pub read_only: bool,
}
22
/// Descriptive summary of one configured cache, as produced by
/// `CacheManager::describe_entries`.
#[derive(Debug, Clone)]
pub struct CacheEntryInfo {
    /// Resolved primary key (variables expanded; file-based keys hashed).
    pub key: String,
    /// Fallback keys with variables expanded, in configured order.
    pub fallback_keys: Vec<String>,
    /// Pull/push policy configured for the cache.
    pub policy: CachePolicySpec,
    /// Cache entry directory for `key` under the manager's cache root.
    pub host: PathBuf,
    /// Cache paths as configured (not sanitized).
    pub paths: Vec<PathBuf>,
}
31
32impl CacheManager {
33    pub fn new(root: PathBuf) -> Self {
34        Self { root }
35    }
36
37    pub fn mount_specs(
38        &self,
39        job_name: &str,
40        staging_root: &Path,
41        caches: &[CacheSpec],
42        workspace: &Path,
43        env: &HashMap<String, String>,
44    ) -> Result<Vec<CacheMountSpec>> {
45        if caches.is_empty() {
46            return Ok(Vec::new());
47        }
48
49        let mut specs = Vec::new();
50        for cache in caches {
51            let key = resolve_cache_key(&cache.key, env, workspace);
52            let fallback_keys: Vec<String> = cache
53                .fallback_keys
54                .iter()
55                .map(|fallback| render_cache_key(fallback, env))
56                .collect();
57            let entry_root = self.entry_root(&key);
58            fs::create_dir_all(&entry_root).with_context(|| {
59                format!("failed to prepare cache root {}", entry_root.display())
60            })?;
61            for fallback in &fallback_keys {
62                let fallback_root = self.entry_root(fallback);
63                fs::create_dir_all(&fallback_root).with_context(|| {
64                    format!("failed to prepare cache root {}", fallback_root.display())
65                })?;
66            }
67
68            for relative in &cache.paths {
69                let rel = cache_relative_path(relative);
70                let entry_path = entry_root.join(&rel);
71                let fallback_entry_paths: Vec<PathBuf> = fallback_keys
72                    .iter()
73                    .map(|fallback| self.entry_root(fallback).join(&rel))
74                    .collect();
75                let host = prepare_cache_mount(
76                    cache.policy,
77                    job_name,
78                    staging_root,
79                    &key,
80                    &rel,
81                    &entry_path,
82                    &fallback_entry_paths,
83                )?;
84                specs.push(CacheMountSpec {
85                    host,
86                    relative: relative.clone(),
87                    read_only: false,
88                });
89            }
90        }
91
92        Ok(specs)
93    }
94
95    pub fn describe_entries(
96        &self,
97        caches: &[CacheSpec],
98        workspace: &Path,
99        env: &HashMap<String, String>,
100    ) -> Vec<CacheEntryInfo> {
101        caches
102            .iter()
103            .map(|cache| {
104                let key = resolve_cache_key(&cache.key, env, workspace);
105                let host = self.entry_root(&key);
106                CacheEntryInfo {
107                    key,
108                    fallback_keys: cache
109                        .fallback_keys
110                        .iter()
111                        .map(|fallback| render_cache_key(fallback, env))
112                        .collect(),
113                    policy: cache.policy,
114                    host,
115                    paths: cache.paths.clone(),
116                }
117            })
118            .collect()
119    }
120
121    fn entry_root(&self, key: &str) -> PathBuf {
122        self.root.join(cache_dir_name(key))
123    }
124}
125
126fn prepare_cache_mount(
127    policy: CachePolicySpec,
128    job_name: &str,
129    staging_root: &Path,
130    key: &str,
131    relative: &Path,
132    entry_path: &Path,
133    fallback_entry_paths: &[PathBuf],
134) -> Result<PathBuf> {
135    match policy {
136        CachePolicySpec::Pull => {
137            let staged = staged_cache_path(staging_root, job_name, key, relative);
138            reset_path(&staged)?;
139            if let Some(source) = restore_source_path(entry_path, fallback_entry_paths) {
140                copy_cache_path(source, &staged)?;
141            } else {
142                prepare_cache_path(&staged)?;
143            }
144            Ok(staged)
145        }
146        CachePolicySpec::Push => {
147            reset_path(entry_path)?;
148            prepare_cache_path(entry_path)?;
149            Ok(entry_path.to_path_buf())
150        }
151        CachePolicySpec::PullPush => {
152            if !entry_path.exists() {
153                if let Some(source) = restore_source_path(entry_path, fallback_entry_paths) {
154                    copy_cache_path(source, entry_path)?;
155                } else {
156                    prepare_cache_path(entry_path)?;
157                }
158            } else {
159                prepare_cache_path(entry_path)?;
160            }
161            Ok(entry_path.to_path_buf())
162        }
163    }
164}
165
/// Returns the first path that exists, checking `primary` and then each of `fallbacks` in
/// order, or `None` when none of them exist.
fn restore_source_path<'a>(primary: &'a Path, fallbacks: &'a [PathBuf]) -> Option<&'a Path> {
    std::iter::once(primary)
        .chain(fallbacks.iter().map(PathBuf::as_path))
        .find(|candidate| candidate.exists())
}
175
176fn staged_cache_path(staging_root: &Path, job_name: &str, key: &str, relative: &Path) -> PathBuf {
177    staging_root
178        .join("cache-staging")
179        .join(job_name_slug(job_name))
180        .join(cache_dir_name(key))
181        .join(relative)
182}
183
184fn prepare_cache_path(path: &Path) -> Result<()> {
185    fs::create_dir_all(path)
186        .with_context(|| format!("failed to prepare cache path {}", path.display()))
187}
188
189fn reset_path(path: &Path) -> Result<()> {
190    if path.exists() {
191        remove_path(path)?;
192    }
193    Ok(())
194}
195
196fn copy_cache_path(src: &Path, dest: &Path) -> Result<()> {
197    let metadata =
198        fs::symlink_metadata(src).with_context(|| format!("failed to stat {}", src.display()))?;
199    if metadata.is_dir() {
200        fs::create_dir_all(dest).with_context(|| format!("failed to create {}", dest.display()))?;
201        for entry in
202            fs::read_dir(src).with_context(|| format!("failed to read {}", src.display()))?
203        {
204            let entry = entry?;
205            let child_src = entry.path();
206            let child_dest = dest.join(entry.file_name());
207            copy_cache_path(&child_src, &child_dest)?;
208        }
209        return Ok(());
210    }
211
212    if let Some(parent) = dest.parent() {
213        fs::create_dir_all(parent)
214            .with_context(|| format!("failed to create {}", parent.display()))?;
215    }
216    fs::copy(src, dest)
217        .with_context(|| format!("failed to copy {} to {}", src.display(), dest.display()))?;
218    Ok(())
219}
220
221fn remove_path(path: &Path) -> Result<()> {
222    if path.is_dir() {
223        fs::remove_dir_all(path).with_context(|| format!("failed to remove {}", path.display()))
224    } else {
225        fs::remove_file(path).with_context(|| format!("failed to remove {}", path.display()))
226    }
227}
228
229fn render_cache_key(template: &str, env: &HashMap<String, String>) -> String {
230    expand_variables(template, env)
231}
232
233fn resolve_cache_key(
234    cache_key: &CacheKeySpec,
235    env: &HashMap<String, String>,
236    workspace: &Path,
237) -> String {
238    match cache_key {
239        CacheKeySpec::Literal(template) => render_cache_key(template, env),
240        CacheKeySpec::Files { files, prefix } => {
241            files_cache_key(files, prefix.as_deref(), env, workspace)
242        }
243    }
244}
245
246fn files_cache_key(
247    files: &[PathBuf],
248    prefix: Option<&str>,
249    env: &HashMap<String, String>,
250    workspace: &Path,
251) -> String {
252    let mut matched = Vec::new();
253    for file in files {
254        matched.extend(resolve_cache_key_file_entry(file, workspace));
255    }
256    matched.sort();
257    matched.dedup();
258
259    let suffix = if matched.is_empty() {
260        "default".to_string()
261    } else {
262        let mut digest = Sha256::new();
263        let mut had_input = false;
264        for path in matched {
265            if let Ok(bytes) = fs::read(&path) {
266                digest.update(&bytes);
267                had_input = true;
268            }
269        }
270        if had_input {
271            format!("{:x}", digest.finalize())
272        } else {
273            "default".to_string()
274        }
275    };
276
277    if let Some(prefix) = prefix {
278        let rendered = expand_variables(prefix, env);
279        if !rendered.is_empty() {
280            return format!("{rendered}-{suffix}");
281        }
282    }
283    suffix
284}
285
286fn resolve_cache_key_file_entry(entry: &Path, workspace: &Path) -> Vec<PathBuf> {
287    let pattern = entry.to_string_lossy();
288    if has_glob_pattern(&pattern) {
289        let Ok(glob) = Glob::new(&pattern) else {
290            return Vec::new();
291        };
292        let matcher = glob.compile_matcher();
293        let mut matches = Vec::new();
294        for walk in WalkDir::new(workspace)
295            .follow_links(false)
296            .into_iter()
297            .flatten()
298        {
299            if !walk.path().is_file() {
300                continue;
301            }
302            let Ok(relative) = walk.path().strip_prefix(workspace) else {
303                continue;
304            };
305            if matcher.is_match(relative) {
306                matches.push(walk.path().to_path_buf());
307            }
308        }
309        return matches;
310    }
311    let path = if entry.is_absolute() {
312        entry.to_path_buf()
313    } else {
314        workspace.join(entry)
315    };
316    if path.is_file() {
317        vec![path]
318    } else {
319        Vec::new()
320    }
321}
322
/// Reports whether `value` contains a glob metacharacter: `*`, `?`, `[` or `{`.
fn has_glob_pattern(value: &str) -> bool {
    value
        .chars()
        .any(|ch| matches!(ch, '*' | '?' | '[' | '{'))
}
326
327fn cache_dir_name(key: &str) -> String {
328    let mut slug = String::new();
329    for ch in key.chars() {
330        if ch.is_ascii_alphanumeric() {
331            slug.push(ch.to_ascii_lowercase());
332        } else if matches!(ch, '-' | '_' | '.') {
333            slug.push('-');
334        }
335    }
336    if slug.is_empty() {
337        slug.push_str("cache");
338    }
339    let digest = Sha256::digest(key.as_bytes());
340    let suffix = format!("{:x}", digest);
341    let short = &suffix[..12];
342    format!("{slug}-{short}")
343}
344
/// Sanitizes a configured cache path into a relative path that stays inside a cache entry.
///
/// Only normal path segments are kept. Root (`/`), current-dir (`.`) and parent-dir (`..`)
/// components are dropped, and so are Windows path prefixes such as `C:` or
/// `\\server\share`. Keeping a prefix is unsafe: on Windows, joining a prefixed path onto
/// the cache root replaces the root instead of extending it, which would let the cache
/// escape its directory. An input with no remaining segments maps to `cache`.
fn cache_relative_path(path: &Path) -> PathBuf {
    use std::path::Component;

    let rel: PathBuf = path
        .components()
        .filter_map(|component| match component {
            Component::Normal(segment) => Some(segment),
            Component::Prefix(_)
            | Component::RootDir
            | Component::CurDir
            | Component::ParentDir => None,
        })
        .collect();

    if rel.as_os_str().is_empty() {
        PathBuf::from("cache")
    } else {
        rel
    }
}
363
// TODO(review): it is unclear why variable expansion lives in the cache module; consider
// moving it to a shared location.
/// Expands `$NAME` and `${NAME}` references in `template` using `env`.
///
/// * `$$` produces a literal `$`.
/// * Variables missing from `env` expand to the empty string.
/// * A `$` not followed by `$`, `{` or a variable character is kept literally.
/// * An unterminated `${NAME` (no closing `}`) is kept verbatim, instead of silently
///   discarding the rest of the template.
fn expand_variables(template: &str, env: &HashMap<String, String>) -> String {
    let mut out = String::with_capacity(template.len());
    let mut chars = template.chars().peekable();
    while let Some(ch) = chars.next() {
        if ch != '$' {
            out.push(ch);
            continue;
        }
        match chars.peek().copied() {
            Some('$') => {
                chars.next();
                out.push('$');
            }
            Some('{') => {
                chars.next();
                let mut name = String::new();
                let mut closed = false;
                for next in chars.by_ref() {
                    if next == '}' {
                        closed = true;
                        break;
                    }
                    name.push(next);
                }
                if closed {
                    if let Some(value) = env.get(&name) {
                        out.push_str(value);
                    }
                } else {
                    // No closing brace: emit the text as written rather than dropping it.
                    out.push_str("${");
                    out.push_str(&name);
                }
            }
            Some(c) if is_var_char(c) => {
                let mut name = String::new();
                name.push(c);
                chars.next();
                while let Some(&next) = chars.peek() {
                    if is_var_char(next) {
                        name.push(next);
                        chars.next();
                    } else {
                        break;
                    }
                }
                if let Some(value) = env.get(&name) {
                    out.push_str(value);
                }
            }
            _ => out.push('$'),
        }
    }
    out
}

/// Reports whether `ch` may appear in a bare `$NAME` variable reference.
fn is_var_char(ch: char) -> bool {
    ch == '_' || ch.is_ascii_alphanumeric()
}
418
// Filesystem-backed tests. Each test works in a unique directory under the system temp
// dir (see `temp_path`) and removes it at the end; cleanup is skipped if an assertion panics.
#[cfg(test)]
mod tests {
    use super::{CacheManager, cache_dir_name};
    use crate::model::{CacheKeySpec, CachePolicySpec, CacheSpec};
    use std::collections::HashMap;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Pull stages a job-local copy: the mount points under the session staging dir, holds
    // the seeded content, and writes to it leave the shared cache entry untouched.
    #[test]
    fn pull_policy_stages_cache_into_job_local_copy() {
        let root = temp_path("cache-pull");
        let manager = CacheManager::new(root.join("cache-root"));
        let key = "branch-main";
        // Seed the shared entry for `key` directly on disk.
        let entry = root
            .join("cache-root")
            .join(cache_dir_name(key))
            .join("tests-temp/cache-data");
        fs::create_dir_all(&entry).expect("create cache entry");
        fs::write(entry.join("seed.txt"), "seed").expect("write seed");

        let specs = manager
            .mount_specs(
                "test-job",
                &root.join("session"),
                &[cache("tests-temp/cache-data/", key, CachePolicySpec::Pull)],
                &root,
                &HashMap::new(),
            )
            .expect("mount specs");

        assert_eq!(specs.len(), 1);
        assert!(!specs[0].read_only);
        assert!(
            specs[0]
                .host
                .starts_with(root.join("session").join("cache-staging").join("test-job"))
        );
        assert!(specs[0].host.ends_with(Path::new("tests-temp/cache-data")));
        assert!(specs[0].host.join("seed.txt").exists());

        // Mutating the staged copy must not leak back into the shared entry.
        fs::write(specs[0].host.join("seed.txt"), "mutated").expect("mutate staged copy");
        assert_eq!(
            fs::read_to_string(entry.join("seed.txt")).expect("read original"),
            "seed"
        );

        let _ = fs::remove_dir_all(root);
    }

    // Push mounts the shared entry directly and clears any previous contents first.
    #[test]
    fn push_policy_restarts_from_empty_cache_path() {
        let root = temp_path("cache-push");
        let manager = CacheManager::new(root.join("cache-root"));
        let key = "branch-main";
        let entry = root
            .join("cache-root")
            .join(cache_dir_name(key))
            .join("tests-temp/cache-data");
        fs::create_dir_all(&entry).expect("create cache entry");
        fs::write(entry.join("old.txt"), "old").expect("write old");

        let specs = manager
            .mount_specs(
                "seed-job",
                &root.join("session"),
                &[cache("tests-temp/cache-data/", key, CachePolicySpec::Push)],
                &root,
                &HashMap::new(),
            )
            .expect("mount specs");

        assert_eq!(specs[0].host, entry);
        assert!(!specs[0].host.join("old.txt").exists());

        let _ = fs::remove_dir_all(root);
    }

    // PullPush with a missing primary entry copies the first existing fallback into the
    // primary entry and mounts the primary.
    #[test]
    fn pull_push_policy_restores_from_fallback_key_into_primary() {
        let root = temp_path("cache-fallback");
        let manager = CacheManager::new(root.join("cache-root"));
        let primary_key = "branch-feature";
        let fallback_key = "branch-main";
        // Only the fallback entry is seeded; the primary entry does not exist yet.
        let fallback_entry = root
            .join("cache-root")
            .join(cache_dir_name(fallback_key))
            .join("tests-temp/cache-data");
        fs::create_dir_all(&fallback_entry).expect("create fallback entry");
        fs::write(fallback_entry.join("seed.txt"), "fallback").expect("write fallback");

        let specs = manager
            .mount_specs(
                "verify-job",
                &root.join("session"),
                &[cache_with_fallback(
                    "tests-temp/cache-data/",
                    primary_key,
                    &[fallback_key],
                    CachePolicySpec::PullPush,
                )],
                &root,
                &HashMap::new(),
            )
            .expect("mount specs");

        let primary_entry = root
            .join("cache-root")
            .join(cache_dir_name(primary_key))
            .join("tests-temp/cache-data");

        assert_eq!(specs[0].host, primary_entry);
        assert_eq!(
            fs::read_to_string(primary_entry.join("seed.txt")).expect("read restored"),
            "fallback"
        );

        let _ = fs::remove_dir_all(root);
    }

    // A files-based key with a variable prefix renders the prefix and hashes the file
    // contents (so the suffix is not `default`).
    #[test]
    fn files_cache_key_uses_workspace_file_content_with_prefix() {
        let root = temp_path("cache-files-key");
        let manager = CacheManager::new(root.join("cache-root"));
        fs::create_dir_all(&root).expect("create root");
        fs::write(root.join("Cargo.lock"), "content-v1").expect("write lockfile");

        let entries = manager.describe_entries(
            &[CacheSpec {
                key: CacheKeySpec::Files {
                    files: vec![PathBuf::from("Cargo.lock")],
                    prefix: Some("$CI_JOB_NAME".to_string()),
                },
                fallback_keys: Vec::new(),
                paths: vec![PathBuf::from("target")],
                policy: CachePolicySpec::PullPush,
            }],
            &root,
            &HashMap::from([("CI_JOB_NAME".to_string(), "lint".to_string())]),
        );

        assert_eq!(entries.len(), 1);
        assert!(entries[0].key.starts_with("lint-"));
        assert_ne!(entries[0].key, "lint-default");

        let _ = fs::remove_dir_all(root);
    }

    // When no referenced file exists, the key suffix falls back to `default`.
    #[test]
    fn files_cache_key_falls_back_to_default_when_files_missing() {
        let root = temp_path("cache-files-default");
        let manager = CacheManager::new(root.join("cache-root"));
        fs::create_dir_all(&root).expect("create root");

        let entries = manager.describe_entries(
            &[CacheSpec {
                key: CacheKeySpec::Files {
                    files: vec![PathBuf::from("missing.lock")],
                    prefix: Some("deps".to_string()),
                },
                fallback_keys: Vec::new(),
                paths: vec![PathBuf::from("target")],
                policy: CachePolicySpec::PullPush,
            }],
            &root,
            &HashMap::new(),
        );

        assert_eq!(entries[0].key, "deps-default");

        let _ = fs::remove_dir_all(root);
    }

    // Builds a single-path cache spec with a literal key and no fallback keys.
    fn cache(path: &str, key: &str, policy: CachePolicySpec) -> CacheSpec {
        cache_with_fallback(path, key, &[], policy)
    }

    // Builds a single-path cache spec with a literal key and the given fallback keys.
    fn cache_with_fallback(
        path: &str,
        key: &str,
        fallback_keys: &[&str],
        policy: CachePolicySpec,
    ) -> CacheSpec {
        CacheSpec {
            key: CacheKeySpec::Literal(key.into()),
            fallback_keys: fallback_keys.iter().map(|key| (*key).to_string()).collect(),
            paths: vec![PathBuf::from(path)],
            policy,
        }
    }

    // Returns a not-yet-created path under the system temp dir. It is made unique per call
    // by the current time in nanoseconds; the directory is not created here.
    fn temp_path(prefix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time before epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("opal-{prefix}-{nanos}"))
    }
}