Skip to main content

cuenv_core/tasks/
cache.rs

1//! Task-level caching glue between the executor, [`cuenv_cas`], and
2//! [`cuenv_vcs`].
3//!
4//! This module is responsible for:
5//!
6//! 1. Building the [`cuenv_cas::Action`] envelope for a task: a deterministic
7//!    summary of everything that affects the task's outputs.
8//! 2. Querying the [`cuenv_cas::ActionCache`] for a previous result.
9//! 3. Materializing cached outputs back into the workspace on a hit.
10//! 4. Persisting outputs and metadata after a successful execution on a miss.
11
12use crate::Result;
13use crate::environment::Environment;
14use crate::tasks::{Task, TaskCachePolicy};
15use cuenv_cas::{
16    Action, ActionCache, ActionResult, Cas, Command, Digest, Directory, DirectoryNode,
17    ExecutionMetadata, FileNode, OutputFile, Platform, digest_of,
18};
19use cuenv_vcs::{HashedInput, VcsHasher};
20use globset::{Glob, GlobSetBuilder};
21use std::collections::BTreeMap;
22use std::path::{Component, Path, PathBuf};
23use std::sync::Arc;
24use std::time::Duration;
25use walkdir::WalkDir;
26
27/// Bundle of caching infrastructure used by the task executor.
28///
29/// All three handles must point at the same logical store; the executor
30/// does no cross-store reconciliation.
31#[derive(Clone)]
32pub struct TaskCacheConfig {
33    /// Content-addressed blob store.
34    pub cas: Arc<dyn Cas>,
35    /// Action -> result lookup table.
36    pub action_cache: Arc<dyn ActionCache>,
37    /// Strategy for resolving and hashing input files.
38    pub vcs_hasher: Arc<dyn VcsHasher>,
39    /// Root path the shared [`VcsHasher`] resolves inputs against.
40    pub vcs_hasher_root: PathBuf,
41    /// cuenv binary version, baked into every action digest. Bumping this
42    /// invalidates all cache entries on upgrade.
43    pub cuenv_version: String,
44    /// Optional runtime identity properties folded into action identity.
45    /// For Nix runtime this includes the locked runtime digest.
46    pub runtime_identity_properties: BTreeMap<String, String>,
47    /// Optional reason caching is disabled for this run.
48    pub cache_disabled_reason: Option<String>,
49}
50
51impl std::fmt::Debug for TaskCacheConfig {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("TaskCacheConfig")
54            .field("vcs_hasher", &self.vcs_hasher.name())
55            .field("vcs_hasher_root", &self.vcs_hasher_root)
56            .field("cuenv_version", &self.cuenv_version)
57            .field(
58                "runtime_identity_properties",
59                &self.runtime_identity_properties,
60            )
61            .field("cache_disabled_reason", &self.cache_disabled_reason)
62            .finish_non_exhaustive()
63    }
64}
65
66/// Returns the effective task cache policy.
67#[must_use]
68pub fn effective_policy(task: &Task) -> TaskCachePolicy {
69    task.cache_policy()
70}
71
72/// Inputs to [`build_action`].
73pub struct BuildActionInput<'a> {
74    /// Task definition being hashed.
75    pub task: &'a Task,
76    /// Human-readable task name for diagnostics.
77    pub task_name: &'a str,
78    /// Environment resolver used by the executor.
79    pub environment: &'a Environment,
80    /// Cache infrastructure.
81    pub cache: &'a TaskCacheConfig,
82    /// Working directory the executor will actually use.
83    pub workdir: &'a Path,
84    /// Project root used for resolving task inputs.
85    pub project_root: &'a Path,
86    /// cue module root used for relative workdir normalization when needed.
87    pub module_root: &'a Path,
88}
89
90/// Build the [`Action`] envelope for a task and compute its digest.
91///
92/// Returns `Ok(None)` when the task is not eligible for caching.
93///
94/// # Errors
95///
96/// Propagates failures from task command resolution and canonical encoding.
97///
98/// Input hashing failures degrade to `Ok(None)` so cache eligibility never
99/// changes whether the task itself is runnable.
100pub async fn build_action(input: BuildActionInput<'_>) -> Result<Option<(Action, Digest)>> {
101    let BuildActionInput {
102        task,
103        task_name,
104        environment,
105        cache,
106        workdir,
107        project_root,
108        module_root,
109    } = input;
110
111    if let Some(reason) = &cache.cache_disabled_reason {
112        tracing::debug!(task = %task_name, reason, "skipping cache");
113        return Ok(None);
114    }
115
116    let policy = effective_policy(task);
117    if !policy.mode.allows_read() && !policy.mode.allows_write() {
118        tracing::debug!(task = %task_name, "skipping cache: task cache mode is never");
119        return Ok(None);
120    }
121
122    if task.inputs.is_empty() {
123        return Ok(None);
124    }
125
126    let mut patterns = Vec::with_capacity(task.inputs.len());
127    for input in &task.inputs {
128        if let Some(path) = input.as_path() {
129            patterns.push(path.clone());
130        } else {
131            tracing::debug!(
132                task = %task_name,
133                "skipping cache: task uses non-path input (project/task reference)"
134            );
135            return Ok(None);
136        }
137    }
138
139    let Some(hashed) = resolve_hashed_inputs(cache, &patterns, project_root, task_name).await?
140    else {
141        return Ok(None);
142    };
143    if hashed.is_empty() {
144        tracing::debug!(
145            task = %task_name,
146            "skipping cache: declared path inputs resolved to no files"
147        );
148        return Ok(None);
149    }
150    let input_root_digest = build_input_root_digest(&hashed)?;
151
152    let mut environment_variables = BTreeMap::new();
153    let resolved = environment.merge_with_system_hermetic();
154    for (key, value) in &resolved {
155        environment_variables.insert(key.clone(), value.clone());
156    }
157    for (key, value) in &task.env {
158        if let Some(string_value) = value.as_str() {
159            if let Some(host) = super::output_refs::parse_passthrough(string_value) {
160                if let Ok(host_value) = std::env::var(host) {
161                    environment_variables.insert(key.clone(), host_value);
162                }
163            } else if !string_value.starts_with("cuenv:ref:") {
164                environment_variables.insert(key.clone(), string_value.to_string());
165            }
166        } else if let Some(number) = value.as_i64() {
167            environment_variables.insert(key.clone(), number.to_string());
168        } else if let Some(boolean) = value.as_bool() {
169            environment_variables.insert(key.clone(), boolean.to_string());
170        }
171    }
172
173    let command_spec = task.command_spec(|command| environment.resolve_command(command))?;
174    let mut arguments = Vec::with_capacity(1 + command_spec.args.len());
175    arguments.push(command_spec.program);
176    arguments.extend(command_spec.args);
177
178    let command = Command {
179        arguments,
180        environment_variables,
181        output_files: task.outputs.clone(),
182        output_directories: Vec::new(),
183        working_directory: normalize_workdir(workdir, project_root, module_root),
184    };
185    let command_digest = digest_of(&command)
186        .map_err(|e| crate::Error::configuration(format!("command digest: {e}")))?;
187
188    let mut platform_properties = BTreeMap::new();
189    platform_properties.insert("os".to_string(), std::env::consts::OS.to_string());
190    platform_properties.insert("arch".to_string(), std::env::consts::ARCH.to_string());
191    for (key, value) in &cache.runtime_identity_properties {
192        platform_properties.insert(key.clone(), value.clone());
193    }
194
195    let action = Action {
196        command_digest,
197        input_root_digest,
198        platform: Platform {
199            properties: platform_properties,
200        },
201        cuenv_version: cache.cuenv_version.clone(),
202    };
203    let action_digest = digest_of(&action)
204        .map_err(|e| crate::Error::configuration(format!("action digest: {e}")))?;
205
206    Ok(Some((action, action_digest)))
207}
208
209async fn resolve_hashed_inputs(
210    cache: &TaskCacheConfig,
211    patterns: &[String],
212    project_root: &Path,
213    task_name: &str,
214) -> Result<Option<Vec<HashedInput>>> {
215    let prefixed_patterns =
216        match prefix_patterns_for_hasher_root(patterns, project_root, &cache.vcs_hasher_root) {
217            Ok(prefixed_patterns) => prefixed_patterns,
218            Err(error) => {
219                tracing::warn!(
220                    task = %task_name,
221                    project_root = %project_root.display(),
222                    hasher_root = %cache.vcs_hasher_root.display(),
223                    error = %error,
224                    "skipping cache: cannot map task inputs to cache hasher root"
225                );
226                return Ok(None);
227            }
228        };
229
230    let hashed = match cache.vcs_hasher.resolve_and_hash(&prefixed_patterns).await {
231        Ok(hashed) => hashed,
232        Err(error) => {
233            tracing::warn!(
234                task = %task_name,
235                error = %error,
236                "skipping cache: input hashing failed"
237            );
238            return Ok(None);
239        }
240    };
241
242    let rebased =
243        match rebase_hashed_inputs_for_project_root(hashed, project_root, &cache.vcs_hasher_root) {
244            Ok(rebased) => rebased,
245            Err(error) => {
246                tracing::warn!(
247                    task = %task_name,
248                    project_root = %project_root.display(),
249                    hasher_root = %cache.vcs_hasher_root.display(),
250                    error = %error,
251                    "skipping cache: hashed inputs escaped task project root"
252                );
253                return Ok(None);
254            }
255        };
256
257    Ok(Some(rebased))
258}
259
260/// Query the action cache for a previous result.
261///
262/// # Errors
263///
264/// Propagates any error from the underlying [`ActionCache`] implementation.
265pub fn lookup(
266    cache: &TaskCacheConfig,
267    action_digest: &Digest,
268    task: &Task,
269) -> Result<Option<ActionResult>> {
270    let policy = effective_policy(task);
271    if !policy.mode.allows_read() {
272        return Ok(None);
273    }
274
275    let Some(result) = cache
276        .action_cache
277        .lookup(action_digest)
278        .map_err(|e| crate::Error::configuration(format!("action cache lookup: {e}")))?
279    else {
280        return Ok(None);
281    };
282
283    if result.exit_code != 0 {
284        tracing::warn!(
285            action = %action_digest,
286            exit_code = result.exit_code,
287            "ignoring invalid cached result with non-zero exit code"
288        );
289        return Ok(None);
290    }
291
292    if is_expired(&result, policy.max_age.as_deref())? {
293        tracing::debug!(
294            action = %action_digest,
295            max_age = ?policy.max_age,
296            "cache entry expired"
297        );
298        return Ok(None);
299    }
300
301    Ok(Some(result))
302}
303
304/// Materialize a cache hit's outputs into `workdir`.
305///
306/// Returns `(stdout, stderr, exit_code)` reconstructed from the CAS so the
307/// caller can build a `TaskResult` without having executed the task.
308///
309/// # Errors
310///
311/// Propagates any error from the [`Cas`] when fetching blobs or restoring
312/// output permissions.
313pub fn materialize_hit(
314    cache: &TaskCacheConfig,
315    workdir: &Path,
316    result: &ActionResult,
317) -> Result<(String, String, i32)> {
318    for output_file in &result.output_files {
319        let destination = workdir.join(&output_file.path);
320        if let Some(parent) = destination.parent() {
321            std::fs::create_dir_all(parent).map_err(|e| {
322                crate::Error::configuration(format!(
323                    "create output parent {}: {e}",
324                    parent.display()
325                ))
326            })?;
327        }
328        cache
329            .cas
330            .get_to_file(&output_file.digest, &destination)
331            .map_err(|e| crate::Error::configuration(format!("cas get output: {e}")))?;
332        set_executable_if_needed(&destination, output_file.is_executable)?;
333    }
334
335    let stdout = if let Some(digest) = &result.stdout_digest {
336        let bytes = cache
337            .cas
338            .get(digest)
339            .map_err(|e| crate::Error::configuration(format!("cas get stdout: {e}")))?;
340        String::from_utf8_lossy(&bytes).into_owned()
341    } else {
342        String::new()
343    };
344
345    let stderr = if let Some(digest) = &result.stderr_digest {
346        let bytes = cache
347            .cas
348            .get(digest)
349            .map_err(|e| crate::Error::configuration(format!("cas get stderr: {e}")))?;
350        String::from_utf8_lossy(&bytes).into_owned()
351    } else {
352        String::new()
353    };
354
355    Ok((stdout, stderr, result.exit_code))
356}
357
358/// Persist a successful execution to the cache.
359///
360/// Failures are best-effort: callers should ignore the result so a cache-write
361/// hiccup never fails the user's task.
362///
363/// # Errors
364///
365/// Returns an error if the [`Cas`] or [`ActionCache`] persistence fails.
366pub fn record(input: RecordInput<'_>) -> Result<()> {
367    let RecordInput {
368        cache,
369        action_digest,
370        workdir,
371        task,
372        stdout,
373        stderr,
374        exit_code,
375        duration_ms,
376    } = input;
377
378    if exit_code != 0 {
379        tracing::debug!(exit_code, "skipping cache write for non-zero exit code");
380        return Ok(());
381    }
382
383    let resolved_outputs = collect_outputs(workdir, &task.outputs)?;
384    let mut output_files = Vec::with_capacity(resolved_outputs.len());
385    for relative_path in resolved_outputs {
386        let absolute_path = workdir.join(&relative_path);
387        let digest = cache
388            .cas
389            .put_file(&absolute_path)
390            .map_err(|e| crate::Error::configuration(format!("cas put output: {e}")))?;
391        output_files.push(OutputFile {
392            path: path_to_forward_slashes(&relative_path),
393            digest,
394            is_executable: is_executable(&absolute_path)?,
395        });
396    }
397
398    let redacted_stdout = cuenv_events::redact(stdout);
399    let redacted_stderr = cuenv_events::redact(stderr);
400    let stdout_digest = cache
401        .cas
402        .put_bytes(redacted_stdout.as_bytes())
403        .map_err(|e| crate::Error::configuration(format!("cas put stdout: {e}")))?;
404    let stderr_digest = cache
405        .cas
406        .put_bytes(redacted_stderr.as_bytes())
407        .map_err(|e| crate::Error::configuration(format!("cas put stderr: {e}")))?;
408
409    let result = ActionResult {
410        output_files,
411        output_directories: Vec::new(),
412        exit_code,
413        stdout_digest: Some(stdout_digest),
414        stderr_digest: Some(stderr_digest),
415        execution_metadata: ExecutionMetadata {
416            worker: "local".to_string(),
417            duration_ms,
418            created_at: chrono::Utc::now(),
419        },
420    };
421    cache
422        .action_cache
423        .update(action_digest, &result)
424        .map_err(|e| crate::Error::configuration(format!("action cache update: {e}")))?;
425    Ok(())
426}
427
428fn is_expired(result: &ActionResult, max_age: Option<&str>) -> Result<bool> {
429    let Some(spec) = max_age else {
430        return Ok(false);
431    };
432    let max_age_duration = parse_max_age(spec)?;
433    let Some(max_age_duration) = max_age_duration else {
434        return Ok(false);
435    };
436
437    let now = chrono::Utc::now();
438    let age = now.signed_duration_since(result.execution_metadata.created_at);
439    if age < chrono::Duration::zero() {
440        return Ok(true);
441    }
442
443    let age = age
444        .to_std()
445        .map_err(|e| crate::Error::configuration(format!("invalid cache age: {e}")))?;
446    Ok(age > max_age_duration)
447}
448
449fn parse_max_age(spec: &str) -> Result<Option<Duration>> {
450    let raw = spec.trim();
451    if raw.is_empty() {
452        return Err(crate::Error::configuration(
453            "cache.maxAge must not be empty".to_string(),
454        ));
455    }
456    if raw.eq_ignore_ascii_case("infinite")
457        || raw.eq_ignore_ascii_case("inf")
458        || raw.eq_ignore_ascii_case("never")
459    {
460        return Ok(None);
461    }
462
463    let digits_len = raw.bytes().take_while(|byte| byte.is_ascii_digit()).count();
464    if digits_len == 0 || digits_len == raw.len() {
465        return Err(crate::Error::configuration(format!(
466            "invalid cache.maxAge '{raw}': expected <int><unit> (e.g. 30m, 1h)"
467        )));
468    }
469
470    let quantity: u64 = raw[..digits_len]
471        .parse()
472        .map_err(|e| crate::Error::configuration(format!("invalid cache.maxAge '{raw}': {e}")))?;
473    let unit = raw[digits_len..].trim().to_ascii_lowercase();
474
475    let duration = match unit.as_str() {
476        "ms" => Duration::from_millis(quantity),
477        "s" => Duration::from_secs(quantity),
478        "m" => Duration::from_secs(multiply_checked(quantity, 60, raw)?),
479        "h" => Duration::from_secs(multiply_checked(quantity, 60 * 60, raw)?),
480        "d" => Duration::from_secs(multiply_checked(quantity, 24 * 60 * 60, raw)?),
481        _ => {
482            return Err(crate::Error::configuration(format!(
483                "invalid cache.maxAge unit in '{raw}': use ms|s|m|h|d|infinite"
484            )));
485        }
486    };
487
488    Ok(Some(duration))
489}
490
491fn multiply_checked(quantity: u64, factor: u64, raw: &str) -> Result<u64> {
492    quantity.checked_mul(factor).ok_or_else(|| {
493        crate::Error::configuration(format!("cache.maxAge '{raw}' is too large to represent"))
494    })
495}
496
497#[derive(Default)]
498struct InputDirectoryBuilder {
499    files: BTreeMap<String, FileNode>,
500    directories: BTreeMap<String, Self>,
501}
502
503impl InputDirectoryBuilder {
504    fn insert(&mut self, relative_path: &Path, digest: Digest, is_executable: bool) -> Result<()> {
505        let mut components = relative_path.components().peekable();
506        let mut current = self;
507
508        while let Some(component) = components.next() {
509            let Component::Normal(name) = component else {
510                return Err(crate::Error::configuration(format!(
511                    "invalid hashed input path '{}'",
512                    relative_path.display()
513                )));
514            };
515
516            let name = name.to_string_lossy().into_owned();
517            if components.peek().is_some() {
518                current = current.directories.entry(name).or_default();
519            } else {
520                current.files.insert(
521                    name.clone(),
522                    FileNode {
523                        name,
524                        digest: digest.clone(),
525                        is_executable,
526                    },
527                );
528            }
529        }
530
531        Ok(())
532    }
533
534    fn into_directory(self) -> Result<(Directory, Digest)> {
535        let mut directories = Vec::with_capacity(self.directories.len());
536        for (name, child) in self.directories {
537            let (_, child_digest) = child.into_directory()?;
538            directories.push(DirectoryNode {
539                name,
540                digest: child_digest,
541            });
542        }
543
544        let directory = Directory {
545            files: self.files.into_values().collect(),
546            directories,
547            symlinks: Vec::new(),
548        };
549        let digest = digest_of(&directory)
550            .map_err(|e| crate::Error::configuration(format!("input root digest: {e}")))?;
551        Ok((directory, digest))
552    }
553}
554
555fn build_input_root_digest(hashed: &[HashedInput]) -> Result<Digest> {
556    let mut builder = InputDirectoryBuilder::default();
557    for input in hashed {
558        builder.insert(
559            &input.relative_path,
560            Digest {
561                hash: input.sha256.clone(),
562                size_bytes: input.size,
563            },
564            input.is_executable,
565        )?;
566    }
567    let (_, digest) = builder.into_directory()?;
568    Ok(digest)
569}
570
571fn prefix_patterns_for_hasher_root(
572    patterns: &[String],
573    project_root: &Path,
574    hasher_root: &Path,
575) -> Result<Vec<String>> {
576    let prefix = project_root.strip_prefix(hasher_root).map_err(|e| {
577        crate::Error::configuration(format!(
578            "project root '{}' is not under cache hasher root '{}': {e}",
579            project_root.display(),
580            hasher_root.display()
581        ))
582    })?;
583
584    if prefix.as_os_str().is_empty() {
585        return Ok(patterns.to_vec());
586    }
587
588    Ok(patterns
589        .iter()
590        .map(|pattern| {
591            let trimmed = pattern.trim();
592            if trimmed.is_empty() {
593                String::new()
594            } else {
595                path_to_forward_slashes(&prefix.join(trimmed))
596            }
597        })
598        .collect())
599}
600
601fn rebase_hashed_inputs_for_project_root(
602    hashed: Vec<HashedInput>,
603    project_root: &Path,
604    hasher_root: &Path,
605) -> Result<Vec<HashedInput>> {
606    let prefix = project_root.strip_prefix(hasher_root).map_err(|e| {
607        crate::Error::configuration(format!(
608            "project root '{}' is not under cache hasher root '{}': {e}",
609            project_root.display(),
610            hasher_root.display()
611        ))
612    })?;
613
614    if prefix.as_os_str().is_empty() {
615        return Ok(hashed);
616    }
617
618    hashed
619        .into_iter()
620        .map(|input| {
621            let relative_path = input.relative_path.strip_prefix(prefix).map_err(|e| {
622                crate::Error::configuration(format!(
623                    "hashed input '{}' is not under task project root '{}': {e}",
624                    input.relative_path.display(),
625                    project_root.display()
626                ))
627            })?;
628
629            Ok(HashedInput {
630                relative_path: relative_path.to_path_buf(),
631                ..input
632            })
633        })
634        .collect()
635}
636
637fn normalize_workdir(workdir: &Path, project_root: &Path, module_root: &Path) -> String {
638    if let Ok(relative) = workdir.strip_prefix(project_root) {
639        return path_to_forward_slashes(relative);
640    }
641    if let Ok(relative) = workdir.strip_prefix(module_root) {
642        return path_to_forward_slashes(relative);
643    }
644    path_to_forward_slashes(workdir)
645}
646
647fn collect_outputs(workdir: &Path, patterns: &[String]) -> Result<Vec<PathBuf>> {
648    if patterns.is_empty() {
649        return Ok(Vec::new());
650    }
651
652    let mut builder = GlobSetBuilder::new();
653    let mut has_patterns = false;
654    for pattern in patterns {
655        let trimmed = pattern.trim();
656        if trimmed.is_empty() {
657            continue;
658        }
659
660        let looks_like_glob = trimmed.contains('*')
661            || trimmed.contains('{')
662            || trimmed.contains('?')
663            || trimmed.contains('[');
664        let mut glob_pattern = trimmed.to_string();
665        let absolute = workdir.join(trimmed);
666        if absolute.is_dir() && !looks_like_glob {
667            glob_pattern = format!("{}/**/*", trimmed.trim_end_matches('/'));
668        }
669
670        let glob = Glob::new(&glob_pattern).map_err(|e| {
671            crate::Error::configuration(format!("invalid output glob '{glob_pattern}': {e}"))
672        })?;
673        builder.add(glob);
674        has_patterns = true;
675    }
676
677    if !has_patterns {
678        return Ok(Vec::new());
679    }
680
681    let globset = builder
682        .build()
683        .map_err(|e| crate::Error::configuration(format!("failed to build output globset: {e}")))?;
684
685    let mut resolved = Vec::new();
686    for entry in WalkDir::new(workdir) {
687        let entry = entry.map_err(|e| {
688            crate::Error::configuration(format!("walk output tree {}: {e}", workdir.display()))
689        })?;
690        if entry.file_type().is_dir() {
691            continue;
692        }
693
694        let relative = entry.path().strip_prefix(workdir).map_err(|e| {
695            crate::Error::configuration(format!(
696                "output path '{}' not under workdir '{}': {e}",
697                entry.path().display(),
698                workdir.display()
699            ))
700        })?;
701        if globset.is_match(relative) {
702            resolved.push(relative.to_path_buf());
703        }
704    }
705
706    resolved.sort();
707    Ok(resolved)
708}
709
710fn path_to_forward_slashes(path: &Path) -> String {
711    path.to_string_lossy().replace('\\', "/")
712}
713
714#[cfg(unix)]
715fn is_executable(path: &Path) -> Result<bool> {
716    use std::os::unix::fs::PermissionsExt;
717
718    let metadata = std::fs::metadata(path)
719        .map_err(|e| crate::Error::configuration(format!("metadata {}: {e}", path.display())))?;
720    Ok(metadata.permissions().mode() & 0o111 != 0)
721}
722
723#[cfg(not(unix))]
724fn is_executable(_path: &Path) -> Result<bool> {
725    Ok(false)
726}
727
728#[cfg(unix)]
729fn set_executable_if_needed(path: &Path, is_executable: bool) -> Result<()> {
730    use std::os::unix::fs::PermissionsExt;
731
732    if !is_executable {
733        return Ok(());
734    }
735
736    let mut permissions = std::fs::metadata(path)
737        .map_err(|e| crate::Error::configuration(format!("metadata {}: {e}", path.display())))?
738        .permissions();
739    permissions.set_mode(permissions.mode() | 0o111);
740    std::fs::set_permissions(path, permissions).map_err(|e| {
741        crate::Error::configuration(format!("set permissions {}: {e}", path.display()))
742    })?;
743    Ok(())
744}
745
746#[cfg(not(unix))]
747fn set_executable_if_needed(_path: &Path, _is_executable: bool) -> Result<()> {
748    Ok(())
749}
750
751/// Inputs to [`record`] grouped to keep call sites self-documenting.
752pub struct RecordInput<'a> {
753    /// Cache configuration.
754    pub cache: &'a TaskCacheConfig,
755    /// Action digest the result is keyed under.
756    pub action_digest: &'a Digest,
757    /// Working directory the task ran in.
758    pub workdir: &'a Path,
759    /// The task definition.
760    pub task: &'a Task,
761    /// Captured stdout.
762    pub stdout: &'a str,
763    /// Captured stderr.
764    pub stderr: &'a str,
765    /// Process exit code.
766    pub exit_code: i32,
767    /// Wall-clock duration in milliseconds.
768    pub duration_ms: u128,
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774    use crate::environment::Environment;
775    use crate::tasks::{Input, Task, TaskCacheMode, TaskCachePolicy};
776    use cuenv_cas::{LocalActionCache, LocalCas};
777    use cuenv_vcs::WalkHasher;
778    use std::fs;
779    use tempfile::TempDir;
780
781    fn make_cache(root: &Path) -> TaskCacheConfig {
782        TaskCacheConfig {
783            cas: Arc::new(LocalCas::open(root).unwrap()),
784            action_cache: Arc::new(LocalActionCache::open(root).unwrap()),
785            vcs_hasher: Arc::new(WalkHasher::new(root)),
786            vcs_hasher_root: root.to_path_buf(),
787            cuenv_version: "test-version".to_string(),
788            runtime_identity_properties: BTreeMap::new(),
789            cache_disabled_reason: None,
790        }
791    }
792
793    fn make_task(command: &str, args: &[&str], inputs: &[&str], outputs: &[&str]) -> Task {
794        Task {
795            command: command.to_string(),
796            args: args.iter().map(|arg| (*arg).to_string()).collect(),
797            inputs: inputs
798                .iter()
799                .map(|path| Input::Path((*path).to_string()))
800                .collect(),
801            outputs: outputs.iter().map(|output| (*output).to_string()).collect(),
802            cache: Some(TaskCachePolicy {
803                mode: TaskCacheMode::ReadWrite,
804                max_age: None,
805            }),
806            ..Task::default()
807        }
808    }
809
810    async fn build_action_for_test(input: BuildActionInput<'_>) -> Option<(Action, Digest)> {
811        build_action(input).await.unwrap()
812    }
813
814    #[tokio::test]
815    async fn build_action_returns_none_when_no_inputs() {
816        let tmp = TempDir::new().unwrap();
817        let cache = make_cache(tmp.path());
818        let task = make_task("echo", &["hi"], &[], &[]);
819        let env = Environment::new();
820
821        let result = build_action_for_test(BuildActionInput {
822            task: &task,
823            task_name: "no-inputs",
824            environment: &env,
825            cache: &cache,
826            workdir: tmp.path(),
827            project_root: tmp.path(),
828            module_root: tmp.path(),
829        })
830        .await;
831        assert!(result.is_none());
832    }
833
834    #[tokio::test]
835    async fn build_action_is_deterministic() {
836        let tmp = TempDir::new().unwrap();
837        fs::write(tmp.path().join("input.txt"), "payload").unwrap();
838        let cache = make_cache(tmp.path());
839        let task = make_task("echo", &["hi"], &["input.txt"], &[]);
840        let env = Environment::new();
841
842        let (_, first) = build_action_for_test(BuildActionInput {
843            task: &task,
844            task_name: "t",
845            environment: &env,
846            cache: &cache,
847            workdir: tmp.path(),
848            project_root: tmp.path(),
849            module_root: tmp.path(),
850        })
851        .await
852        .unwrap();
853        let (_, second) = build_action_for_test(BuildActionInput {
854            task: &task,
855            task_name: "t",
856            environment: &env,
857            cache: &cache,
858            workdir: tmp.path(),
859            project_root: tmp.path(),
860            module_root: tmp.path(),
861        })
862        .await
863        .unwrap();
864        assert_eq!(first, second);
865    }
866
867    #[tokio::test]
868    async fn build_action_changes_when_input_changes() {
869        let tmp = TempDir::new().unwrap();
870        fs::write(tmp.path().join("input.txt"), "first").unwrap();
871        let cache = make_cache(tmp.path());
872        let task = make_task("echo", &["hi"], &["input.txt"], &[]);
873        let env = Environment::new();
874
875        let (_, first) = build_action_for_test(BuildActionInput {
876            task: &task,
877            task_name: "t",
878            environment: &env,
879            cache: &cache,
880            workdir: tmp.path(),
881            project_root: tmp.path(),
882            module_root: tmp.path(),
883        })
884        .await
885        .unwrap();
886
887        fs::write(tmp.path().join("input.txt"), "second").unwrap();
888        let (_, second) = build_action_for_test(BuildActionInput {
889            task: &task,
890            task_name: "t",
891            environment: &env,
892            cache: &cache,
893            workdir: tmp.path(),
894            project_root: tmp.path(),
895            module_root: tmp.path(),
896        })
897        .await
898        .unwrap();
899
900        assert_ne!(first, second);
901    }
902
903    #[tokio::test]
904    async fn build_action_hashes_inputs_relative_to_task_project_root() {
905        let tmp = TempDir::new().unwrap();
906        let workspace_root = tmp.path();
907        let nested_project_root = workspace_root.join("packages/app");
908        fs::create_dir_all(nested_project_root.join("src")).unwrap();
909        fs::create_dir_all(workspace_root.join("src")).unwrap();
910        fs::write(workspace_root.join("src/input.txt"), "workspace-root").unwrap();
911        fs::write(nested_project_root.join("src/input.txt"), "nested-project").unwrap();
912
913        let cache = make_cache(workspace_root);
914        let task = make_task("echo", &["hi"], &["src/input.txt"], &[]);
915        let env = Environment::new();
916
917        let (_, first) = build_action_for_test(BuildActionInput {
918            task: &task,
919            task_name: "nested",
920            environment: &env,
921            cache: &cache,
922            workdir: &nested_project_root,
923            project_root: &nested_project_root,
924            module_root: workspace_root,
925        })
926        .await
927        .unwrap();
928
929        fs::write(
930            workspace_root.join("src/input.txt"),
931            "workspace-root-updated",
932        )
933        .unwrap();
934        let (_, second) = build_action_for_test(BuildActionInput {
935            task: &task,
936            task_name: "nested",
937            environment: &env,
938            cache: &cache,
939            workdir: &nested_project_root,
940            project_root: &nested_project_root,
941            module_root: workspace_root,
942        })
943        .await
944        .unwrap();
945
946        assert_eq!(first, second);
947
948        fs::write(
949            nested_project_root.join("src/input.txt"),
950            "nested-project-updated",
951        )
952        .unwrap();
953        let (_, third) = build_action_for_test(BuildActionInput {
954            task: &task,
955            task_name: "nested",
956            environment: &env,
957            cache: &cache,
958            workdir: &nested_project_root,
959            project_root: &nested_project_root,
960            module_root: workspace_root,
961        })
962        .await
963        .unwrap();
964
965        assert_ne!(first, third);
966    }
967
968    #[tokio::test]
969    async fn build_action_changes_when_command_changes() {
970        let tmp = TempDir::new().unwrap();
971        fs::write(tmp.path().join("input.txt"), "payload").unwrap();
972        let cache = make_cache(tmp.path());
973        let env = Environment::new();
974
975        let task1 = make_task("cargo", &["build"], &["input.txt"], &[]);
976        let task2 = make_task("cargo", &["test"], &["input.txt"], &[]);
977
978        let (_, first) = build_action_for_test(BuildActionInput {
979            task: &task1,
980            task_name: "t",
981            environment: &env,
982            cache: &cache,
983            workdir: tmp.path(),
984            project_root: tmp.path(),
985            module_root: tmp.path(),
986        })
987        .await
988        .unwrap();
989        let (_, second) = build_action_for_test(BuildActionInput {
990            task: &task2,
991            task_name: "t",
992            environment: &env,
993            cache: &cache,
994            workdir: tmp.path(),
995            project_root: tmp.path(),
996            module_root: tmp.path(),
997        })
998        .await
999        .unwrap();
1000        assert_ne!(first, second);
1001    }
1002
1003    #[tokio::test]
1004    async fn build_action_changes_when_script_changes() {
1005        let tmp = TempDir::new().unwrap();
1006        fs::write(tmp.path().join("input.txt"), "payload").unwrap();
1007        let cache = make_cache(tmp.path());
1008        let env = Environment::new();
1009
1010        let task1 = Task {
1011            script: Some("echo one".to_string()),
1012            inputs: vec![Input::Path("input.txt".to_string())],
1013            cache: Some(TaskCachePolicy {
1014                mode: TaskCacheMode::ReadWrite,
1015                max_age: None,
1016            }),
1017            ..Task::default()
1018        };
1019        let task2 = Task {
1020            script: Some("echo two".to_string()),
1021            inputs: vec![Input::Path("input.txt".to_string())],
1022            cache: Some(TaskCachePolicy {
1023                mode: TaskCacheMode::ReadWrite,
1024                max_age: None,
1025            }),
1026            ..Task::default()
1027        };
1028
1029        let (_, first) = build_action_for_test(BuildActionInput {
1030            task: &task1,
1031            task_name: "script",
1032            environment: &env,
1033            cache: &cache,
1034            workdir: tmp.path(),
1035            project_root: tmp.path(),
1036            module_root: tmp.path(),
1037        })
1038        .await
1039        .unwrap();
1040        let (_, second) = build_action_for_test(BuildActionInput {
1041            task: &task2,
1042            task_name: "script",
1043            environment: &env,
1044            cache: &cache,
1045            workdir: tmp.path(),
1046            project_root: tmp.path(),
1047            module_root: tmp.path(),
1048        })
1049        .await
1050        .unwrap();
1051
1052        assert_ne!(first, second);
1053    }
1054
1055    #[tokio::test]
1056    async fn record_then_lookup_roundtrips() {
1057        let tmp = TempDir::new().unwrap();
1058        let workdir = tmp.path().join("work");
1059        fs::create_dir_all(&workdir).unwrap();
1060        fs::write(tmp.path().join("input.txt"), "in").unwrap();
1061        fs::write(workdir.join("out.txt"), "produced").unwrap();
1062
1063        let cache = make_cache(tmp.path());
1064        let task = make_task("echo", &["hi"], &["input.txt"], &["out.txt"]);
1065        let env = Environment::new();
1066
1067        let (_, action_digest) = build_action_for_test(BuildActionInput {
1068            task: &task,
1069            task_name: "t",
1070            environment: &env,
1071            cache: &cache,
1072            workdir: &workdir,
1073            project_root: tmp.path(),
1074            module_root: tmp.path(),
1075        })
1076        .await
1077        .unwrap();
1078
1079        record(RecordInput {
1080            cache: &cache,
1081            action_digest: &action_digest,
1082            workdir: &workdir,
1083            task: &task,
1084            stdout: "stdout-text",
1085            stderr: "stderr-text",
1086            exit_code: 0,
1087            duration_ms: 42,
1088        })
1089        .unwrap();
1090
1091        let recorded = lookup(&cache, &action_digest, &task).unwrap().unwrap();
1092        assert_eq!(recorded.exit_code, 0);
1093        assert_eq!(recorded.output_files.len(), 1);
1094        assert_eq!(recorded.output_files[0].path, "out.txt");
1095
1096        let fresh = tmp.path().join("fresh");
1097        fs::create_dir_all(&fresh).unwrap();
1098        let (stdout, stderr, exit_code) = materialize_hit(&cache, &fresh, &recorded).unwrap();
1099        assert_eq!(stdout, "stdout-text");
1100        assert_eq!(stderr, "stderr-text");
1101        assert_eq!(exit_code, 0);
1102        assert_eq!(fs::read(fresh.join("out.txt")).unwrap(), b"produced");
1103    }
1104
1105    #[cfg(unix)]
1106    #[tokio::test]
1107    async fn record_and_materialize_preserve_executable_outputs() {
1108        use std::os::unix::fs::PermissionsExt;
1109
1110        let tmp = TempDir::new().unwrap();
1111        let workdir = tmp.path().join("work");
1112        fs::create_dir_all(&workdir).unwrap();
1113        fs::write(tmp.path().join("input.txt"), "in").unwrap();
1114        let script = workdir.join("bin/run.sh");
1115        fs::create_dir_all(script.parent().unwrap()).unwrap();
1116        fs::write(&script, "#!/bin/sh\necho hi\n").unwrap();
1117        let mut permissions = fs::metadata(&script).unwrap().permissions();
1118        permissions.set_mode(0o755);
1119        fs::set_permissions(&script, permissions).unwrap();
1120
1121        let cache = make_cache(tmp.path());
1122        let task = make_task("echo", &["hi"], &["input.txt"], &["bin"]);
1123        let env = Environment::new();
1124
1125        let (_, action_digest) = build_action_for_test(BuildActionInput {
1126            task: &task,
1127            task_name: "exec",
1128            environment: &env,
1129            cache: &cache,
1130            workdir: &workdir,
1131            project_root: tmp.path(),
1132            module_root: tmp.path(),
1133        })
1134        .await
1135        .unwrap();
1136
1137        record(RecordInput {
1138            cache: &cache,
1139            action_digest: &action_digest,
1140            workdir: &workdir,
1141            task: &task,
1142            stdout: "",
1143            stderr: "",
1144            exit_code: 0,
1145            duration_ms: 1,
1146        })
1147        .unwrap();
1148
1149        let recorded = lookup(&cache, &action_digest, &task).unwrap().unwrap();
1150        let fresh = tmp.path().join("fresh");
1151        fs::create_dir_all(&fresh).unwrap();
1152        materialize_hit(&cache, &fresh, &recorded).unwrap();
1153
1154        let mode = fs::metadata(fresh.join("bin/run.sh"))
1155            .unwrap()
1156            .permissions()
1157            .mode();
1158        assert_ne!(mode & 0o111, 0);
1159    }
1160
1161    #[tokio::test]
1162    async fn build_action_returns_none_when_cache_mode_never() {
1163        let tmp = TempDir::new().unwrap();
1164        fs::write(tmp.path().join("input.txt"), "payload").unwrap();
1165        let cache = make_cache(tmp.path());
1166        let task = Task {
1167            command: "echo".to_string(),
1168            args: vec!["hi".to_string()],
1169            inputs: vec![Input::Path("input.txt".to_string())],
1170            cache: Some(TaskCachePolicy {
1171                mode: TaskCacheMode::Never,
1172                max_age: None,
1173            }),
1174            ..Task::default()
1175        };
1176        let env = Environment::new();
1177
1178        let result = build_action_for_test(BuildActionInput {
1179            task: &task,
1180            task_name: "never",
1181            environment: &env,
1182            cache: &cache,
1183            workdir: tmp.path(),
1184            project_root: tmp.path(),
1185            module_root: tmp.path(),
1186        })
1187        .await;
1188        assert!(result.is_none());
1189    }
1190
1191    #[tokio::test]
1192    async fn build_action_returns_none_when_explicit_input_is_missing() {
1193        let tmp = TempDir::new().unwrap();
1194        let cache = make_cache(tmp.path());
1195        let task = make_task("echo", &["hi"], &["missing.txt"], &[]);
1196        let env = Environment::new();
1197
1198        let result = build_action_for_test(BuildActionInput {
1199            task: &task,
1200            task_name: "missing",
1201            environment: &env,
1202            cache: &cache,
1203            workdir: tmp.path(),
1204            project_root: tmp.path(),
1205            module_root: tmp.path(),
1206        })
1207        .await;
1208
1209        assert!(result.is_none());
1210    }
1211
1212    #[tokio::test]
1213    async fn lookup_respects_max_age() {
1214        let tmp = TempDir::new().unwrap();
1215        let workdir = tmp.path().join("work");
1216        fs::create_dir_all(&workdir).unwrap();
1217        fs::write(tmp.path().join("input.txt"), "in").unwrap();
1218        fs::write(workdir.join("out.txt"), "produced").unwrap();
1219
1220        let cache = make_cache(tmp.path());
1221        let task = Task {
1222            command: "echo".to_string(),
1223            args: vec!["hi".to_string()],
1224            inputs: vec![Input::Path("input.txt".to_string())],
1225            outputs: vec!["out.txt".to_string()],
1226            cache: Some(TaskCachePolicy {
1227                mode: TaskCacheMode::ReadWrite,
1228                max_age: Some("1ms".to_string()),
1229            }),
1230            ..Task::default()
1231        };
1232        let env = Environment::new();
1233
1234        let (_, action_digest) = build_action_for_test(BuildActionInput {
1235            task: &task,
1236            task_name: "ttl",
1237            environment: &env,
1238            cache: &cache,
1239            workdir: &workdir,
1240            project_root: tmp.path(),
1241            module_root: tmp.path(),
1242        })
1243        .await
1244        .unwrap();
1245        record(RecordInput {
1246            cache: &cache,
1247            action_digest: &action_digest,
1248            workdir: &workdir,
1249            task: &task,
1250            stdout: "stdout-text",
1251            stderr: "stderr-text",
1252            exit_code: 0,
1253            duration_ms: 42,
1254        })
1255        .unwrap();
1256
1257        std::thread::sleep(std::time::Duration::from_millis(5));
1258        let lookup_result = lookup(&cache, &action_digest, &task).unwrap();
1259        assert!(lookup_result.is_none());
1260    }
1261
1262    #[tokio::test]
1263    async fn record_skips_non_zero_exit_codes() {
1264        let tmp = TempDir::new().unwrap();
1265        let workdir = tmp.path().join("work");
1266        fs::create_dir_all(&workdir).unwrap();
1267        fs::write(tmp.path().join("input.txt"), "in").unwrap();
1268        fs::write(workdir.join("out.txt"), "produced").unwrap();
1269
1270        let cache = make_cache(tmp.path());
1271        let task = make_task("echo", &["hi"], &["input.txt"], &["out.txt"]);
1272        let env = Environment::new();
1273
1274        let (_, action_digest) = build_action_for_test(BuildActionInput {
1275            task: &task,
1276            task_name: "non-zero",
1277            environment: &env,
1278            cache: &cache,
1279            workdir: &workdir,
1280            project_root: tmp.path(),
1281            module_root: tmp.path(),
1282        })
1283        .await
1284        .unwrap();
1285
1286        record(RecordInput {
1287            cache: &cache,
1288            action_digest: &action_digest,
1289            workdir: &workdir,
1290            task: &task,
1291            stdout: "stdout-text",
1292            stderr: "stderr-text",
1293            exit_code: 1,
1294            duration_ms: 42,
1295        })
1296        .unwrap();
1297
1298        let lookup_result = lookup(&cache, &action_digest, &task).unwrap();
1299        assert!(lookup_result.is_none());
1300    }
1301}