use std::collections::BTreeMap;
use std::ffi::OsString;
use std::io;
use std::path::{Path, PathBuf};
use snafu::{ResultExt, Snafu};
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use haz_cache::{Cache, Hasher, RestoreError, StoreError, StoreInputs, StoredOutput};
use haz_dag::graph::TaskGraph;
use haz_domain::action::{ShellType, TaskAction};
use haz_domain::env::{EnvSettings, EnvVarName};
use haz_domain::path::{CanonicalPath, OutputSpec, ParseAbsoluteError, PathPattern, ProjectRoot};
use haz_domain::project::Project;
use haz_domain::settings::cache::HashAlgo;
use haz_domain::task::Task;
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use haz_vfs::{EntryKind, Filesystem, FsError, WritableFilesystem};
use crate::cache_key::{BuildKeyError, PredecessorStreamHashes, build_cache_key};
use crate::pattern_walk::{
GlobMatchAction, GlobWalk, glob_walk_origin, host_path_from_segments,
literal_workspace_segments, workspace_absolute_string_from_segments,
};
use crate::process::{
ExitStatus, Process, ProcessError, ProcessSpawner, Signal, SpawnPlan, Spawned,
};
/// Where the result recorded for a task came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RunSource {
/// The record was produced by restoring a cache entry (set by `restore_from_hit`).
CacheHit,
/// The record was produced by actually spawning the task (set by `run_fresh`).
FreshRun,
}
/// Final state recorded for a task that ran (or was restored from cache).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RunState {
/// The process exited successfully without cancellation, or the result was
/// restored from a cache hit.
Succeeded,
/// The process exited with a non-success status and was not cancelled.
Failed,
/// Cancellation was observed while waiting for the process to exit.
Cancelled,
}
/// Outcome of a task that reached completion, either via a fresh run or a
/// cache restore.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompletedRecord {
/// The task this record describes.
pub task: TaskId,
/// Whether the record came from the cache or from a fresh process run.
pub source: RunSource,
/// Final state of the run.
pub state: RunState,
/// Exit status of the spawned process; `None` for cache hits, `Some` for
/// fresh runs.
pub exit_status: Option<ExitStatus>,
/// Hash of the captured stdout bytes (taken from the manifest on a cache hit).
pub stdout_hash: [u8; 32],
/// Hash of the captured stderr bytes (taken from the manifest on a cache hit).
pub stderr_hash: [u8; 32],
/// Workspace-absolute paths of the outputs that were resolved or restored.
/// Empty for failed and cancelled fresh runs.
pub materialised_outputs: Vec<CanonicalPath>,
}
/// Record for a task that was skipped rather than run.
///
/// NOTE(review): nothing in this part of the file constructs this type; the
/// scheduler that does is presumably elsewhere.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkipRecord {
/// The task that was skipped.
pub task: TaskId,
/// Why the task was skipped.
pub cause: SkipCause,
}
/// Reason attached to a [`SkipRecord`].
///
/// NOTE(review): the variants are not constructed in this part of the file;
/// the descriptions below follow the variant names and should be confirmed
/// against the scheduler.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkipCause {
/// Presumably: an upstream task finished in a failed state.
UpstreamFailed {
/// The upstream task responsible for the skip.
upstream: TaskId,
},
/// Presumably: running an upstream task produced an error.
UpstreamErrored {
/// The upstream task responsible for the skip.
upstream: TaskId,
},
/// Presumably: a dependency cycle was detected at run time.
RuntimeCycle,
}
/// Record for a task affected by cancellation. Every variant carries the
/// affected task, exposed through [`CancelledRecord::task`].
///
/// NOTE(review): the variants are not constructed in this part of the file;
/// the per-variant descriptions follow the names and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CancelledRecord {
/// Presumably: the task's process was already running and was signalled.
SignaledInFlight {
/// The cancelled task.
task: TaskId,
/// Exit status the process reported.
exit_status: ExitStatus,
/// Hash of the stdout bytes captured for the task.
stdout_hash: [u8; 32],
/// Hash of the stderr bytes captured for the task.
stderr_hash: [u8; 32],
},
/// Presumably: the task did not run because an upstream task was cancelled.
UpstreamCancelled {
/// The cancelled task.
task: TaskId,
/// The upstream task whose cancellation propagated.
upstream: TaskId,
},
/// Presumably: the task did not run because the whole run was cancelled.
RunCancelled {
/// The cancelled task.
task: TaskId,
},
}
impl CancelledRecord {
    /// Returns the id of the task this cancellation record refers to,
    /// regardless of which variant it is.
    #[must_use]
    pub fn task(&self) -> &TaskId {
        // Every variant carries a `task` field, so a single irrefutable
        // or-pattern extracts it.
        let (Self::SignaledInFlight { task, .. }
        | Self::UpstreamCancelled { task, .. }
        | Self::RunCancelled { task }) = self;
        task
    }
}
/// Terminal outcome for a task: it completed, was skipped, or was cancelled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunOutcome {
/// The task ran (or was restored from cache) and produced a record.
Completed(CompletedRecord),
/// The task was skipped; the record carries the cause.
Skipped(SkipRecord),
/// The task was cancelled.
Cancelled(CancelledRecord),
}
impl RunOutcome {
#[must_use]
pub fn task(&self) -> &TaskId {
match self {
Self::Completed(record) => &record.task,
Self::Skipped(record) => &record.task,
Self::Cancelled(record) => record.task(),
}
}
}
/// Callbacks through which the runner reports task progress.
///
/// All methods take `&self`, so implementations that record state need
/// interior mutability.
pub trait RunObserver {
/// Called by `run_task` before the cache lookup and before any process is spawned.
fn on_task_started(&self, task: &TaskId);
/// Receives the task's stdout bytes. In this file it is called once per task
/// with the complete captured (or cache-restored) buffer.
fn on_stdout(&self, task: &TaskId, bytes: &[u8]);
/// Receives the task's stderr bytes. In this file it is called once per task
/// with the complete captured (or cache-restored) buffer.
fn on_stderr(&self, task: &TaskId, bytes: &[u8]);
/// Called by `run_task` after a record was produced. Not called when
/// `run_task` returns an error.
fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord);
/// Reports a skipped task. Not invoked by the code visible in this file section.
fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord);
/// Reports a cancelled task. Not invoked by the code visible in this file section.
fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord);
}
/// Errors that abort running a single task.
///
/// Each variant has an explicit `display` attribute, which is what the
/// `Display` output uses.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum RunTaskError {
/// The cache key could not be built, including the case where the task or
/// its project is not present in the workspace.
#[snafu(display("failed to derive cache key: {source}"))]
BuildKeyFailed {
source: BuildKeyError,
},
/// Restoring a cache hit failed.
#[snafu(display("failed to restore cache entry: {source}"))]
RestoreFailed {
source: RestoreError,
},
/// The spawner could not start the task's process.
#[snafu(display("failed to spawn process: {source}"))]
SpawnFailed {
source: ProcessError,
},
/// Waiting for the spawned process to exit failed.
#[snafu(display("failed to wait for spawned process: {source}"))]
WaitFailed {
source: ProcessError,
},
/// Reading the child's stdout or stderr to the end failed. `stream` says
/// which one; it is not part of the display message.
#[snafu(display("failed to read captured stream: {source}"))]
CapturedStreamReadFailed {
stream: CapturedStream,
source: io::Error,
},
/// A filesystem error occurred while resolving declared outputs. `root` is
/// the host path being inspected (for literal outputs, the output path itself).
#[snafu(display(
"failed to resolve output patterns under: {}: {source}",
root.display()
))]
OutputPatternResolutionFailed {
root: PathBuf,
source: FsError,
},
/// The permission bits of a resolved output file could not be read.
#[snafu(display("failed to read mode of output file: {}: {source}", path.display()))]
OutputModeReadFailed {
path: PathBuf,
source: FsError,
},
/// A literal output exists but is not a regular file.
#[snafu(display("output path is not a regular file: {}", path.display()))]
OutputNotARegularFile {
path: PathBuf,
},
/// A literal output was declared but does not exist after the task ran.
#[snafu(display("task declared output but did not produce it: {}", path.display()))]
OutputDeclaredButNotProduced {
path: PathBuf,
},
/// Storing the successful run in the cache failed.
#[snafu(display("failed to store cache entry: {source}"))]
StoreFailed {
source: StoreError,
},
/// A resolved output's workspace-absolute string could not be parsed into a
/// `CanonicalPath`.
#[snafu(display("materialised output path is not workspace-absolute: {path}: {source}"))]
MaterialisedOutputPathInvalid {
path: String,
source: ParseAbsoluteError,
},
}
/// Identifies which captured child stream an error refers to
/// (see `RunTaskError::CapturedStreamReadFailed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CapturedStream {
/// The child's standard output.
Stdout,
/// The child's standard error.
Stderr,
}
/// Borrowed bundle of everything the runner needs to execute a task.
///
/// Generic over the filesystem (`F`), the process spawner (`S`) and the
/// observer (`O`) so each can be substituted independently.
pub struct RunContext<'a, F, S, O>
where
F: WritableFilesystem,
S: ProcessSpawner,
O: RunObserver,
{
/// Filesystem used for cache-key construction and output resolution.
pub fs: &'a F,
/// Cache used for lookup, restore and store.
pub cache: &'a Cache<F>,
/// Spawner used to start task processes.
pub spawner: &'a S,
/// Receiver of progress callbacks.
pub observer: &'a O,
/// Workspace definition (root path, projects, settings).
pub workspace: &'a Workspace,
/// Task graph passed to cache-key construction.
pub graph: &'a TaskGraph,
/// Snapshot of host environment variables; the source for `from_host` names
/// and an input to the cache key.
pub host_env: &'a BTreeMap<EnvVarName, String>,
/// Hash algorithm used for the cache key and for stdout/stderr hashes.
pub algo: HashAlgo,
/// Token whose cancellation makes an in-flight process get signalled.
pub cancel: &'a CancellationToken,
}
/// Result of `cache_lookup_phase`: the resolved task definition, its cache
/// key, and the cache manifest if one was found.
#[derive(Debug)]
pub struct TaskLookup<'ws> {
/// Project that owns the task.
pub project: &'ws Project,
/// Definition of the task within that project.
pub task_def: &'ws Task,
/// Cache key derived for this task.
pub key: haz_cache::CacheKey,
/// Manifest returned by the cache for `key`; `None` makes `run_task`
/// execute the task fresh.
pub manifest: Option<haz_cache::Manifest>,
}
/// Resolves `task` in the workspace, derives its cache key and queries the
/// cache for a matching manifest.
///
/// # Errors
///
/// Returns [`RunTaskError::BuildKeyFailed`] when the task's project or the
/// task itself is not part of the workspace (wrapping
/// `BuildKeyError::TaskNotInWorkspace`), or when `build_cache_key` fails.
pub fn cache_lookup_phase<'ws, F, S, O>(
    ctx: &RunContext<'ws, F, S, O>,
    task: &TaskId,
    predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
) -> Result<TaskLookup<'ws>, RunTaskError>
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    // Both lookups below report a miss with the same error.
    let not_in_workspace = || RunTaskError::BuildKeyFailed {
        source: BuildKeyError::TaskNotInWorkspace { task: task.clone() },
    };

    let Some(project) = ctx.workspace.projects.get(&task.project) else {
        return Err(not_in_workspace());
    };
    let Some(task_def) = project.tasks.get(&task.task) else {
        return Err(not_in_workspace());
    };

    let key = build_cache_key(
        ctx.fs,
        ctx.workspace,
        ctx.graph,
        task,
        ctx.host_env,
        predecessor_streams,
        ctx.algo,
    )
    .context(BuildKeyFailedSnafu)?;
    let manifest = ctx.cache.lookup(&key);

    Ok(TaskLookup {
        project,
        task_def,
        key,
        manifest,
    })
}
pub async fn run_task<F, S, O>(
ctx: &RunContext<'_, F, S, O>,
task: &TaskId,
predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
created_at_unix: u64,
) -> Result<CompletedRecord, RunTaskError>
where
F: WritableFilesystem,
S: ProcessSpawner,
O: RunObserver,
{
ctx.observer.on_task_started(task);
let lookup = cache_lookup_phase(ctx, task, predecessor_streams)?;
let record = if let Some(manifest) = lookup.manifest.as_ref() {
restore_from_hit(ctx, task, manifest)?
} else {
run_fresh(
ctx,
task,
lookup.project,
lookup.task_def,
&lookup.key,
created_at_unix,
)
.await?
};
ctx.observer.on_task_finished(task, &record);
Ok(record)
}
/// Restores a cached result described by `manifest` and reports it as a
/// successful cache hit.
///
/// The restored stdout and stderr buffers are replayed to the observer, and
/// the stream hashes are taken from the manifest rather than recomputed. The
/// returned record has no exit status.
///
/// # Errors
///
/// Returns [`RunTaskError::RestoreFailed`] when the cache cannot restore the
/// entry.
pub fn restore_from_hit<F, S, O>(
    ctx: &RunContext<'_, F, S, O>,
    task: &TaskId,
    manifest: &haz_cache::Manifest,
) -> Result<CompletedRecord, RunTaskError>
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    let restored = ctx.cache.restore(manifest).context(RestoreFailedSnafu)?;

    let observer = ctx.observer;
    observer.on_stdout(task, &restored.stdout);
    observer.on_stderr(task, &restored.stderr);

    let mut materialised_outputs = Vec::new();
    for blob in manifest.outputs.iter() {
        materialised_outputs.push(blob.workspace_absolute_path.clone());
    }

    Ok(CompletedRecord {
        task: task.clone(),
        source: RunSource::CacheHit,
        state: RunState::Succeeded,
        exit_status: None,
        stdout_hash: manifest.stdout_hash,
        stderr_hash: manifest.stderr_hash,
        materialised_outputs,
    })
}
/// Executes the task's process and turns the result into a `CompletedRecord`.
///
/// Steps, in order:
/// 1. build the spawn plan and spawn the process;
/// 2. concurrently wait for exit (honouring cancellation) and drain stdout
///    and stderr into memory;
/// 3. forward the captured bytes to the observer and hash them;
/// 4. on a successful, non-cancelled exit, resolve the declared outputs and
///    store the run in the cache.
///
/// Failed and cancelled runs are returned as `Ok` records with no
/// materialised outputs and are not stored in the cache.
///
/// # Errors
///
/// `SpawnFailed`, `WaitFailed`, `CapturedStreamReadFailed`, any output
/// resolution error, `MaterialisedOutputPathInvalid` or `StoreFailed`.
pub async fn run_fresh<F, S, O>(
ctx: &RunContext<'_, F, S, O>,
task: &TaskId,
project: &Project,
task_def: &Task,
key: &haz_cache::CacheKey,
created_at_unix: u64,
) -> Result<CompletedRecord, RunTaskError>
where
F: WritableFilesystem,
S: ProcessSpawner,
O: RunObserver,
{
let plan = build_spawn_plan(
ctx.workspace.root.as_path(),
project,
task_def,
ctx.host_env,
);
let Spawned {
mut process,
mut stdout,
mut stderr,
} = ctx.spawner.spawn(&plan).await.context(SpawnFailedSnafu)?;
let mut stdout_bytes = Vec::new();
let mut stderr_bytes = Vec::new();
let grace = ctx.workspace.settings.execution.cancel_grace.as_duration();
// Drive the wait and both stream reads together so they all make progress
// concurrently (presumably so the child never blocks on an undrained pipe).
let ((wait_result, was_cancelled), stdout_result, stderr_result) = tokio::join!(
await_exit_with_cancel(&mut process, ctx.cancel, grace),
AsyncReadExt::read_to_end(&mut stdout, &mut stdout_bytes),
AsyncReadExt::read_to_end(&mut stderr, &mut stderr_bytes),
);
// Error precedence: a wait failure is reported before any stream read failure.
let exit_status = wait_result.context(WaitFailedSnafu)?;
stdout_result.context(CapturedStreamReadFailedSnafu {
stream: CapturedStream::Stdout,
})?;
stderr_result.context(CapturedStreamReadFailedSnafu {
stream: CapturedStream::Stderr,
})?;
// Output is delivered to the observer in one piece per stream, after the
// process has exited; it is not streamed incrementally.
ctx.observer.on_stdout(task, &stdout_bytes);
ctx.observer.on_stderr(task, &stderr_bytes);
let stdout_hash = hash_bytes(ctx.algo, &stdout_bytes);
let stderr_hash = hash_bytes(ctx.algo, &stderr_bytes);
// Cancellation takes priority over the exit status when classifying the run.
let (state, materialised_outputs) = if was_cancelled {
(RunState::Cancelled, Vec::new())
} else if exit_status.success() {
let outputs_owned =
resolve_output_files(ctx.fs, ctx.workspace, project, &task_def.outputs)?;
// Validate the output paths before storing, so an invalid path prevents
// the cache entry from being written.
let materialised = canonical_paths_from_owned(&outputs_owned)?;
store_successful_run(
ctx,
key,
&outputs_owned,
&stdout_bytes,
&stderr_bytes,
created_at_unix,
)?;
(RunState::Succeeded, materialised)
} else {
(RunState::Failed, Vec::new())
};
Ok(CompletedRecord {
task: task.clone(),
source: RunSource::FreshRun,
state,
exit_status: Some(exit_status),
stdout_hash,
stderr_hash,
materialised_outputs,
})
}
/// Waits for `process` to exit while watching `cancel`.
///
/// Returns the wait result together with a flag that is `true` when the
/// cancellation branch was taken. On cancellation the process is sent
/// `Signal::Terminate`; if it has not exited after `grace`, it is sent
/// `Signal::Kill` and waited on without a further timeout.
async fn await_exit_with_cancel<P>(
process: &mut P,
cancel: &CancellationToken,
grace: std::time::Duration,
) -> (Result<ExitStatus, ProcessError>, bool)
where
P: Process,
{
tokio::select! {
// `biased` polls branches in source order, so when cancellation and process
// exit are both ready the run is classified as cancelled.
biased;
() = cancel.cancelled() => {
// Best effort: a failure to deliver the signal is deliberately ignored.
// NOTE(review): assumes `send_signal` is synchronous; if it returned a
// future, `let _ =` would drop it unpolled - confirm.
let _ = process.send_signal(Signal::Terminate);
tokio::select! {
result = process.wait() => (result, true),
() = tokio::time::sleep(grace) => {
// Grace period elapsed: escalate, then wait for the exit.
// NOTE(review): this re-issues `wait()` after the earlier wait future was
// dropped by `select!`; assumes `Process::wait` tolerates that - confirm.
let _ = process.send_signal(Signal::Kill);
(process.wait().await, true)
}
}
}
result = process.wait() => (result, false),
}
}
/// Writes a successful run (outputs plus captured streams) into the cache
/// under `key`.
///
/// The owned output descriptions are converted into borrowed `StoredOutput`
/// views before being passed to the cache.
///
/// # Errors
///
/// Returns [`RunTaskError::StoreFailed`] when the cache rejects the entry.
fn store_successful_run<F, S, O>(
    ctx: &RunContext<'_, F, S, O>,
    key: &haz_cache::CacheKey,
    outputs_owned: &[OwnedOutputFile],
    stdout: &[u8],
    stderr: &[u8],
    created_at_unix: u64,
) -> Result<(), RunTaskError>
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    let mut stored_outputs: Vec<StoredOutput<'_>> = Vec::with_capacity(outputs_owned.len());
    for file in outputs_owned {
        stored_outputs.push(StoredOutput {
            workspace_absolute_path: &file.workspace_absolute_path,
            on_disk_path: &file.on_disk_path,
            mode: file.mode,
        });
    }

    let entry = StoreInputs {
        outputs: &stored_outputs,
        stdout,
        stderr,
        created_at_unix,
    };
    ctx.cache.store(key, &entry).context(StoreFailedSnafu)
}
/// Parses the workspace-absolute path string of every resolved output into a
/// `CanonicalPath`, preserving order.
///
/// # Errors
///
/// Stops at the first string that fails to parse and returns
/// [`RunTaskError::MaterialisedOutputPathInvalid`] carrying that string.
fn canonical_paths_from_owned(
    files: &[OwnedOutputFile],
) -> Result<Vec<CanonicalPath>, RunTaskError> {
    let mut canonical = Vec::with_capacity(files.len());
    for file in files {
        let raw = &file.workspace_absolute_path;
        let parsed = CanonicalPath::parse_workspace_absolute(raw)
            .context(MaterialisedOutputPathInvalidSnafu { path: raw.clone() })?;
        canonical.push(parsed);
    }
    Ok(canonical)
}
/// Hashes `bytes` in a single pass with the algorithm selected by `algo` and
/// returns the 32-byte digest.
fn hash_bytes(algo: HashAlgo, bytes: &[u8]) -> [u8; 32] {
    let mut digest = Hasher::new(algo);
    digest.update(bytes);
    digest.finalize()
}
/// Assembles the `SpawnPlan` for a task: program and arguments from the
/// task's action, environment from the task's env settings filtered against
/// `host_env`, and the project's host directory as working directory.
fn build_spawn_plan(
    workspace_host: &Path,
    project: &Project,
    task_def: &Task,
    host_env: &BTreeMap<EnvVarName, String>,
) -> SpawnPlan {
    let cwd = project_cwd(workspace_host, &project.root);
    let env = build_env_vec(&task_def.env, host_env);
    let (program, args) = program_and_args(&task_def.action);
    SpawnPlan {
        program,
        args,
        env,
        cwd,
    }
}
/// Computes the host directory a task of the given project runs in.
///
/// For a nested project each segment of its canonical root is appended to
/// `workspace_host`; for any other root kind the workspace host path is
/// returned unchanged.
fn project_cwd(workspace_host: &Path, project_root: &ProjectRoot) -> PathBuf {
    let mut cwd = workspace_host.to_path_buf();
    let ProjectRoot::Nested(canonical) = project_root else {
        return cwd;
    };
    for segment in canonical.segments() {
        cwd.push(segment.as_str());
    }
    cwd
}
/// Translates a task action into the program to execute and its arguments.
///
/// * `Command`: the first argv element is the program, the rest are the
///   arguments.
/// * `Shell`: the shell binary is the program and the script is passed as
///   `-c <script>`.
fn program_and_args(action: &TaskAction) -> (OsString, Vec<OsString>) {
    match action {
        TaskAction::Shell { script, shell } => {
            let interpreter = shell_binary_name(shell);
            (
                OsString::from(interpreter),
                vec![OsString::from("-c"), OsString::from(script)],
            )
        }
        TaskAction::Command(argv) => {
            let args: Vec<OsString> = argv.tail.iter().map(OsString::from).collect();
            (OsString::from(argv.head.as_str()), args)
        }
    }
}
/// Returns the name of the binary used to run a shell script for `shell`:
/// `"sh"`, `"bash"`, or the name carried by `ShellType::Other`.
fn shell_binary_name(shell: &ShellType) -> &str {
match shell {
ShellType::Sh => "sh",
ShellType::Bash => "bash",
// Two-step conversion: the name wrapper is first borrowed as its inner
// value, which is then viewed as `&str`.
ShellType::Other(name) => AsRef::<str>::as_ref(name.as_ref()),
}
}
/// Builds the child environment as name/value pairs sorted by name.
///
/// Only names listed in `env.from_host` are taken from `host_env`, and only
/// when the host actually defines them. Entries in `env.overrides` are applied
/// afterwards and therefore win over inherited values with the same name.
fn build_env_vec(
    env: &EnvSettings,
    host_env: &BTreeMap<EnvVarName, String>,
) -> Vec<(OsString, OsString)> {
    let os_key = |name: &EnvVarName| OsString::from(AsRef::<str>::as_ref(name.as_ref()));

    // A BTreeMap keeps one value per name and yields them in sorted order.
    let mut effective: BTreeMap<OsString, OsString> = BTreeMap::new();

    for name in &env.from_host {
        let Some(value) = host_env.get(name) else {
            continue;
        };
        effective.insert(os_key(name), OsString::from(value));
    }

    for (name, value) in &env.overrides {
        effective.insert(os_key(name), OsString::from(value));
    }

    effective.into_iter().collect()
}
/// Owned description of one resolved output file, produced by
/// `resolve_output_files` and later borrowed as a `StoredOutput`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OwnedOutputFile {
/// Path of the file relative to the workspace root, written with a leading
/// `/` (e.g. `/proj/bundle.js`).
workspace_absolute_path: String,
/// Path of the file on the host filesystem.
on_disk_path: PathBuf,
/// Permission value reported by `Filesystem::permissions` for the file.
mode: u32,
}
/// Resolves a task's declared output specs to the concrete files present on
/// the filesystem, in declaration order.
///
/// Literal specs must exist and be regular files. Glob specs are expanded by
/// walking the filesystem from the pattern's origin and contribute every
/// match (possibly none).
///
/// # Errors
///
/// Propagates the errors of `resolve_literal_output` and of the glob walk
/// (mapped through `OutputAction`).
fn resolve_output_files<F: Filesystem>(
    fs: &F,
    workspace: &Workspace,
    project: &Project,
    outputs: &[OutputSpec],
) -> Result<Vec<OwnedOutputFile>, RunTaskError> {
    let workspace_host = workspace.root.as_path();
    let action = OutputAction;
    let mut resolved = Vec::new();

    for spec in outputs {
        let glob_pattern = match spec.pattern() {
            PathPattern::Literal(haz_path) => {
                resolve_literal_output(
                    fs,
                    workspace_host,
                    &project.root,
                    haz_path,
                    &action,
                    &mut resolved,
                )?;
                continue;
            }
            PathPattern::Glob(glob_pattern) => glob_pattern,
        };

        let glob = glob_pattern.compile();
        let matcher = glob.compile_matcher();
        let (walk_host, workspace_prefix, candidate_prefix) =
            glob_walk_origin(workspace_host, &project.root, glob_pattern.anchor());
        let mut walk_rel: Vec<String> = Vec::new();
        GlobWalk {
            fs,
            matcher: &matcher,
            candidate_prefix,
            workspace_prefix,
            action: &action,
        }
        .walk(&walk_host, &mut walk_rel, &mut resolved)?;
    }

    Ok(resolved)
}
/// Resolves one literal output path and appends it to `out`.
///
/// # Errors
///
/// * [`RunTaskError::OutputDeclaredButNotProduced`] when the path does not
///   exist (using the path reported by the filesystem error);
/// * [`RunTaskError::OutputPatternResolutionFailed`] for any other metadata
///   failure;
/// * [`RunTaskError::OutputNotARegularFile`] when the entry is not a file;
/// * whatever `OutputAction::on_match` returns when reading the mode fails.
fn resolve_literal_output<F: Filesystem>(
    fs: &F,
    workspace_host: &Path,
    project_root: &ProjectRoot,
    haz_path: &haz_domain::path::HazPath,
    action: &OutputAction,
    out: &mut Vec<OwnedOutputFile>,
) -> Result<(), RunTaskError> {
    let ws_segments = literal_workspace_segments(haz_path, project_root);
    let host = host_path_from_segments(workspace_host, &ws_segments);

    let meta = fs.metadata(&host).map_err(|err| match err {
        FsError::NotFound { path } => RunTaskError::OutputDeclaredButNotProduced { path },
        source => RunTaskError::OutputPatternResolutionFailed {
            root: host.clone(),
            source,
        },
    })?;

    if meta.kind == EntryKind::File {
        let workspace_absolute_path = workspace_absolute_string_from_segments(&ws_segments);
        return action.on_match(fs, &host, workspace_absolute_path, out);
    }
    Err(RunTaskError::OutputNotARegularFile { path: host })
}
/// Stateless `GlobMatchAction` that records each matched output file together
/// with its permission value.
struct OutputAction;
impl<F: Filesystem> GlobMatchAction<F> for OutputAction {
    type Output = OwnedOutputFile;
    type Error = RunTaskError;

    /// Wraps a filesystem error raised during the walk, tagging it with the
    /// directory being walked.
    fn map_walk_error(&self, root: PathBuf, source: FsError) -> RunTaskError {
        RunTaskError::OutputPatternResolutionFailed { root, source }
    }

    /// Records a matched file: reads its permission value and appends an
    /// `OwnedOutputFile` to `out`.
    ///
    /// Fails with [`RunTaskError::OutputModeReadFailed`] when the permissions
    /// cannot be read; nothing is appended in that case.
    fn on_match(
        &self,
        fs: &F,
        host_path: &Path,
        workspace_absolute_path: String,
        out: &mut Vec<OwnedOutputFile>,
    ) -> Result<(), RunTaskError> {
        let on_disk_path = host_path.to_path_buf();
        let mode = match fs.permissions(host_path) {
            Ok(mode) => mode,
            Err(source) => {
                return Err(RunTaskError::OutputModeReadFailed {
                    path: on_disk_path,
                    source,
                });
            }
        };
        out.push(OwnedOutputFile {
            workspace_absolute_path,
            on_disk_path,
            mode,
        });
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use std::ffi::OsString;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use nonempty::NonEmpty;
use haz_domain::action::{ShellType, TaskAction};
use haz_domain::env::{EnvSettings, EnvVarName};
use haz_domain::name::{ProjectName, TaskName};
use haz_domain::path::{CanonicalPath, HazPath, InputSpec, ProjectRoot};
use haz_domain::project::Project;
use haz_domain::task::Task;
use super::{build_env_vec, build_spawn_plan, project_cwd, shell_binary_name};
/// Test helper: builds an `EnvVarName` from `s`, panicking if it is rejected.
fn env_name(s: &str) -> EnvVarName {
EnvVarName::try_new(s).unwrap()
}
/// Test helper: builds a `ProjectName` from `s`, panicking if it is rejected.
fn project_name(s: &str) -> ProjectName {
ProjectName::try_new(s).unwrap()
}
/// Test helper: builds a `TaskName` from `s`, panicking if it is rejected.
fn task_name(s: &str) -> TaskName {
TaskName::try_new(s).unwrap()
}
/// Test helper: a project called `name` whose root is the nested
/// workspace-absolute path `root` (e.g. `/web/frontend`), with no tags or tasks.
fn nested_project(name: &str, root: &str) -> Project {
Project {
name: project_name(name),
root: ProjectRoot::Nested(
CanonicalPath::from_absolute(&HazPath::parse(root).unwrap()).unwrap(),
),
tags: BTreeSet::new(),
tasks: BTreeMap::new(),
}
}
/// Test helper: a project called `name` rooted at the workspace root itself,
/// with no tags or tasks.
fn implicit_project(name: &str) -> Project {
Project {
name: project_name(name),
root: ProjectRoot::WorkspaceRoot,
tags: BTreeSet::new(),
tasks: BTreeMap::new(),
}
}
/// Test helper: a task named `run` with the given action and env settings and
/// every other field empty.
fn task_with(action: TaskAction, env: EnvSettings) -> Task {
Task {
name: task_name("run"),
action,
inputs: Vec::<InputSpec>::new(),
outputs: Vec::new(),
deps: Vec::new(),
weak_deps: Vec::new(),
mutex: None,
env,
}
}
/// Test helper: a `TaskAction::Command` from `argv`. Panics when `argv` is empty.
fn command(argv: &[&str]) -> TaskAction {
TaskAction::Command(
NonEmpty::from_vec(argv.iter().map(|s| (*s).to_owned()).collect())
.expect("non-empty argv"),
)
}
/// Test helper: a `TaskAction::Shell` running `script` with `shell_type`.
fn shell(script: &str, shell_type: ShellType) -> TaskAction {
TaskAction::Shell {
script: script.to_owned(),
shell: shell_type,
}
}
/// `ShellType::Sh` maps to the binary name `sh`.
#[test]
fn shell_binary_name_for_sh() {
assert_eq!(shell_binary_name(&ShellType::Sh), "sh");
}
/// `ShellType::Bash` maps to the binary name `bash`.
#[test]
fn shell_binary_name_for_bash() {
assert_eq!(shell_binary_name(&ShellType::Bash), "bash");
}
/// `ShellType::Other` yields the name it carries, unchanged.
#[test]
fn shell_binary_name_for_other_uses_validated_name() {
let name = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
assert_eq!(shell_binary_name(&ShellType::Other(name)), "zsh");
}
/// A single-segment nested root is appended to the workspace host path.
#[test]
fn project_cwd_for_nested_appends_segments() {
let p = nested_project("lib_core", "/lib_core");
let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
assert_eq!(cwd, PathBuf::from("/abs/ws/lib_core"));
}
/// Every segment of a multi-segment nested root ends up in the cwd.
#[test]
fn project_cwd_for_deep_nested_walks_every_segment() {
let p = nested_project("frontend", "/web/frontend");
let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
assert_eq!(cwd, PathBuf::from("/abs/ws/web/frontend"));
}
/// A project rooted at the workspace root runs in the workspace host directory.
#[test]
fn project_cwd_for_implicit_mode_is_workspace_host() {
let p = implicit_project("root");
let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
assert_eq!(cwd, PathBuf::from("/abs/ws"));
}
/// Test helper: `EnvSettings` with the given inherited names and
/// name/value overrides.
fn env_settings(from_host: &[&str], overrides: &[(&str, &str)]) -> EnvSettings {
EnvSettings {
from_host: from_host.iter().map(|s| env_name(s)).collect(),
overrides: overrides
.iter()
.map(|(k, v)| (env_name(k), (*v).to_owned()))
.collect(),
}
}
/// Test helper: a host environment snapshot built from name/value pairs.
fn host_snapshot(entries: &[(&str, &str)]) -> BTreeMap<EnvVarName, String> {
entries
.iter()
.map(|(k, v)| (env_name(k), (*v).to_owned()))
.collect()
}
/// Test helper: converts `(&str, &str)` pairs into owned `OsString` pairs,
/// preserving order, for comparison against `build_env_vec` results.
fn osstr_pairs(slice: &[(&str, &str)]) -> Vec<(OsString, OsString)> {
    let mut pairs = Vec::with_capacity(slice.len());
    for &(name, value) in slice {
        pairs.push((OsString::from(name), OsString::from(value)));
    }
    pairs
}
/// A `from_host` name that the host defines is passed through with its value.
#[test]
fn build_env_vec_from_host_present_name_propagates() {
let settings = env_settings(&["PATH"], &[]);
let host = host_snapshot(&[("PATH", "/usr/bin")]);
let got = build_env_vec(&settings, &host);
assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
}
/// A `from_host` name the host does not define produces no entry at all.
#[test]
fn build_env_vec_from_host_absent_name_is_excluded() {
let settings = env_settings(&["NEVER_SET"], &[]);
let host = host_snapshot(&[("OTHER", "v")]);
let got = build_env_vec(&settings, &host);
assert!(
got.is_empty(),
"absent from_host names must not enter the child env, got {got:?}",
);
}
/// When a name is both inherited and overridden, the override value wins.
#[test]
fn build_env_vec_override_wins_on_collision() {
let settings = env_settings(&["X"], &[("X", "override-val")]);
let host = host_snapshot(&[("X", "host-val")]);
let got = build_env_vec(&settings, &host);
assert_eq!(got, osstr_pairs(&[("X", "override-val")]));
}
/// Overrides appear in the result even when nothing is inherited from the host.
#[test]
fn build_env_vec_override_only_entries_propagate() {
let settings = env_settings(&[], &[("HAZ_MODE", "ci")]);
let host = host_snapshot(&[]);
let got = build_env_vec(&settings, &host);
assert_eq!(got, osstr_pairs(&[("HAZ_MODE", "ci")]));
}
/// Host variables not listed in `from_host` are not leaked into the result.
#[test]
fn build_env_vec_unrelated_host_names_do_not_appear() {
let settings = env_settings(&["PATH"], &[]);
let host = host_snapshot(&[("PATH", "/usr/bin"), ("HOME", "/home/me")]);
let got = build_env_vec(&settings, &host);
assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
}
/// The result is ordered by variable name regardless of declaration order.
#[test]
fn build_env_vec_result_is_lexicographic() {
let settings = env_settings(&["ZULU", "ALPHA", "BRAVO"], &[]);
let host = host_snapshot(&[("ALPHA", "a"), ("BRAVO", "b"), ("ZULU", "z")]);
let got = build_env_vec(&settings, &host);
let names: Vec<&OsString> = got.iter().map(|(k, _)| k).collect();
assert_eq!(
names,
vec![
&OsString::from("ALPHA"),
&OsString::from("BRAVO"),
&OsString::from("ZULU"),
]
);
}
/// A host variable with an empty value is still propagated (empty is not absent).
#[test]
fn build_env_vec_empty_value_is_kept() {
let settings = env_settings(&["X"], &[]);
let host = host_snapshot(&[("X", "")]);
let got = build_env_vec(&settings, &host);
assert_eq!(got, osstr_pairs(&[("X", "")]));
}
/// For a command action the first argv element becomes the program and the
/// remaining elements become the arguments.
#[test]
fn build_spawn_plan_command_maps_argv_head_and_tail() {
let p = nested_project("proj", "/proj");
let t = task_with(
command(&["cargo", "build", "--release"]),
EnvSettings::default(),
);
let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.program, OsString::from("cargo"));
assert_eq!(
plan.args,
vec![OsString::from("build"), OsString::from("--release")],
);
}
/// A one-element argv yields a program and an empty argument list.
#[test]
fn build_spawn_plan_command_with_single_arg_has_empty_args() {
let p = nested_project("proj", "/proj");
let t = task_with(command(&["true"]), EnvSettings::default());
let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.program, OsString::from("true"));
assert!(plan.args.is_empty());
}
/// A shell action runs the shell binary with `-c` followed by the script.
#[test]
fn build_spawn_plan_shell_uses_dash_c_and_script() {
let p = nested_project("proj", "/proj");
let t = task_with(shell("echo hi", ShellType::Sh), EnvSettings::default());
let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.program, OsString::from("sh"));
assert_eq!(
plan.args,
vec![OsString::from("-c"), OsString::from("echo hi")],
);
}
/// A shell action with `ShellType::Other` uses the named binary as the
/// program, still with `-c <script>`.
#[test]
fn build_spawn_plan_shell_other_uses_named_binary() {
let p = nested_project("proj", "/proj");
let other = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
let t = task_with(
shell("echo hi", ShellType::Other(other)),
EnvSettings::default(),
);
let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.program, OsString::from("zsh"));
assert_eq!(
plan.args,
vec![OsString::from("-c"), OsString::from("echo hi")],
);
}
/// The plan's cwd for a nested project is the project's host directory.
#[test]
fn build_spawn_plan_sets_cwd_to_project_host_path_for_nested() {
let p = nested_project("frontend", "/web/frontend");
let t = task_with(command(&["true"]), EnvSettings::default());
let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.cwd, PathBuf::from("/abs/ws/web/frontend"));
}
/// The plan's cwd for a workspace-root project is the workspace host directory.
#[test]
fn build_spawn_plan_sets_cwd_to_workspace_host_for_implicit_project() {
let p = implicit_project("root");
let t = task_with(command(&["true"]), EnvSettings::default());
let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
assert_eq!(plan.cwd, PathBuf::from("/abs/ws"));
}
/// The plan's env combines all `build_env_vec` rules: overrides beat inherited
/// values, absent `from_host` names and unlisted host variables are excluded,
/// and the result is sorted by name.
#[test]
fn build_spawn_plan_env_reflects_from_host_overrides_and_excludes_unrelated() {
let settings = env_settings(
&["PATH", "MISSING"],
&[("HAZ_MODE", "ci"), ("PATH", "/override/bin")],
);
let p = nested_project("proj", "/proj");
let t = task_with(command(&["true"]), settings);
let host = host_snapshot(&[("PATH", "/usr/bin"), ("UNRELATED", "should-not-appear")]);
let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &host);
assert_eq!(
plan.env,
osstr_pairs(&[("HAZ_MODE", "ci"), ("PATH", "/override/bin"),]),
"override wins over from_host; UNRELATED and MISSING (absent) are excluded",
);
}
mod output_resolution {
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use haz_domain::path::{
CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
};
use haz_domain::project::Project;
use haz_domain::settings::WorkspaceSettings;
use haz_domain::workspace::Workspace;
use haz_vfs::{MemFilesystem, WritableFilesystem};
use super::super::{OwnedOutputFile, RunTaskError, resolve_output_files};
/// Host path used as the workspace root by the fixtures in this module.
const WORKSPACE_HOST: &str = "/ws";
/// Host directory of the nested fixture project (`/proj` under the workspace root).
const PROJECT_HOST: &str = "/ws/proj";
/// Fixture: project `proj` rooted at the nested path `/proj`, without tags or tasks.
fn nested_project() -> Project {
Project {
name: haz_domain::name::ProjectName::try_new("proj").unwrap(),
root: ProjectRoot::Nested(
CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap(),
),
tags: BTreeSet::new(),
tasks: BTreeMap::new(),
}
}
/// Fixture: project `root` located at the workspace root, without tags or tasks.
fn implicit_project() -> Project {
Project {
name: haz_domain::name::ProjectName::try_new("root").unwrap(),
root: ProjectRoot::WorkspaceRoot,
tags: BTreeSet::new(),
tasks: BTreeMap::new(),
}
}
/// Fixture: a workspace rooted at `WORKSPACE_HOST` containing only `project`,
/// with no overlays and default settings.
fn workspace_with(project: &Project) -> Workspace {
let mut projects = BTreeMap::new();
projects.insert(project.name.clone(), project.clone());
Workspace {
root: WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap(),
projects,
overlays: BTreeMap::new(),
settings: WorkspaceSettings::default(),
}
}
/// Collects the workspace-absolute paths of `files` into a set, so assertions
/// do not depend on walk order.
fn paths_of(files: &[OwnedOutputFile]) -> BTreeSet<String> {
    let mut paths = BTreeSet::new();
    for file in files {
        paths.insert(file.workspace_absolute_path.clone());
    }
    paths
}
/// Collects the on-disk host paths of `files` into a set, so assertions do
/// not depend on walk order.
fn host_paths_of(files: &[OwnedOutputFile]) -> BTreeSet<PathBuf> {
    let mut host_paths = BTreeSet::new();
    for file in files {
        host_paths.insert(file.on_disk_path.clone());
    }
    host_paths
}
/// Indexes the recorded mode of each file by its workspace-absolute path.
/// A later entry replaces an earlier one with the same path.
fn modes_by_workspace_path(files: &[OwnedOutputFile]) -> BTreeMap<String, u32> {
    let mut modes = BTreeMap::new();
    for file in files {
        modes.insert(file.workspace_absolute_path.clone(), file.mode);
    }
    modes
}
/// A project-relative literal that exists resolves to one output with the
/// expected workspace path, host path and permission bits.
#[test]
fn literal_hit_returns_one_output_with_recorded_mode() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
fs.add_file_with_mode(format!("{PROJECT_HOST}/bundle.js"), b"data".to_vec(), 0o640)
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("bundle.js").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].workspace_absolute_path, "/proj/bundle.js");
assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/proj/bundle.js"));
// Only the permission bits are compared; any higher bits are masked off.
assert_eq!(result[0].mode & 0o7777, 0o640);
}
/// A literal written with a leading `/` is resolved against the workspace
/// root rather than the project directory.
#[test]
fn literal_workspace_absolute_resolves_under_workspace_root() {
let mut fs = MemFilesystem::new();
fs.add_dir("/ws/dist").unwrap();
fs.add_file_with_mode("/ws/dist/main.js", b"x".to_vec(), 0o755)
.unwrap();
fs.add_dir(PROJECT_HOST).unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("/dist/main.js").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].workspace_absolute_path, "/dist/main.js");
assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/dist/main.js"));
assert_eq!(result[0].mode & 0o7777, 0o755);
}
/// A declared literal that does not exist is reported as
/// `OutputDeclaredButNotProduced` with the missing host path.
#[test]
fn literal_missing_surfaces_output_declared_but_not_produced() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("absent.txt").unwrap()];
match resolve_output_files(&fs, &workspace, &project, &outputs) {
Err(RunTaskError::OutputDeclaredButNotProduced { path }) => {
assert_eq!(path, PathBuf::from("/ws/proj/absent.txt"));
}
other => panic!("expected OutputDeclaredButNotProduced, got {other:?}"),
}
}
/// A literal that names a directory is rejected with `OutputNotARegularFile`.
#[test]
fn literal_pointing_at_directory_surfaces_output_not_a_regular_file() {
let mut fs = MemFilesystem::new();
fs.add_dir(format!("{PROJECT_HOST}/subdir")).unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("subdir").unwrap()];
match resolve_output_files(&fs, &workspace, &project, &outputs) {
Err(RunTaskError::OutputNotARegularFile { path }) => {
assert_eq!(path, PathBuf::from("/ws/proj/subdir"));
}
other => panic!("expected OutputNotARegularFile, got {other:?}"),
}
}
/// A glob collects every matching file with its own mode and ignores files
/// that do not match the pattern.
#[test]
fn glob_multi_match_collects_every_matching_file_with_its_mode() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
fs.add_file_with_mode(format!("{PROJECT_HOST}/a.js"), b"a".to_vec(), 0o644)
.unwrap();
fs.add_file_with_mode(format!("{PROJECT_HOST}/b.js"), b"b".to_vec(), 0o600)
.unwrap();
// Non-matching file: must not appear in the result.
fs.add_file_with_mode(
format!("{PROJECT_HOST}/keep.txt"),
b"ignored".to_vec(),
0o644,
)
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("*.js").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
paths_of(&result),
BTreeSet::from(["/proj/a.js".to_owned(), "/proj/b.js".to_owned()]),
);
let modes = modes_by_workspace_path(&result);
assert_eq!(
modes.get("/proj/a.js").copied().map(|m| m & 0o7777),
Some(0o644)
);
assert_eq!(
modes.get("/proj/b.js").copied().map(|m| m & 0o7777),
Some(0o600)
);
}
/// A glob with no matches is not an error; it simply contributes nothing.
#[test]
fn glob_no_match_returns_empty_contribution() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
fs.add_file(format!("{PROJECT_HOST}/only.txt"), b"x".to_vec())
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("*.js").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert!(result.is_empty());
}
/// `dist/**/*.js` matches files directly under `dist` and in deeper
/// subdirectories, but not files outside `dist`.
#[test]
fn glob_nested_double_star_recurses_into_subdirectories() {
let mut fs = MemFilesystem::new();
fs.add_dir(format!("{PROJECT_HOST}/dist")).unwrap();
fs.add_dir(format!("{PROJECT_HOST}/dist/inner")).unwrap();
fs.add_file(format!("{PROJECT_HOST}/dist/top.js"), b"top".to_vec())
.unwrap();
fs.add_file(
format!("{PROJECT_HOST}/dist/inner/deep.js"),
b"deep".to_vec(),
)
.unwrap();
// Outside `dist`: must be excluded.
fs.add_file(format!("{PROJECT_HOST}/other.js"), b"other".to_vec())
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("dist/**/*.js").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
paths_of(&result),
BTreeSet::from([
"/proj/dist/top.js".to_owned(),
"/proj/dist/inner/deep.js".to_owned(),
]),
);
assert_eq!(
host_paths_of(&result),
BTreeSet::from([
PathBuf::from("/ws/proj/dist/top.js"),
PathBuf::from("/ws/proj/dist/inner/deep.js"),
]),
);
}
/// A symlink to a regular file that matches the glob is recorded under the
/// link's own path, with the mode of the file it points to.
#[test]
fn glob_symlink_to_file_records_link_path_with_target_mode() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
fs.add_file_with_mode(
format!("{PROJECT_HOST}/real.txt"),
b"real bytes".to_vec(),
0o600,
)
.unwrap();
fs.add_symlink(
format!("{PROJECT_HOST}/link.txt"),
format!("{PROJECT_HOST}/real.txt"),
)
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("*.txt").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(
result.len(),
2,
"both the real file and the symlink to it are distinct contributions",
);
assert_eq!(
paths_of(&result),
BTreeSet::from(["/proj/real.txt".to_owned(), "/proj/link.txt".to_owned(),]),
);
for file in &result {
assert_eq!(
file.mode & 0o7777,
0o600,
"{} should report target's mode",
file.workspace_absolute_path,
);
}
}
/// For a project at the workspace root, a project-relative literal resolves
/// directly under the workspace root.
#[test]
fn implicit_mode_project_relative_literal_is_workspace_absolute() {
let mut fs = MemFilesystem::new();
fs.add_dir(WORKSPACE_HOST).unwrap();
fs.add_file_with_mode(
format!("{WORKSPACE_HOST}/manifest.json"),
b"{}".to_vec(),
0o644,
)
.unwrap();
let project = implicit_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("manifest.json").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].workspace_absolute_path, "/manifest.json");
assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/manifest.json"));
assert_eq!(result[0].mode & 0o7777, 0o644);
}
/// When the glob's starting directory cannot be walked (the filesystem here is
/// empty, so `/ws/proj` does not exist), the error names that directory.
#[test]
fn glob_walk_error_surfaces_output_pattern_resolution_failed() {
let fs = MemFilesystem::new();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("*.js").unwrap()];
match resolve_output_files(&fs, &workspace, &project, &outputs) {
Err(RunTaskError::OutputPatternResolutionFailed { root, .. }) => {
assert_eq!(root, PathBuf::from("/ws/proj"));
}
other => panic!("expected OutputPatternResolutionFailed, got {other:?}"),
}
}
/// Permissions changed through `set_permissions` after file creation are the
/// ones reported for the resolved output.
#[test]
fn permissions_round_trip_through_set_permissions() {
let mut fs = MemFilesystem::new();
fs.add_dir(PROJECT_HOST).unwrap();
fs.add_file(format!("{PROJECT_HOST}/bin"), b"!".to_vec())
.unwrap();
fs.set_permissions(std::path::Path::new("/ws/proj/bin"), 0o751)
.unwrap();
let project = nested_project();
let workspace = workspace_with(&project);
let outputs = vec![OutputSpec::parse("bin").unwrap()];
let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].mode & 0o7777, 0o751);
}
}
mod end_to_end {
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::sync::Mutex;
use nonempty::NonEmpty;
use haz_cache::Cache;
use haz_dag::graph::TaskGraph;
use haz_domain::action::TaskAction;
use haz_domain::env::{EnvSettings, EnvVarName};
use haz_domain::name::{ProjectName, TaskName};
use haz_domain::path::{
CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
};
use haz_domain::project::Project;
use haz_domain::settings::WorkspaceSettings;
use haz_domain::settings::cache::HashAlgo;
use haz_domain::task::Task;
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use haz_vfs::{Filesystem, FsError, MemFilesystem, WritableFilesystem};
use crate::cache_key::BuildKeyError;
use crate::mock_impl::{MockProcessSpawner, MockSpec};
use super::super::{
CancelledRecord, CompletedRecord, RunContext, RunObserver, RunSource, RunState,
RunTaskError, SkipRecord, run_task,
};
use tokio_util::sync::CancellationToken;
const WORKSPACE_HOST: &str = "/ws";
const PROJECT_HOST: &str = "/ws/proj";
/// Builds an `EnvVarName` from `s`, panicking if `try_new` rejects it.
fn env_var(s: &str) -> EnvVarName {
    let parsed = EnvVarName::try_new(s);
    parsed.unwrap()
}
/// Builds a `TaskId` from a project name and a task name, panicking if
/// either name is rejected by its `try_new` constructor.
fn task_id_for(project: &str, task: &str) -> TaskId {
    // Shadow the raw strings with their validated counterparts.
    let project = ProjectName::try_new(project).unwrap();
    let task = TaskName::try_new(task).unwrap();
    TaskId { project, task }
}
/// Wraps `argv` in a `TaskAction::Command`.
///
/// # Panics
///
/// Panics with "non-empty argv" when `argv` is empty.
fn command(argv: &[&str]) -> TaskAction {
    let owned_args: Vec<String> = argv.iter().map(|arg| String::from(*arg)).collect();
    let non_empty = NonEmpty::from_vec(owned_args).expect("non-empty argv");
    TaskAction::Command(non_empty)
}
/// One observer callback captured by `Recorder`, together with the
/// arguments it was invoked with.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RecordedEvent {
/// `on_task_started` was called for this task.
Started(TaskId),
/// `on_stdout` was called for this task with these bytes.
Stdout(TaskId, Vec<u8>),
/// `on_stderr` was called for this task with these bytes.
Stderr(TaskId, Vec<u8>),
/// `on_task_finished` was called for this task with this record.
Finished(TaskId, CompletedRecord),
}
/// Test observer that logs started / stdout / stderr / finished callbacks
/// in arrival order. Skip and cancel callbacks are accepted but not logged.
#[derive(Default)]
struct Recorder {
    events: Mutex<Vec<RecordedEvent>>,
}

impl Recorder {
    /// Appends a single event to the log.
    fn record(&self, event: RecordedEvent) {
        self.events.lock().unwrap().push(event);
    }

    /// Returns a cloned snapshot of everything logged so far.
    fn events(&self) -> Vec<RecordedEvent> {
        let log = self.events.lock().unwrap();
        log.clone()
    }
}

impl RunObserver for Recorder {
    fn on_task_started(&self, task: &TaskId) {
        self.record(RecordedEvent::Started(task.clone()));
    }

    fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
        self.record(RecordedEvent::Stdout(task.clone(), bytes.to_vec()));
    }

    fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
        self.record(RecordedEvent::Stderr(task.clone(), bytes.to_vec()));
    }

    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
        self.record(RecordedEvent::Finished(task.clone(), record.clone()));
    }

    // Intentionally a no-op: skips are not logged.
    fn on_task_skipped(&self, _task: &TaskId, _record: &SkipRecord) {}

    // Intentionally a no-op: cancellations are not logged.
    fn on_task_cancelled(&self, _task: &TaskId, _record: &CancelledRecord) {}
}
/// Everything an end-to-end `run_task` test needs: a cache over an in-memory
/// filesystem, a one-project / one-task workspace, its graph, and the run
/// inputs (`host_env`, `cancel`) that tests may adjust before building a
/// context.
struct Fixture {
    cache: Cache<MemFilesystem>,
    workspace: Workspace,
    graph: TaskGraph,
    task_id: TaskId,
    host_env: BTreeMap<EnvVarName, String>,
    cancel: CancellationToken,
}

impl Fixture {
    /// Builds a workspace rooted at `WORKSPACE_HOST` containing project
    /// `proj` (nested at `/proj`) with a single task `build` that uses the
    /// given `action`, `env` and `outputs`. The task declares no inputs,
    /// dependencies or mutex; the host environment starts empty.
    fn new(action: TaskAction, env: EnvSettings, outputs: Vec<OutputSpec>) -> Self {
        // The in-memory tree starts with just the two directories.
        let mut mem_fs = MemFilesystem::new();
        for dir in [WORKSPACE_HOST, PROJECT_HOST] {
            mem_fs.add_dir(dir).unwrap();
        }

        let build_task = Task {
            name: TaskName::try_new("build").unwrap(),
            action,
            inputs: Vec::new(),
            outputs,
            deps: Vec::new(),
            weak_deps: Vec::new(),
            mutex: None,
            env,
        };

        let project_root =
            CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap();
        let project = Project {
            name: ProjectName::try_new("proj").unwrap(),
            root: ProjectRoot::Nested(project_root),
            tags: BTreeSet::new(),
            tasks: BTreeMap::from([(build_task.name.clone(), build_task)]),
        };

        let workspace_root = WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap();
        let workspace = Workspace {
            root: workspace_root,
            projects: BTreeMap::from([(project.name.clone(), project)]),
            overlays: BTreeMap::new(),
            settings: WorkspaceSettings::default(),
        };

        // The graph holds the single task as its only node and has no edges.
        let task_id = task_id_for("proj", "build");
        let graph = TaskGraph {
            nodes: BTreeSet::from([task_id.clone()]),
            edges: BTreeSet::new(),
        };

        // The cache takes ownership of the filesystem; tests reach it again
        // through `cache.fs()`.
        let cache = Cache::new(
            mem_fs,
            std::path::Path::new(WORKSPACE_HOST),
            HashAlgo::Blake3,
        );

        Self {
            cache,
            workspace,
            graph,
            task_id,
            host_env: BTreeMap::new(),
            cancel: CancellationToken::new(),
        }
    }
}
fn make_ctx<'a>(
fixture: &'a Fixture,
spawner: &'a MockProcessSpawner,
observer: &'a Recorder,
) -> RunContext<'a, MemFilesystem, MockProcessSpawner, Recorder> {
RunContext {
fs: fixture.cache.fs(),
cache: &fixture.cache,
spawner,
observer,
workspace: &fixture.workspace,
graph: &fixture.graph,
host_env: &fixture.host_env,
algo: HashAlgo::Blake3,
cancel: &fixture.cancel,
}
}
/// A first, successful run is reported as a fresh run and the observer sees
/// exactly four events in order: started, stdout, stderr, finished, with
/// the finished record equal to the value `run_task` returned.
///
/// NOTE(review): despite the name, nothing in this test inspects the cache
/// itself; persistence is only observable indirectly through
/// `cache_miss_then_second_call_hits_cache`.
#[tokio::test]
async fn cache_miss_success_records_observer_events_and_persists_manifest() {
    let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
    let task_id = &fixture.task_id;

    let spawner = MockProcessSpawner::new();
    let scripted = MockSpec {
        stdout: b"out\n".to_vec(),
        stderr: b"err\n".to_vec(),
        exit_code: 0,
        ..MockSpec::default()
    };
    spawner.push_spec(scripted);

    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    let outcome = run_task(&ctx, task_id, &BTreeMap::new(), 1_700_000_000)
        .await
        .expect("baseline run should succeed");

    assert_eq!(outcome.source, RunSource::FreshRun);
    assert_eq!(outcome.state, RunState::Succeeded);
    assert!(outcome.exit_status.is_some_and(|status| status.success()));
    assert_eq!(&outcome.task, task_id);

    let events = observer.events();
    assert_eq!(events.len(), 4, "expected 4 events, got {events:?}");

    // The first three events are fully determined by the scripted spec.
    let expected_prefix = [
        RecordedEvent::Started(task_id.clone()),
        RecordedEvent::Stdout(task_id.clone(), b"out\n".to_vec()),
        RecordedEvent::Stderr(task_id.clone(), b"err\n".to_vec()),
    ];
    for (actual, expected) in events.iter().zip(&expected_prefix) {
        assert_eq!(actual, expected);
    }

    // The last event must carry the same record the caller received.
    let other = &events[3];
    let RecordedEvent::Finished(finished_id, finished_record) = other else {
        panic!("expected Finished event, got {other:?}");
    };
    assert_eq!(finished_id, task_id);
    assert_eq!(finished_record, &outcome);
}
/// Runs the same task twice against one fixture. The first call is a fresh
/// run; the second is a cache hit that succeeds without an exit status,
/// reports the same stream hashes, replays the recorded stdout and stderr
/// to the observer, and does not spawn a second process.
#[tokio::test]
async fn cache_miss_then_second_call_hits_cache() {
    let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
    let spawner = MockProcessSpawner::new();
    // Only one spec is queued for the two calls.
    let scripted = MockSpec {
        stdout: b"out\n".to_vec(),
        stderr: b"err\n".to_vec(),
        exit_code: 0,
        ..MockSpec::default()
    };
    spawner.push_spec(scripted);

    let fresh_observer = Recorder::default();
    let fresh_ctx = make_ctx(&fixture, &spawner, &fresh_observer);
    let first = run_task(&fresh_ctx, &fixture.task_id, &BTreeMap::new(), 1)
        .await
        .unwrap();
    assert_eq!(first.source, RunSource::FreshRun);

    let hit_observer = Recorder::default();
    let hit_ctx = make_ctx(&fixture, &spawner, &hit_observer);
    let second = run_task(&hit_ctx, &fixture.task_id, &BTreeMap::new(), 2)
        .await
        .unwrap();
    assert_eq!(second.source, RunSource::CacheHit);
    assert_eq!(second.state, RunState::Succeeded);
    assert!(second.exit_status.is_none());
    assert_eq!(second.stdout_hash, first.stdout_hash);
    assert_eq!(second.stderr_hash, first.stderr_hash);

    let events = hit_observer.events();
    let replayed_stdout = events.iter().any(|event| match event {
        RecordedEvent::Stdout(_, bytes) => bytes == b"out\n",
        _ => false,
    });
    assert!(
        replayed_stdout,
        "missing recorded stdout in hit-path events: {events:?}",
    );
    let replayed_stderr = events.iter().any(|event| match event {
        RecordedEvent::Stderr(_, bytes) => bytes == b"err\n",
        _ => false,
    });
    assert!(
        replayed_stderr,
        "missing recorded stderr in hit-path events: {events:?}",
    );

    let spawn_count = spawner.spawns().len();
    assert_eq!(
        spawn_count, 1,
        "second call must not respawn, got {} total spawns",
        spawn_count,
    );
}
#[tokio::test]
async fn cache_018_failed_run_does_not_store_a_cache_entry() {
let fixture = Fixture::new(command(&["false"]), EnvSettings::default(), Vec::new());
let spawner = MockProcessSpawner::new();
spawner.push_spec(MockSpec {
stdout: Vec::new(),
stderr: b"error\n".to_vec(),
exit_code: 1,
..MockSpec::default()
});
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let outcome = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
.await
.unwrap();
assert_eq!(outcome.source, RunSource::FreshRun);
assert_eq!(outcome.state, RunState::Failed);
assert!(outcome.exit_status.is_some_and(|s| !s.success()));
assert!(
observer.events().iter().any(|e| matches!(
e,
RecordedEvent::Stderr(_, bytes) if bytes == b"error\n"
)),
"failure-side stderr must reach the observer",
);
let cache_root_meta = fixture.cache.fs().metadata(fixture.cache.cache_root());
assert!(
matches!(cache_root_meta, Err(FsError::NotFound { .. })),
"cache root must remain absent after a failed run; got {cache_root_meta:?}",
);
spawner.push_spec(MockSpec {
stdout: Vec::new(),
stderr: b"again\n".to_vec(),
exit_code: 1,
..MockSpec::default()
});
let observer2 = Recorder::default();
let ctx2 = make_ctx(&fixture, &spawner, &observer2);
let outcome2 = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
.await
.unwrap();
assert_eq!(outcome2.source, RunSource::FreshRun);
assert_eq!(spawner.spawns().len(), 2);
}
/// After a successful fresh run with a declared output, a later cache hit
/// restores the original output bytes even when the file on disk was
/// overwritten between the two runs.
#[tokio::test]
async fn cache_miss_success_stores_outputs_and_restore_overwrites_target() {
    let fixture = Fixture::new(
        command(&["true"]),
        EnvSettings::default(),
        vec![OutputSpec::parse("artifact.bin").unwrap()],
    );
    let fs = fixture.cache.fs();

    // The declared output is placed on disk before the first run.
    fs.write_file(
        std::path::Path::new("/ws/proj/artifact.bin"),
        b"artifact-bytes",
    )
    .unwrap();

    let spawner = MockProcessSpawner::new();
    spawner.push_spec(MockSpec::default());
    let fresh_observer = Recorder::default();
    let fresh_ctx = make_ctx(&fixture, &spawner, &fresh_observer);
    let first = run_task(&fresh_ctx, &fixture.task_id, &BTreeMap::new(), 1)
        .await
        .unwrap();
    assert_eq!(first.source, RunSource::FreshRun);
    assert_eq!(first.state, RunState::Succeeded);

    // Clobber the output so a restore is distinguishable from a no-op.
    fs.write_file(std::path::Path::new("/ws/proj/artifact.bin"), b"garbage")
        .unwrap();

    let hit_observer = Recorder::default();
    let hit_ctx = make_ctx(&fixture, &spawner, &hit_observer);
    let second = run_task(&hit_ctx, &fixture.task_id, &BTreeMap::new(), 2)
        .await
        .unwrap();
    assert_eq!(second.source, RunSource::CacheHit);

    let restored_bytes = fs
        .read(std::path::Path::new("/ws/proj/artifact.bin"))
        .unwrap();
    assert_eq!(restored_bytes, b"artifact-bytes");
}
/// CACHE-023: what is observable after a cache hit (declared output bytes,
/// stdout stream, stderr stream) matches what was observable after the
/// fresh run that populated the cache, and no second process is spawned.
#[tokio::test]
async fn cache_023_hit_is_observationally_equivalent_to_fresh_run() {
    // Includes NUL, control and 0xFF bytes, so the payload is not valid UTF-8.
    const OUTPUT_BYTES: &[u8] = b"\x00built\x01artifact\xFF";
    const STDOUT_BYTES: &[u8] = b"hello from the task\n";
    const STDERR_BYTES: &[u8] = b"warning: be careful\n";

    let fixture = Fixture::new(
        command(&["true"]),
        EnvSettings::default(),
        vec![OutputSpec::parse("artifact.bin").unwrap()],
    );
    let fs = fixture.cache.fs();
    let artifact_path = std::path::Path::new("/ws/proj/artifact.bin");
    fs.write_file(artifact_path, OUTPUT_BYTES).unwrap();

    let spawner = MockProcessSpawner::new();
    let scripted = MockSpec {
        stdout: STDOUT_BYTES.to_vec(),
        stderr: STDERR_BYTES.to_vec(),
        exit_code: 0,
        ..MockSpec::default()
    };
    spawner.push_spec(scripted);

    // Fresh run: capture everything an outside observer could see.
    let observer_fresh = Recorder::default();
    let ctx_fresh = make_ctx(&fixture, &spawner, &observer_fresh);
    let fresh = run_task(&ctx_fresh, &fixture.task_id, &BTreeMap::new(), 1)
        .await
        .unwrap();
    assert_eq!(fresh.source, RunSource::FreshRun);
    assert_eq!(fresh.state, RunState::Succeeded);
    let fresh_output = fs.read(artifact_path).unwrap();
    let fresh_events = observer_fresh.events();
    let fresh_stdout = concat_stdout(&fresh_events);
    let fresh_stderr = concat_stderr(&fresh_events);

    // Corrupt the output so the hit has to restore it.
    fs.write_file(artifact_path, b"garbage-should-be-overwritten")
        .unwrap();

    // Cache hit: capture the same three observables again.
    let observer_hit = Recorder::default();
    let ctx_hit = make_ctx(&fixture, &spawner, &observer_hit);
    let hit = run_task(&ctx_hit, &fixture.task_id, &BTreeMap::new(), 2)
        .await
        .unwrap();
    assert_eq!(hit.source, RunSource::CacheHit);
    assert_eq!(hit.state, RunState::Succeeded);
    let hit_output = fs.read(artifact_path).unwrap();
    let hit_events = observer_hit.events();
    let hit_stdout = concat_stdout(&hit_events);
    let hit_stderr = concat_stderr(&hit_events);

    assert_eq!(
        fresh_output, hit_output,
        "CACHE-023: declared output bytes differ between fresh and hit",
    );
    assert_eq!(
        fresh_stdout, hit_stdout,
        "CACHE-023: captured stdout differs between fresh and hit",
    );
    assert_eq!(
        fresh_stderr, hit_stderr,
        "CACHE-023: captured stderr differs between fresh and hit",
    );

    // Pin the absolute values as well, not only fresh-versus-hit equality.
    assert_eq!(hit_output, OUTPUT_BYTES);
    assert_eq!(hit_stdout, STDOUT_BYTES);
    assert_eq!(hit_stderr, STDERR_BYTES);
    assert_eq!(spawner.spawns().len(), 1);
}
/// Concatenates, in order, the payloads of every `Stdout` event in `events`;
/// all other event kinds are ignored.
fn concat_stdout(events: &[RecordedEvent]) -> Vec<u8> {
    events
        .iter()
        .filter_map(|event| match event {
            RecordedEvent::Stdout(_, chunk) => Some(chunk.as_slice()),
            _ => None,
        })
        .flatten()
        .copied()
        .collect()
}
/// Concatenates, in order, the payloads of every `Stderr` event in `events`;
/// all other event kinds are ignored.
fn concat_stderr(events: &[RecordedEvent]) -> Vec<u8> {
    events
        .iter()
        .filter_map(|event| match event {
            RecordedEvent::Stderr(_, chunk) => Some(chunk.as_slice()),
            _ => None,
        })
        .flatten()
        .copied()
        .collect()
}
/// Running a task id that the workspace does not contain fails with
/// `BuildKeyFailed { source: TaskNotInWorkspace }` naming that id. The
/// observer sees only the started event, and nothing is spawned.
#[tokio::test]
async fn task_not_in_workspace_surfaces_build_key_failed() {
    let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
    let spawner = MockProcessSpawner::new();
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    // The fixture only defines `proj`, so this id is unknown to it.
    let absent = task_id_for("absent_project", "build");
    let other = run_task(&ctx, &absent, &BTreeMap::new(), 1).await;
    let Err(RunTaskError::BuildKeyFailed {
        source: BuildKeyError::TaskNotInWorkspace { task },
    }) = &other
    else {
        panic!("expected BuildKeyFailed(TaskNotInWorkspace), got {other:?}");
    };
    assert_eq!(*task, absent);

    let events = observer.events();
    assert_eq!(
        events.len(),
        1,
        "expected one Started event, got {events:?}"
    );
    assert!(
        matches!(&events[0], RecordedEvent::Started(id) if id == &absent),
        "expected Started(absent), got {events:?}",
    );
    assert!(spawner.spawns().is_empty());
}
/// CACHE-008: the environment handed to the spawner contains exactly the
/// variables the task asked for that have a value. `PATH` is both passed
/// through from the host and overridden, and the override wins. `MISSING`
/// is requested but absent from the host, so it is omitted. `UNRELATED` is
/// on the host but never requested, so it is not forwarded.
#[tokio::test]
async fn spawn_plan_env_reflects_cache_008_runtime_view() {
    let from_host = BTreeSet::from([env_var("PATH"), env_var("MISSING")]);
    let overrides = BTreeMap::from([
        (env_var("HAZ_MODE"), "ci".to_owned()),
        (env_var("PATH"), "/override/bin".to_owned()),
    ]);
    let task_env = EnvSettings {
        from_host,
        overrides,
    };

    let mut fixture = Fixture::new(command(&["true"]), task_env, Vec::new());
    fixture.host_env.extend([
        (env_var("PATH"), "/usr/bin".to_owned()),
        (env_var("UNRELATED"), "should-not-appear".to_owned()),
    ]);

    let spawner = MockProcessSpawner::new();
    spawner.push_spec(MockSpec::default());
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    let _ = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
        .await
        .unwrap();

    let records = spawner.spawns();
    assert_eq!(records.len(), 1);
    let spawned_env = &records[0].plan.env;

    // Compare names as a set; ordering within the plan is not asserted.
    let mut env_names = BTreeSet::new();
    for (name, _) in spawned_env.iter() {
        env_names.insert(name.to_str().unwrap().to_owned());
    }
    assert_eq!(
        env_names,
        BTreeSet::from(["HAZ_MODE".to_owned(), "PATH".to_owned()]),
    );

    // Take the first `PATH` entry; its value must be the override.
    let path_value = spawned_env.iter().find_map(|(name, value)| {
        (name.to_str() == Some("PATH")).then(|| value.to_str().unwrap().to_owned())
    });
    assert_eq!(path_value.as_deref(), Some("/override/bin"));
}
}
}