haz_exec/run_task.rs
1//! Single-task lifecycle: cache lookup, hit-path restoration, or
2//! miss-path spawn-and-capture with success-only store.
3//!
4//! [`run_task`] is the async entry point. It does no scheduling, no
5//! cap accounting, no mutex acquisition, no cancellation, and emits
6//! no presentation-layer bytes itself: captured streams flow to the
7//! caller through the [`RunObserver`] trait. Concrete observer
8//! implementations distinguish the two `EXEC-016` modes: `live`
9//! (per-line `[project:task] `-tag-prefixed writes to a shared
10//! parent sink) and `buffered` (one contiguous block per stream on
11//! task completion).
12//!
13//! The function composes the cache-key derivation
14//! ([`crate::cache_key::build_cache_key`]) with the
15//! [`crate::process::ProcessSpawner`] trait
16//! into the executor's smallest
17//! end-to-end unit. A scheduler layer combines many [`run_task`]
18//! calls with the workspace's concurrency caps and mutex hold-set
19//! (`EXEC-004..007`).
20//!
21//! # Spec coverage
22//!
23//! - `EXEC-007` step 1 -- key derivation and lookup precede every
24//! other concern. Mutex acquisition is not implemented by this
25//! function; that responsibility belongs to a scheduler layer.
26//! - `EXEC-007` step 2 -- cache-hit restoration runs mutex-free.
27//! - `EXEC-009` -- terminal classification: zero exit is
28//! [`RunState::Succeeded`]; non-zero or signalled is
29//! [`RunState::Failed`]. A cache hit is [`RunState::Succeeded`].
30//! The cancelled state of `EXEC-009`'s third option is not
31//! reachable from this function: cancellation is not implemented
32//! here.
33//! - `EXEC-016` -- the full `stdout` and `stderr` byte streams of
34//! every fresh run are captured (the cache stores them per
35//! `CACHE-012`) and surfaced through [`RunObserver`].
36//! - `EXEC-017` -- on a cache hit, the recorded `stdout` and
37//! `stderr` flow through [`RunObserver`] with the same
38//! mode-agnostic signal as a fresh run.
39//! - `CACHE-008` (runtime) -- the spawned process sees only the
40//! effective env: allow-listed host values plus task-level
41//! overrides, override wins on collision. The spawn-plan
42//! builder assembles that env vector; the std backend's
43//! `env_clear()` ensures the child sees this set and nothing
44//! else.
45//! - `CACHE-014..016` -- [`haz_cache::Cache::lookup`] folds every
46//! failure into a clean miss, so no [`RunTaskError`] variant
47//! covers it.
48//! - `CACHE-017..018` -- the cache store fires only when the fresh
49//! run's exit status was zero. The store call itself surfaces
50//! as [`RunTaskError::StoreFailed`].
51//! - `CACHE-019` -- restoration uses [`haz_cache::Cache::restore`],
52//! surfacing [`haz_cache::RestoredStreams`] through the observer.
53//!
54//! # Out of scope for this function
55//!
56//! - Concurrency caps (`EXEC-004..006`) and the canonical-order
57//! tie-breaking of `EXEC-003`: scheduler-level concerns.
58//! - Mutex acquisition (`EXEC-007` step 3) and `EXEC-006`
59//! condition 3: scheduler-level concerns.
60//! - Failure cascade and the `skipped` state (`EXEC-009..011`):
61//! scheduler-level concerns.
62//! - Cancellation (`EXEC-012..015`): not implemented in this
63//! function.
64//! - Live-mode per-line tag prefixing and atomic sink writes
65//! (`EXEC-016` presentation): implemented inside concrete
66//! [`RunObserver`] types.
67//! - Runtime cycle (`EXEC-019`) and output-overlap (`EXEC-020`)
68//! detection: scheduler-level concerns.
69
70use std::collections::BTreeMap;
71use std::ffi::OsString;
72use std::io;
73use std::path::{Path, PathBuf};
74
75use snafu::{ResultExt, Snafu};
76use tokio::io::AsyncReadExt;
77use tokio_util::sync::CancellationToken;
78
79use haz_cache::{Cache, Hasher, RestoreError, StoreError, StoreInputs, StoredOutput};
80use haz_dag::graph::TaskGraph;
81use haz_domain::action::{ShellType, TaskAction};
82use haz_domain::env::{EnvSettings, EnvVarName};
83use haz_domain::path::{CanonicalPath, OutputSpec, ParseAbsoluteError, PathPattern, ProjectRoot};
84use haz_domain::project::Project;
85use haz_domain::settings::cache::HashAlgo;
86use haz_domain::task::Task;
87use haz_domain::task_id::TaskId;
88use haz_domain::workspace::Workspace;
89use haz_vfs::{EntryKind, Filesystem, FsError, WritableFilesystem};
90
91use crate::cache_key::{BuildKeyError, PredecessorStreamHashes, build_cache_key};
92use crate::pattern_walk::{
93 GlobMatchAction, GlobWalk, glob_walk_origin, host_path_from_segments,
94 literal_workspace_segments, workspace_absolute_string_from_segments,
95};
96use crate::process::{
97 ExitStatus, Process, ProcessError, ProcessSpawner, Signal, SpawnPlan, Spawned,
98};
99
/// Provenance of the result carried by a [`CompletedRecord`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RunSource {
    /// The derived key matched a valid cache entry (`CACHE-015`):
    /// restoration materialised the outputs and the recorded streams
    /// were replayed through the observer.
    CacheHit,
    /// No entry matched, so the executor spawned the task's command
    /// and captured its stdout and stderr; when the run succeeds the
    /// result is stored so that later lookups can hit.
    FreshRun,
}
112
/// How a single-task run ended, following `EXEC-009`.
///
/// The three variants mirror the spec's terminal classifications:
/// succeeded, failed, and cancelled-by-executor. Only the spawn-step
/// future can produce the cancelled variant, and only when the
/// run-context's cancellation token fires while the child is in
/// flight; until that wiring lands, no [`run_task`] call returns
/// [`RunState::Cancelled`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RunState {
    /// Either a fresh run that exited with status zero or a cache
    /// hit (per `CACHE-018`, a hit asserts an earlier successful
    /// run).
    Succeeded,
    /// A fresh run whose command exited non-zero, or whose process
    /// was killed by a signal the executor did not send.
    Failed,
    /// A fresh run whose child the executor signalled because the
    /// run's cancellation token fired (`EXEC-009` cancelled state,
    /// `EXEC-013` step 2). Kept separate from [`Self::Failed`] so
    /// the run summary can distinguish executor-initiated
    /// termination from a task that failed under its own power.
    Cancelled,
}
138
139/// Run-record fields produced by one [`run_task`] invocation: the
140/// observation of a task that the scheduler admitted into the
141/// lookup-then-spawn pipeline and that reached a run classification
142/// (`EXEC-009`).
143///
144/// Carries enough information for a scheduler layer to feed the
145/// per-`TaskId` `predecessor_streams` map that
146/// [`crate::cache_key::build_cache_key`] consumes: `stdout_hash` and
147/// `stderr_hash` are always present, whether the run was a cache
148/// hit (sourced from the manifest's recorded hashes per
149/// `CACHE-011`) or a fresh run (computed from the captured byte
150/// buffers under the cache's active [`HashAlgo`]).
151///
152/// A scheduler-level cascade decision produces
153/// [`RunOutcome::Skipped`] instead (`EXEC-010` / `EXEC-011`);
154/// [`run_task`] itself never produces a Skipped because skipped
155/// tasks never enter the pipeline.
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct CompletedRecord {
158 /// The task this record describes.
159 pub task: TaskId,
160 /// Whether the success came from a cache hit or a fresh run.
161 pub source: RunSource,
162 /// Terminal classification per `EXEC-009`.
163 pub state: RunState,
164 /// Process exit status for a fresh run, [`None`] for a cache
165 /// hit (no process ran).
166 pub exit_status: Option<ExitStatus>,
167 /// Hash of the run's captured stdout under the cache's
168 /// [`HashAlgo`]. For [`RunSource::CacheHit`] this is read from
169 /// the manifest; for [`RunSource::FreshRun`] it is computed
170 /// from the captured byte buffer.
171 pub stdout_hash: [u8; 32],
172 /// Hash of the run's captured stderr under the cache's
173 /// [`HashAlgo`].
174 pub stderr_hash: [u8; 32],
175 /// Workspace-absolute paths the task materialised on disk.
176 ///
177 /// Populated only for [`RunState::Succeeded`]; empty for
178 /// [`RunState::Failed`] and [`RunState::Cancelled`]. The
179 /// source is the executor's output-resolution pass for a
180 /// fresh run and [`haz_cache::Manifest::outputs`] for a
181 /// cache hit.
182 ///
183 /// Consumed by the scheduler's runtime DAG validation pass
184 /// (`EXEC-019` runtime cycle detection and `EXEC-020` runtime
185 /// output-overlap detection) to derive runtime producer-
186 /// matching edges and to populate the output-claim tracker.
187 pub materialised_outputs: Vec<CanonicalPath>,
188}
189
190/// Record of a task that the scheduler cascade-skipped because an
191/// upstream task failed (`EXEC-010` / `EXEC-011`).
192///
193/// A skipped task is never started by the executor and never
194/// produces a cache entry. The `cause` field names the root
195/// failing task (NOT an intermediate cascade-skipped predecessor),
196/// so the run summary can attribute every skip to a single
197/// actually-failed task.
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct SkipRecord {
200 /// The task that was skipped.
201 pub task: TaskId,
202 /// Reason the task was skipped, naming the originating
203 /// failure.
204 pub cause: SkipCause,
205}
206
207/// Reason a task was cascade-skipped per `EXEC-011`.
208///
209/// The `upstream` field on each variant carries the root failing
210/// task identity, not the immediate hard predecessor: when `A`
211/// fails and the cascade marks `B` (direct hard child) and `C`
212/// (hard child of `B`), both record `upstream = A`.
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub enum SkipCause {
215 /// A transitively-upstream task reached
216 /// [`RunState::Failed`] (per `EXEC-009`: non-zero exit,
217 /// non-executor signal, or executor-imposed timeout).
218 UpstreamFailed {
219 /// Identity of the originally-failed upstream task.
220 upstream: TaskId,
221 },
222 /// A transitively-upstream task surfaced a [`RunTaskError`]
223 /// (executor-level failure: cache derive, spawn, wait,
224 /// stream-read, output-resolution, or store). Distinct from
225 /// [`Self::UpstreamFailed`] so the run summary can tell
226 /// "the upstream task ran and failed" from "the executor
227 /// could not even run the upstream task".
228 UpstreamErrored {
229 /// Identity of the upstream task whose [`run_task`] call
230 /// returned an [`Err`].
231 upstream: TaskId,
232 },
233 /// The task was identified as a member of a runtime cycle
234 /// detected by `EXEC-019`. The scheduler stops admitting
235 /// further tasks once the cycle is observed; cycle members
236 /// still in the ready set never start. The cycle's full node
237 /// set and the offending edge appear in the run's
238 /// [`crate::run_graph::RunGraphOutcome::invariant_violations`]
239 /// diagnostic; this per-task cause attributes the skip to
240 /// that workspace-level event without rewriting the cycle
241 /// node's outcome to a misleading "failed" classification.
242 RuntimeCycle,
243}
244
245/// Record of a task that ended in the executor-initiated
246/// cancelled state per `EXEC-009` / `EXEC-013`.
247///
248/// The three variants distinguish the three structural shapes a
249/// cancellation can take in the run-graph view:
250///
251/// - [`Self::SignaledInFlight`] is the only shape a single-task
252/// spawn-step future can produce: the cancellation token fired
253/// while the child was running, and the executor sent SIGTERM
254/// (and possibly SIGKILL after `EXEC-014`'s grace period).
255/// - [`Self::UpstreamCancelled`] is the cascade counterpart to
256/// [`SkipCause::UpstreamFailed`]: a hard descendant of a
257/// cancelled task is itself cancelled. `upstream` carries the
258/// root cancelled task, not the immediate hard predecessor
259/// (same root-cause attribution as [`SkipRecord`]).
260/// - [`Self::RunCancelled`] covers tasks that were still in
261/// `state.ready` (or whose lookup-step was in flight) when the
262/// scheduler observed the cancellation signal and drained the
263/// admission front. These tasks never made it to the spawn
264/// step; no process was started or signalled.
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub enum CancelledRecord {
267 /// The task was admitted, spawned, and signalled by the
268 /// executor in response to the cancellation token firing.
269 /// `exit_status` records the child's exit (typically
270 /// signal-terminated, but a polite child may exit cleanly
271 /// after SIGTERM); the captured stream hashes are present
272 /// for diagnostic purposes even though `EXEC-015` blocks
273 /// the cache store.
274 SignaledInFlight {
275 /// The task that was signalled.
276 task: TaskId,
277 /// The child's exit status after the signal flow ran to
278 /// completion.
279 exit_status: ExitStatus,
280 /// Hash of captured stdout under the cache's
281 /// [`HashAlgo`], for diagnostic / predecessor-streams use.
282 stdout_hash: [u8; 32],
283 /// Hash of captured stderr under the cache's
284 /// [`HashAlgo`], for diagnostic / predecessor-streams use.
285 stderr_hash: [u8; 32],
286 },
287 /// The task was cascade-cancelled because a transitively-
288 /// upstream task entered the cancelled state. Mirrors
289 /// [`SkipCause::UpstreamFailed`] for the cancellation flow.
290 UpstreamCancelled {
291 /// The task that was cascade-cancelled.
292 task: TaskId,
293 /// The root cancelled task that triggered the cascade.
294 upstream: TaskId,
295 },
296 /// The task was in `state.ready` (or its lookup-step was in
297 /// flight) when the scheduler observed the cancellation
298 /// signal. No process was spawned; the task simply never
299 /// entered the spawn-step pipeline.
300 RunCancelled {
301 /// The task whose admission was drained on cancel.
302 task: TaskId,
303 },
304}
305
306impl CancelledRecord {
307 /// The task this record describes, available on every
308 /// variant.
309 #[must_use]
310 pub fn task(&self) -> &TaskId {
311 match self {
312 Self::SignaledInFlight { task, .. }
313 | Self::UpstreamCancelled { task, .. }
314 | Self::RunCancelled { task } => task,
315 }
316 }
317}
318
319/// Terminal state of one task in a [`crate::run_graph`] invocation.
320///
321/// `Completed` wraps a [`CompletedRecord`] for any task that the
322/// scheduler admitted into the lookup-then-spawn pipeline and
323/// that reached a run classification (`EXEC-009`); `Skipped`
324/// wraps a [`SkipRecord`] for any task the cascade marked
325/// do-not-schedule before admission (`EXEC-010` / `EXEC-011`);
326/// `Cancelled` wraps a [`CancelledRecord`] for any task that
327/// the cancellation flow (`EXEC-012..015`) reached, either
328/// directly (signalled in flight, drained from the ready set)
329/// or via cascade.
330///
331/// [`run_task`] itself returns the bare [`CompletedRecord`]
332/// (never wrapped in this enum) because a single-task lifecycle
333/// has no cascade to inspect.
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub enum RunOutcome {
336 /// A task that ran to a [`CompletedRecord`] (succeeded or
337 /// failed per `EXEC-009`).
338 Completed(CompletedRecord),
339 /// A task the scheduler cascade-skipped before admission.
340 Skipped(SkipRecord),
341 /// A task the cancellation flow caught at any point (in
342 /// flight, on the ready set, or via cascade).
343 Cancelled(CancelledRecord),
344}
345
346impl RunOutcome {
347 /// The task this outcome describes, available on every
348 /// variant.
349 #[must_use]
350 pub fn task(&self) -> &TaskId {
351 match self {
352 Self::Completed(record) => &record.task,
353 Self::Skipped(record) => &record.task,
354 Self::Cancelled(record) => record.task(),
355 }
356 }
357}
358
359/// Observation surface for one [`run_task`] invocation.
360///
361/// The four callbacks define a mode-agnostic surface: byte chunks
362/// flow through [`RunObserver::on_stdout`] and
363/// [`RunObserver::on_stderr`] for both fresh runs and cache hits.
364/// Concrete implementations distinguish the `EXEC-016` presentation
365/// modes:
366///
367/// - A `buffered` observer collects the chunks per task and emits
368/// one contiguous block to the parent process on
369/// [`RunObserver::on_task_finished`].
370/// - A `live` observer prefixes each line with `[project:task] `
371/// and writes it to a shared sink with per-line atomicity.
372///
373/// Implementations MAY use interior mutability; the trait methods
374/// take `&self` so that one observer can serve many concurrent
375/// [`run_task`] calls (the scheduler exercises this via concurrent
376/// invocations).
377pub trait RunObserver {
378 /// Called once at the start of the run, before any cache work.
379 fn on_task_started(&self, task: &TaskId);
380
381 /// One chunk of captured stdout bytes. For a cache hit, called
382 /// exactly once with the recorded stdout in full. For a fresh
383 /// run, the call shape depends on the observer's presentation
384 /// strategy: a buffered observer typically receives a single
385 /// call with the whole captured stdout; a live observer may
386 /// receive many sub-line chunks as bytes flow from the pipe.
387 fn on_stdout(&self, task: &TaskId, bytes: &[u8]);
388
389 /// One chunk of captured stderr bytes. Same call shape as
390 /// [`Self::on_stdout`].
391 fn on_stderr(&self, task: &TaskId, bytes: &[u8]);
392
393 /// Called once at the end of the run with the run-record
394 /// classification. Fires only for tasks the scheduler
395 /// admitted into the lookup-then-spawn pipeline; cascade-
396 /// skipped tasks surface through
397 /// [`Self::on_task_skipped`] instead.
398 fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord);
399
400 /// Called once at the moment the scheduler marks `task`
401 /// cascade-skipped per `EXEC-010` / `EXEC-011`. Fires before
402 /// the next admission round; NEVER paired with
403 /// [`Self::on_task_started`] or [`Self::on_task_finished`]
404 /// for the same task.
405 fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord);
406
407 /// Called once at the moment the cancellation flow records a
408 /// terminal state for `task` per `EXEC-012..015`. The
409 /// [`CancelledRecord`] variant distinguishes the three
410 /// shapes: an in-flight task that was signalled by the
411 /// executor; a cascade descendant of a cancelled task; or a
412 /// task drained from `state.ready` on cancel-fire. For an
413 /// in-flight task, [`Self::on_task_started`] has already
414 /// fired and [`Self::on_task_finished`] does NOT fire; for
415 /// the other two shapes, no other lifecycle callback fires
416 /// for the same task.
417 fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord);
418}
419
420/// Failure modes of [`run_task`].
421///
422/// The variants enumerate only the *executor-level* failures: cases
423/// where the lifecycle could not be carried through to a meaningful
424/// [`CompletedRecord`]. Task-level failures (non-zero exit, signalled
425/// exit) surface as [`Ok(CompletedRecord { state: Failed, .. })`]
426/// per `EXEC-009`, not as an [`Err`] here.
427#[derive(Debug, Snafu)]
428#[snafu(visibility(pub(crate)))]
429pub enum RunTaskError {
430 /// Cache-key derivation failed before lookup could be attempted.
431 #[snafu(display("failed to derive cache key: {source}"))]
432 BuildKeyFailed {
433 /// Originating [`BuildKeyError`].
434 source: BuildKeyError,
435 },
436
437 /// [`haz_cache::Cache::restore`] failed during the cache-hit
438 /// path (`CACHE-019`). Cache misses are not errors per
439 /// `CACHE-016`, so this variant covers only the successful-
440 /// lookup-then-restore-failed shape.
441 #[snafu(display("failed to restore cache entry: {source}"))]
442 RestoreFailed {
443 /// Originating [`RestoreError`].
444 source: RestoreError,
445 },
446
447 /// The [`ProcessSpawner`] refused to start the child (executable
448 /// not found, permission denied, fork failure, etc.). Distinct
449 /// from `EXEC-009`'s failed state: that rule covers commands
450 /// that *did* run; this variant is for commands that never did.
451 #[snafu(display("failed to spawn process: {source}"))]
452 SpawnFailed {
453 /// Originating [`ProcessError`].
454 source: ProcessError,
455 },
456
457 /// [`crate::process::Process::wait`] surfaced an I/O failure
458 /// while reaping the child. Rare on healthy hosts; usually indicates
459 /// that the child was reaped out from under us by a parent
460 /// signal handler or that the OS lost the descriptor.
461 #[snafu(display("failed to wait for spawned process: {source}"))]
462 WaitFailed {
463 /// Originating [`ProcessError`].
464 source: ProcessError,
465 },
466
467 /// Reading the child's stdout or stderr pipe to EOF failed
468 /// during stream capture. The reader task that returned this
469 /// error is the one whose stream did NOT reach EOF; the other
470 /// stream's capture may or may not have completed.
471 #[snafu(display("failed to read captured stream: {source}"))]
472 CapturedStreamReadFailed {
473 /// Which of the two streams the failure came from, for
474 /// diagnostic precision (the underlying [`io::Error`] often
475 /// looks the same on both pipes).
476 stream: CapturedStream,
477 /// Underlying I/O error.
478 source: io::Error,
479 },
480
481 /// Walking the filesystem to resolve a task's `outputs`
482 /// patterns failed (output-side parallel of
483 /// [`BuildKeyError::InputPatternResolutionFailed`]). Distinct
484 /// from [`Self::OutputNotARegularFile`] and
485 /// [`Self::OutputDeclaredButNotProduced`] so the run summary
486 /// can tell "walk failed" from "the task did not honour its
487 /// declared outputs".
488 #[snafu(display(
489 "failed to resolve output patterns under: {}: {source}",
490 root.display()
491 ))]
492 OutputPatternResolutionFailed {
493 /// The absolute path being walked when the failure occurred.
494 root: PathBuf,
495 /// Underlying filesystem error.
496 source: FsError,
497 },
498
499 /// Reading the Unix permission bits of a matched output file
500 /// failed. The bytes themselves are read inside
501 /// [`haz_cache::Cache::store`]; this variant covers only the
502 /// mode lookup the executor performs to build
503 /// [`haz_cache::StoredOutput`].
504 #[snafu(display("failed to read mode of output file: {}: {source}", path.display()))]
505 OutputModeReadFailed {
506 /// The output file whose mode could not be read.
507 path: PathBuf,
508 /// Underlying filesystem error.
509 source: FsError,
510 },
511
512 /// A matched output path is not a regular file (a directory,
513 /// symlink to a non-file, socket, FIFO, etc.). The cache only
514 /// stores regular-file blobs (`CACHE-013`); non-file matches
515 /// cannot be ingested.
516 #[snafu(display("output path is not a regular file: {}", path.display()))]
517 OutputNotARegularFile {
518 /// Absolute path of the offending entry.
519 path: PathBuf,
520 },
521
522 /// A literal output pattern named a path the task did not
523 /// produce (the file does not exist on disk after a successful
524 /// run). Distinct from the input-side
525 /// [`BuildKeyError::InputPatternResolutionFailed`] because the
526 /// caller's intent is the inverse: outputs are a contract the
527 /// task is supposed to honour; a missing literal output is a
528 /// task-level bug, not a filesystem accident.
529 #[snafu(display("task declared output but did not produce it: {}", path.display()))]
530 OutputDeclaredButNotProduced {
531 /// Absolute path of the declared-but-missing output.
532 path: PathBuf,
533 },
534
535 /// [`haz_cache::Cache::store`] failed while persisting a
536 /// successful run as a cache entry (`CACHE-017`).
537 #[snafu(display("failed to store cache entry: {source}"))]
538 StoreFailed {
539 /// Originating [`StoreError`].
540 source: StoreError,
541 },
542
543 /// A workspace-absolute path produced by the executor's
544 /// output-resolution pass failed to parse as a
545 /// [`CanonicalPath`].
546 ///
547 /// Indicates an internal invariant violation: the resolver
548 /// builds each path string from already-validated
549 /// [`haz_domain::path::segment::PathSegment`]s, so the parse
550 /// is expected to succeed unconditionally. The variant exists
551 /// so the failure surfaces typed rather than panicking; in
552 /// practice it should never fire.
553 #[snafu(display("materialised output path is not workspace-absolute: {path}: {source}"))]
554 MaterialisedOutputPathInvalid {
555 /// The offending path string.
556 path: String,
557 /// Originating parse error.
558 source: ParseAbsoluteError,
559 },
560}
561
/// Identifies the captured byte stream a
/// [`RunTaskError::CapturedStreamReadFailed`] is about.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CapturedStream {
    /// Standard output pipe of the child.
    Stdout,
    /// Standard error pipe of the child.
    Stderr,
}
571
572/// Borrowed bundle of long-lived state that every [`run_task`]
573/// invocation in a given `haz` run shares.
574///
575/// The split between this context and the per-task arguments
576/// matches a scheduler's natural shape: one `RunContext` covers
577/// the entire scheduling loop, while `task`, `predecessor_streams`,
578/// and `created_at_unix` change between individual [`run_task`]
579/// calls.
580pub struct RunContext<'a, F, S, O>
581where
582 F: WritableFilesystem,
583 S: ProcessSpawner,
584 O: RunObserver,
585{
586 /// Filesystem the executor reads inputs / outputs / cache state
587 /// through.
588 pub fs: &'a F,
589 /// Cache handle (lookup, restore, store).
590 pub cache: &'a Cache<F>,
591 /// Process spawner (a test-double mock under `cfg(test)`,
592 /// [`crate::std_impl::StdProcessSpawner`] in production).
593 pub spawner: &'a S,
594 /// Observer that receives lifecycle events and captured streams.
595 pub observer: &'a O,
596 /// Validated workspace state.
597 pub workspace: &'a Workspace,
598 /// Validated dependency graph.
599 pub graph: &'a TaskGraph,
600 /// Host-environment snapshot. Names not present here are
601 /// treated as absent per `CACHE-008`.
602 pub host_env: &'a BTreeMap<EnvVarName, String>,
603 /// Active hash algorithm for cache-key derivation, input /
604 /// output content hashing, and captured-stream hashing.
605 pub algo: HashAlgo,
606 /// Cancellation signal for the run (`EXEC-012` triggers).
607 /// The scheduler and per-task spawn-step futures observe
608 /// this token: when it fires, the scheduler stops admitting
609 /// new tasks and in-flight futures send SIGTERM to their
610 /// children. The token is long-lived ambient state owned by
611 /// the run's caller (typically `haz-cli` installing an OS
612 /// signal handler); tests pass a never-cancelled token.
613 pub cancel: &'a CancellationToken,
614}
615
616/// Run one task per its workspace declaration, cache-aware.
617///
618/// Lifecycle:
619///
620/// 1. [`RunObserver::on_task_started`] fires.
621/// 2. [`crate::cache_key::build_cache_key`] derives the key from
622/// `task`'s declared inputs, env contribution, and hard-edge
623/// predecessors' captured stream hashes.
624/// 3. [`haz_cache::Cache::lookup`] is consulted:
625/// - On a hit, [`haz_cache::Cache::restore`] materialises the
626/// manifest's outputs at their workspace-absolute paths;
627/// `stdout` and `stderr` flow through the observer; the
628/// function returns [`RunSource::CacheHit`] with the
629/// manifest's recorded stream hashes.
630/// - On a miss, the function builds a [`crate::process::SpawnPlan`]
631/// from the task's [`haz_domain::action::TaskAction`] and the
632/// effective env (`CACHE-008` runtime view: allow-listed host
633/// values plus task overrides, override wins), spawns through
634/// `spawner`, captures both streams to memory, awaits exit,
635/// emits the streams through the observer, classifies per
636/// `EXEC-009`. On success the outputs are enumerated and
637/// [`haz_cache::Cache::store`] persists the entry. On
638/// failure no store fires.
639/// 4. [`RunObserver::on_task_finished`] fires with the terminal
640/// [`CompletedRecord`].
641///
642/// The function is async because spawning, waiting, and stream
643/// capture all use the tokio runtime. Filesystem and cache calls
644/// are sync and run on the calling worker thread; callers that want
645/// to off-load the cache-key derivation (which hashes input file
646/// bytes) and the store phase (which hashes output file bytes) from
647/// an async context can wrap the function in
648/// [`tokio::task::spawn_blocking`] -- both phases dominate the
649/// runtime cost of a single-task lifecycle.
650///
651/// `created_at_unix` is caller-supplied so the function stays a
652/// pure function of its arguments: tests pin the value, the
653/// scheduler passes the current wall-clock time.
654///
655/// # Errors
656///
657/// Returns a [`RunTaskError`] for any executor-level failure
658/// (cache-key derivation, restore, spawn, wait, stream read,
659/// output resolution, store). Task-level failures (non-zero exit
660/// or signalled exit) are reported as
661/// `Ok(CompletedRecord { state: Failed, .. })` per `EXEC-009`;
662/// the cache is not consulted for store in that branch
663/// (`CACHE-018`).
664/// Carrier for the borrows + computed values
665/// [`cache_lookup_phase`] hands back to the caller (typically the
666/// scheduler in [`crate::run_graph::run_graph`]).
667///
668/// The struct lets `EXEC-007` step 1 (cache lookup) be performed
669/// independently of `EXEC-007` step 2 (restore) and step 3 (spawn);
670/// the scheduler interposes its mutex-compatibility check between
671/// the lookup and the spawn branch.
672///
673/// Lifetime `'ws` ties the `project` and `task_def` borrows to the
674/// workspace borrow held inside the supplying [`RunContext`].
675#[derive(Debug)]
676pub struct TaskLookup<'ws> {
677 /// Bearing project of `task` (looked up once so neither
678 /// downstream branch has to repeat the search).
679 pub project: &'ws Project,
680 /// Bearing task definition (action, mutex, env, etc.).
681 pub task_def: &'ws Task,
682 /// Computed cache key.
683 pub key: haz_cache::CacheKey,
684 /// Manifest of the matching cache entry, if any. `Some(_)`
685 /// drives the restore branch; `None` drives the spawn branch.
686 pub manifest: Option<haz_cache::Manifest>,
687}
688
689/// `EXEC-007` step 1 of the single-task lifecycle: resolve the
690/// bearing project + task, derive the cache key, and consult
691/// the cache. No mutex hold is taken; no observer event fires.
692///
693/// Returns a [`TaskLookup`] the caller drives forward into either
694/// [`restore_from_hit`] (manifest present) or [`run_fresh`]
695/// (manifest absent). The scheduler interposes its `EXEC-006`
696/// condition-3 mutex check between this function and
697/// [`run_fresh`].
698///
699/// # Errors
700///
701/// - [`RunTaskError::BuildKeyFailed`] when the task is missing from
702/// the workspace, or when cache-key derivation fails for any
703/// reason (input pattern resolution, file read, etc.).
704pub fn cache_lookup_phase<'ws, F, S, O>(
705 ctx: &RunContext<'ws, F, S, O>,
706 task: &TaskId,
707 predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
708) -> Result<TaskLookup<'ws>, RunTaskError>
709where
710 F: WritableFilesystem,
711 S: ProcessSpawner,
712 O: RunObserver,
713{
714 let project =
715 ctx.workspace
716 .projects
717 .get(&task.project)
718 .ok_or_else(|| RunTaskError::BuildKeyFailed {
719 source: BuildKeyError::TaskNotInWorkspace { task: task.clone() },
720 })?;
721 let task_def = project
722 .tasks
723 .get(&task.task)
724 .ok_or_else(|| RunTaskError::BuildKeyFailed {
725 source: BuildKeyError::TaskNotInWorkspace { task: task.clone() },
726 })?;
727
728 let key = build_cache_key(
729 ctx.fs,
730 ctx.workspace,
731 ctx.graph,
732 task,
733 ctx.host_env,
734 predecessor_streams,
735 ctx.algo,
736 )
737 .context(BuildKeyFailedSnafu)?;
738
739 let manifest = ctx.cache.lookup(&key);
740 Ok(TaskLookup {
741 project,
742 task_def,
743 key,
744 manifest,
745 })
746}
747
748/// Single-task lifecycle entry point: composes
749/// [`cache_lookup_phase`], [`restore_from_hit`], and
750/// [`run_fresh`] without any mutex orchestration.
751///
752/// Treats every task as if its mutex were always compatible.
753/// Callers that need `EXEC-006` condition 3 / `EXEC-007` mutex
754/// semantics (i.e., the scheduler) MUST drive the three phases
755/// directly.
756///
757/// Fires [`RunObserver::on_task_started`] before the lookup step
758/// and [`RunObserver::on_task_finished`] after the record is in
759/// hand.
760///
761/// # Errors
762///
763/// Surfaces the first error encountered across the composed
764/// phases; see the docs on each underlying function for the
765/// variants reachable from each phase.
766pub async fn run_task<F, S, O>(
767 ctx: &RunContext<'_, F, S, O>,
768 task: &TaskId,
769 predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
770 created_at_unix: u64,
771) -> Result<CompletedRecord, RunTaskError>
772where
773 F: WritableFilesystem,
774 S: ProcessSpawner,
775 O: RunObserver,
776{
777 ctx.observer.on_task_started(task);
778
779 let lookup = cache_lookup_phase(ctx, task, predecessor_streams)?;
780
781 let record = if let Some(manifest) = lookup.manifest.as_ref() {
782 restore_from_hit(ctx, task, manifest)?
783 } else {
784 run_fresh(
785 ctx,
786 task,
787 lookup.project,
788 lookup.task_def,
789 &lookup.key,
790 created_at_unix,
791 )
792 .await?
793 };
794
795 ctx.observer.on_task_finished(task, &record);
796 Ok(record)
797}
798
799/// Cache-hit branch: restore the manifest's outputs, emit the
800/// recorded streams to the observer, and build the matching
801/// [`CompletedRecord`] (`EXEC-007` step 2, `EXEC-017`,
802/// `CACHE-019`).
803///
804/// Per `MUTEX-007`, this function MUST NOT touch any mutex hold
805/// set: a cache hit reproduces the recorded effect via file
806/// writes and stream emission and never touches the resource the
807/// mutex protects.
808///
809/// Does NOT fire [`RunObserver::on_task_started`] or
810/// [`RunObserver::on_task_finished`]; the caller drives the
811/// observer lifecycle so the start/finish pair stays balanced
812/// across all three phases.
813///
814/// # Errors
815///
816/// - [`RunTaskError::RestoreFailed`] when the cache restore step
817/// fails (a `manifest` field references a blob that cannot be
818/// read, a destination cannot be written, etc.).
819pub fn restore_from_hit<F, S, O>(
820 ctx: &RunContext<'_, F, S, O>,
821 task: &TaskId,
822 manifest: &haz_cache::Manifest,
823) -> Result<CompletedRecord, RunTaskError>
824where
825 F: WritableFilesystem,
826 S: ProcessSpawner,
827 O: RunObserver,
828{
829 let restored = ctx.cache.restore(manifest).context(RestoreFailedSnafu)?;
830 ctx.observer.on_stdout(task, &restored.stdout);
831 ctx.observer.on_stderr(task, &restored.stderr);
832 let materialised_outputs = manifest
833 .outputs
834 .iter()
835 .map(|blob| blob.workspace_absolute_path.clone())
836 .collect();
837 Ok(CompletedRecord {
838 task: task.clone(),
839 source: RunSource::CacheHit,
840 state: RunState::Succeeded,
841 exit_status: None,
842 stdout_hash: manifest.stdout_hash,
843 stderr_hash: manifest.stderr_hash,
844 materialised_outputs,
845 })
846}
847
848/// Cache-miss branch (`EXEC-007` step 3, compatible mutex path):
849/// spawn the task's command, capture both streams, classify per
850/// `EXEC-009`, and on success persist the run to the cache.
851///
852/// Reader futures run concurrently with `process.wait()` so a child
853/// that fills its stdout/stderr pipe buffer never blocks waiting for
854/// the parent to drain (the classic pipe-deadlock shape). Stream
855/// hashes are computed under [`RunContext::algo`] from the captured
856/// byte buffers (`CACHE-011` corollary for fresh runs).
857///
858/// Per `MUTEX-006`, the caller MUST acquire the task's mutex
859/// before invoking this function and MUST release it after the
860/// returned future resolves (regardless of success or failure).
861/// This function does not consult any hold set itself.
862///
863/// Does NOT fire [`RunObserver::on_task_started`] or
864/// [`RunObserver::on_task_finished`]; the caller drives the
865/// observer lifecycle.
866///
867/// # Errors
868///
869/// - [`RunTaskError::SpawnFailed`] when the process spawner
870/// rejects the [`SpawnPlan`].
871/// - [`RunTaskError::WaitFailed`] when waiting on the spawned
872/// process surfaces an OS error.
873/// - [`RunTaskError::CapturedStreamReadFailed`] when reading
874/// either captured stream errors.
875/// - The output-resolution and store error families when the run
876/// succeeded and the cache-store step fails.
877pub async fn run_fresh<F, S, O>(
878 ctx: &RunContext<'_, F, S, O>,
879 task: &TaskId,
880 project: &Project,
881 task_def: &Task,
882 key: &haz_cache::CacheKey,
883 created_at_unix: u64,
884) -> Result<CompletedRecord, RunTaskError>
885where
886 F: WritableFilesystem,
887 S: ProcessSpawner,
888 O: RunObserver,
889{
890 let plan = build_spawn_plan(
891 ctx.workspace.root.as_path(),
892 project,
893 task_def,
894 ctx.host_env,
895 );
896
897 let Spawned {
898 mut process,
899 mut stdout,
900 mut stderr,
901 } = ctx.spawner.spawn(&plan).await.context(SpawnFailedSnafu)?;
902
903 let mut stdout_bytes = Vec::new();
904 let mut stderr_bytes = Vec::new();
905 let grace = ctx.workspace.settings.execution.cancel_grace.as_duration();
906 let ((wait_result, was_cancelled), stdout_result, stderr_result) = tokio::join!(
907 await_exit_with_cancel(&mut process, ctx.cancel, grace),
908 AsyncReadExt::read_to_end(&mut stdout, &mut stdout_bytes),
909 AsyncReadExt::read_to_end(&mut stderr, &mut stderr_bytes),
910 );
911
912 let exit_status = wait_result.context(WaitFailedSnafu)?;
913 stdout_result.context(CapturedStreamReadFailedSnafu {
914 stream: CapturedStream::Stdout,
915 })?;
916 stderr_result.context(CapturedStreamReadFailedSnafu {
917 stream: CapturedStream::Stderr,
918 })?;
919
920 ctx.observer.on_stdout(task, &stdout_bytes);
921 ctx.observer.on_stderr(task, &stderr_bytes);
922
923 let stdout_hash = hash_bytes(ctx.algo, &stdout_bytes);
924 let stderr_hash = hash_bytes(ctx.algo, &stderr_bytes);
925
926 let (state, materialised_outputs) = if was_cancelled {
927 // `EXEC-015`: cancelled runs never hit the store branch
928 // even on a zero exit code; cancellation is neither
929 // success nor a half-success.
930 (RunState::Cancelled, Vec::new())
931 } else if exit_status.success() {
932 let outputs_owned =
933 resolve_output_files(ctx.fs, ctx.workspace, project, &task_def.outputs)?;
934 let materialised = canonical_paths_from_owned(&outputs_owned)?;
935 store_successful_run(
936 ctx,
937 key,
938 &outputs_owned,
939 &stdout_bytes,
940 &stderr_bytes,
941 created_at_unix,
942 )?;
943 (RunState::Succeeded, materialised)
944 } else {
945 (RunState::Failed, Vec::new())
946 };
947
948 Ok(CompletedRecord {
949 task: task.clone(),
950 source: RunSource::FreshRun,
951 state,
952 exit_status: Some(exit_status),
953 stdout_hash,
954 stderr_hash,
955 materialised_outputs,
956 })
957}
958
959/// Wait for the spawned child to exit, observing the run's
960/// cancellation token.
961///
962/// On the normal path the function delegates to [`Process::wait`]
963/// and returns `(Ok(status), false)`. When the cancellation token
964/// fires while the child is still running, the function:
965///
966/// 1. Sends [`Signal::Terminate`] (best-effort; the error is
967/// swallowed so Windows hosts that surface
968/// [`std::io::ErrorKind::Unsupported`] still escalate).
969/// 2. Races a fresh [`Process::wait`] against
970/// [`tokio::time::sleep`] of `grace`.
971/// 3. If the sleep wins, sends [`Signal::Kill`] (best-effort) and
972/// awaits the child.
973///
974/// In both cancellation paths the second element of the returned
975/// tuple is `true`; the caller classifies the run as
976/// [`RunState::Cancelled`]. A `grace` of [`Duration::ZERO`]
977/// collapses the SIGTERM-to-SIGKILL window: SIGKILL fires
978/// immediately after the SIGTERM (`EXEC-014`).
979async fn await_exit_with_cancel<P>(
980 process: &mut P,
981 cancel: &CancellationToken,
982 grace: std::time::Duration,
983) -> (Result<ExitStatus, ProcessError>, bool)
984where
985 P: Process,
986{
987 tokio::select! {
988 biased;
989 () = cancel.cancelled() => {
990 let _ = process.send_signal(Signal::Terminate);
991 tokio::select! {
992 result = process.wait() => (result, true),
993 () = tokio::time::sleep(grace) => {
994 let _ = process.send_signal(Signal::Kill);
995 (process.wait().await, true)
996 }
997 }
998 }
999 result = process.wait() => (result, false),
1000 }
1001}
1002
1003/// Persist the successful run as a cache entry under `key`
1004/// (`CACHE-017`, `CACHE-018`).
1005///
1006/// Called only when the fresh run's exit status was zero; the
1007/// caller has already classified the run as succeeded and has
1008/// already resolved the task's declared outputs (so the same
1009/// resolved Vec can drive both the cache store and the
1010/// scheduler's materialised-outputs view).
1011fn store_successful_run<F, S, O>(
1012 ctx: &RunContext<'_, F, S, O>,
1013 key: &haz_cache::CacheKey,
1014 outputs_owned: &[OwnedOutputFile],
1015 stdout: &[u8],
1016 stderr: &[u8],
1017 created_at_unix: u64,
1018) -> Result<(), RunTaskError>
1019where
1020 F: WritableFilesystem,
1021 S: ProcessSpawner,
1022 O: RunObserver,
1023{
1024 let stored_outputs: Vec<StoredOutput<'_>> = outputs_owned
1025 .iter()
1026 .map(|f| StoredOutput {
1027 workspace_absolute_path: &f.workspace_absolute_path,
1028 on_disk_path: &f.on_disk_path,
1029 mode: f.mode,
1030 })
1031 .collect();
1032 let inputs = StoreInputs {
1033 outputs: &stored_outputs,
1034 stdout,
1035 stderr,
1036 created_at_unix,
1037 };
1038 ctx.cache.store(key, &inputs).context(StoreFailedSnafu)
1039}
1040
1041/// Parse the workspace-absolute path string of each
1042/// [`OwnedOutputFile`] into a [`CanonicalPath`] for the
1043/// scheduler's runtime DAG validation pass.
1044///
1045/// The strings come from
1046/// [`pattern_walk::workspace_absolute_string_from_segments`]
1047/// where every segment was validated upstream by
1048/// [`PathSegment`]'s constructors, so the parse should never
1049/// fail in practice; the [`Result`] exists so a hypothetical
1050/// invariant violation surfaces as a typed
1051/// [`RunTaskError::MaterialisedOutputPathInvalid`] rather than
1052/// a panic.
1053fn canonical_paths_from_owned(
1054 files: &[OwnedOutputFile],
1055) -> Result<Vec<CanonicalPath>, RunTaskError> {
1056 files
1057 .iter()
1058 .map(|f| {
1059 CanonicalPath::parse_workspace_absolute(&f.workspace_absolute_path).context(
1060 MaterialisedOutputPathInvalidSnafu {
1061 path: f.workspace_absolute_path.clone(),
1062 },
1063 )
1064 })
1065 .collect()
1066}
1067
1068/// Hash `bytes` under the active [`HashAlgo`]. Used to compute the
1069/// captured-stream hashes recorded on a fresh-run
1070/// [`CompletedRecord`].
1071fn hash_bytes(algo: HashAlgo, bytes: &[u8]) -> [u8; 32] {
1072 let mut hasher = Hasher::new(algo);
1073 hasher.update(bytes);
1074 hasher.finalize()
1075}
1076
1077/// Assemble the [`SpawnPlan`] that [`run_task`]'s miss branch hands
1078/// to the [`ProcessSpawner`].
1079///
1080/// Three pieces are derived from `task_def`:
1081///
1082/// - `program` and `args` from `task_def.action`. A
1083/// [`TaskAction::Command`] splits as `program = argv.head`,
1084/// `args = argv.tail`. A [`TaskAction::Shell`] becomes
1085/// `program = <shell-binary-name>`, `args = ["-c", script]`.
1086/// - `cwd` is the project's host path: `workspace_host` walked
1087/// through the project root's segments. An implicit-mode
1088/// project ([`ProjectRoot::WorkspaceRoot`], per `DISC-003`) lands
1089/// at `workspace_host` itself with no further pushing.
1090/// - `env` is the effective env per `CACHE-008` runtime view:
1091/// names in `task_def.env.from_host` propagate the value the
1092/// host snapshot recorded for them, names absent from the host
1093/// simply do not appear in the env vector; `task_def.env.overrides`
1094/// entries are layered on top, override winning on collision.
1095/// The std backend's `env_clear()` ensures the child sees this
1096/// set and nothing else.
1097fn build_spawn_plan(
1098 workspace_host: &Path,
1099 project: &Project,
1100 task_def: &Task,
1101 host_env: &BTreeMap<EnvVarName, String>,
1102) -> SpawnPlan {
1103 let (program, args) = program_and_args(&task_def.action);
1104 SpawnPlan {
1105 program,
1106 args,
1107 env: build_env_vec(&task_def.env, host_env),
1108 cwd: project_cwd(workspace_host, &project.root),
1109 }
1110}
1111
1112/// Walk `workspace_host` through the project root's segments. For
1113/// [`ProjectRoot::WorkspaceRoot`] (implicit-mode project per
1114/// `DISC-003`) returns the workspace host path verbatim.
1115fn project_cwd(workspace_host: &Path, project_root: &ProjectRoot) -> PathBuf {
1116 let mut p = workspace_host.to_path_buf();
1117 if let ProjectRoot::Nested(canonical) = project_root {
1118 for seg in canonical.segments() {
1119 p.push(seg.as_str());
1120 }
1121 }
1122 p
1123}
1124
1125/// Convert a [`TaskAction`] into the
1126/// (`program`, `args`) pair the spawner consumes.
1127fn program_and_args(action: &TaskAction) -> (OsString, Vec<OsString>) {
1128 match action {
1129 TaskAction::Command(nonempty_argv) => {
1130 let program = OsString::from(nonempty_argv.head.as_str());
1131 let args = nonempty_argv.tail.iter().map(OsString::from).collect();
1132 (program, args)
1133 }
1134 TaskAction::Shell { script, shell } => {
1135 let program = OsString::from(shell_binary_name(shell));
1136 let args = vec![OsString::from("-c"), OsString::from(script)];
1137 (program, args)
1138 }
1139 }
1140}
1141
1142/// Resolve [`ShellType`] to the binary name passed as `argv[0]` to
1143/// `exec`.
1144///
1145/// First-class variants emit their canonical identifiers (`sh`,
1146/// `bash`); [`ShellType::Other`] emits the validated name verbatim.
1147/// The shell is resolved through `PATH` at exec time, matching how
1148/// `system(3)` and `/bin/sh -c` behave on POSIX hosts.
1149fn shell_binary_name(shell: &ShellType) -> &str {
1150 match shell {
1151 ShellType::Sh => "sh",
1152 ShellType::Bash => "bash",
1153 ShellType::Other(name) => AsRef::<str>::as_ref(name.as_ref()),
1154 }
1155}
1156
1157/// Assemble the effective env vector per `CACHE-008` runtime view.
1158///
1159/// Steps:
1160///
1161/// 1. For each name in `env.from_host`, look it up in `host_env`.
1162/// Present names contribute their host value; absent names
1163/// contribute nothing (the child simply will not see that name).
1164/// 2. For each entry in `env.overrides`, set the name to the
1165/// override value, replacing any same-name entry from step 1
1166/// (`CACHE-008` "overrides wins on collision").
1167///
1168/// Names not in either map do not appear; the std spawner's
1169/// `env_clear()` ensures the child's env is exactly the returned
1170/// set.
1171///
1172/// Order in the returned vector is lexicographic by name (the
1173/// [`BTreeMap`] keeps it so). The std spawner preserves order on
1174/// `Command::env`; either order works on POSIX hosts because the
1175/// child sees a set, not a sequence, but lexicographic ordering
1176/// makes test assertions deterministic.
1177fn build_env_vec(
1178 env: &EnvSettings,
1179 host_env: &BTreeMap<EnvVarName, String>,
1180) -> Vec<(OsString, OsString)> {
1181 let mut effective: BTreeMap<OsString, OsString> = BTreeMap::new();
1182 for name in &env.from_host {
1183 if let Some(value) = host_env.get(name) {
1184 effective.insert(
1185 OsString::from(AsRef::<str>::as_ref(name.as_ref())),
1186 OsString::from(value),
1187 );
1188 }
1189 }
1190 for (name, value) in &env.overrides {
1191 effective.insert(
1192 OsString::from(AsRef::<str>::as_ref(name.as_ref())),
1193 OsString::from(value),
1194 );
1195 }
1196 effective.into_iter().collect()
1197}
1198
/// A single file resolved from a task's `outputs` patterns,
/// together with where it lives on the host and the Unix
/// permission bits recorded for it.
///
/// The struct owns its data; borrowed views of the three fields
/// drop straight into [`haz_cache::StoredOutput`].
#[derive(Clone, Debug, Eq, PartialEq)]
struct OwnedOutputFile {
    /// Workspace-anchored path string (rooted at `/`) that the
    /// cache library records in the manifest.
    workspace_absolute_path: String,
    /// Real host path the cache reads to ingest the blob.
    on_disk_path: PathBuf,
    /// Value that [`haz_cache::StoredOutput::mode`] receives on
    /// store.
    mode: u32,
}
1215
1216/// Resolve every [`OutputSpec`] declared on a task against `fs` and
1217/// record each match's host path and Unix mode.
1218///
1219/// Output-side mirror of the input resolver in [`crate::cache_key`]:
1220/// a [`PathPattern::Literal`] contributes exactly one entry on a
1221/// regular-file match and a typed failure otherwise; a
1222/// [`PathPattern::Glob`] contributes zero or more entries through
1223/// the [`GlobWalk`] machinery. Errors surface the first failure
1224/// encountered while walking, reading metadata, or reading the
1225/// permission bits; later outputs in the same task are not
1226/// attempted.
1227///
1228/// Diagnostic divergence from the input resolver: a literal output
1229/// that resolves to [`FsError::NotFound`] surfaces as
1230/// [`RunTaskError::OutputDeclaredButNotProduced`] rather than
1231/// [`RunTaskError::OutputPatternResolutionFailed`]. Outputs are a
1232/// contract the task is supposed to honour, so a missing literal
1233/// output is a task-level failure, not a filesystem accident.
1234///
1235/// Symlink semantics follow the input resolver: a symlink-to-file
1236/// contributes the link's own workspace-absolute path, while the
1237/// mode bits come from the canonical target via
1238/// [`Filesystem::permissions`] (which follows symlinks).
1239fn resolve_output_files<F: Filesystem>(
1240 fs: &F,
1241 workspace: &Workspace,
1242 project: &Project,
1243 outputs: &[OutputSpec],
1244) -> Result<Vec<OwnedOutputFile>, RunTaskError> {
1245 let workspace_host = workspace.root.as_path();
1246 let action = OutputAction;
1247 let mut out = Vec::new();
1248
1249 for spec in outputs {
1250 match spec.pattern() {
1251 PathPattern::Literal(haz_path) => {
1252 resolve_literal_output(
1253 fs,
1254 workspace_host,
1255 &project.root,
1256 haz_path,
1257 &action,
1258 &mut out,
1259 )?;
1260 }
1261 PathPattern::Glob(glob_pattern) => {
1262 let glob = glob_pattern.compile();
1263 let matcher = glob.compile_matcher();
1264 let (walk_host, workspace_prefix, candidate_prefix) =
1265 glob_walk_origin(workspace_host, &project.root, glob_pattern.anchor());
1266 let walker = GlobWalk {
1267 fs,
1268 matcher: &matcher,
1269 candidate_prefix,
1270 workspace_prefix,
1271 action: &action,
1272 };
1273 let mut walk_rel: Vec<String> = Vec::new();
1274 walker.walk(&walk_host, &mut walk_rel, &mut out)?;
1275 }
1276 }
1277 }
1278
1279 Ok(out)
1280}
1281
1282/// Resolve a literal [`OutputSpec`] under the supplied project root
1283/// and dispatch to the per-match action on a regular-file match.
1284///
1285/// Splits the failure space three ways:
1286///
1287/// - [`FsError::NotFound`] becomes
1288/// [`RunTaskError::OutputDeclaredButNotProduced`].
1289/// - Any other [`FsError`] from the metadata read becomes
1290/// [`RunTaskError::OutputPatternResolutionFailed`].
1291/// - A non-regular-file kind becomes
1292/// [`RunTaskError::OutputNotARegularFile`].
1293fn resolve_literal_output<F: Filesystem>(
1294 fs: &F,
1295 workspace_host: &Path,
1296 project_root: &ProjectRoot,
1297 haz_path: &haz_domain::path::HazPath,
1298 action: &OutputAction,
1299 out: &mut Vec<OwnedOutputFile>,
1300) -> Result<(), RunTaskError> {
1301 let ws_segments = literal_workspace_segments(haz_path, project_root);
1302 let host = host_path_from_segments(workspace_host, &ws_segments);
1303
1304 let meta = match fs.metadata(&host) {
1305 Ok(m) => m,
1306 Err(FsError::NotFound { path }) => {
1307 return Err(RunTaskError::OutputDeclaredButNotProduced { path });
1308 }
1309 Err(source) => {
1310 return Err(RunTaskError::OutputPatternResolutionFailed { root: host, source });
1311 }
1312 };
1313 if meta.kind != EntryKind::File {
1314 return Err(RunTaskError::OutputNotARegularFile { path: host });
1315 }
1316
1317 let workspace_absolute_path = workspace_absolute_string_from_segments(&ws_segments);
1318 action.on_match(fs, &host, workspace_absolute_path, out)
1319}
1320
1321/// Per-match action for output resolution: record the matched file's
1322/// host path and Unix mode. Walk-level [`FsError`]s surface as
1323/// [`RunTaskError::OutputPatternResolutionFailed`]; mode-read failures
1324/// surface as [`RunTaskError::OutputModeReadFailed`].
1325///
1326/// The action carries no state: the host snapshot, project root,
1327/// and workspace path projection are all threaded through the
1328/// [`GlobWalk`] machinery.
1329struct OutputAction;
1330
1331impl<F: Filesystem> GlobMatchAction<F> for OutputAction {
1332 type Output = OwnedOutputFile;
1333 type Error = RunTaskError;
1334
1335 fn map_walk_error(&self, root: PathBuf, source: FsError) -> RunTaskError {
1336 RunTaskError::OutputPatternResolutionFailed { root, source }
1337 }
1338
1339 fn on_match(
1340 &self,
1341 fs: &F,
1342 host_path: &Path,
1343 workspace_absolute_path: String,
1344 out: &mut Vec<OwnedOutputFile>,
1345 ) -> Result<(), RunTaskError> {
1346 let mode = fs
1347 .permissions(host_path)
1348 .context(OutputModeReadFailedSnafu {
1349 path: host_path.to_path_buf(),
1350 })?;
1351 out.push(OwnedOutputFile {
1352 workspace_absolute_path,
1353 on_disk_path: host_path.to_path_buf(),
1354 mode,
1355 });
1356 Ok(())
1357 }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362 use std::collections::{BTreeMap, BTreeSet};
1363 use std::ffi::OsString;
1364 use std::path::{Path, PathBuf};
1365 use std::str::FromStr;
1366
1367 use nonempty::NonEmpty;
1368
1369 use haz_domain::action::{ShellType, TaskAction};
1370 use haz_domain::env::{EnvSettings, EnvVarName};
1371 use haz_domain::name::{ProjectName, TaskName};
1372 use haz_domain::path::{CanonicalPath, HazPath, InputSpec, ProjectRoot};
1373 use haz_domain::project::Project;
1374 use haz_domain::task::Task;
1375
1376 use super::{build_env_vec, build_spawn_plan, project_cwd, shell_binary_name};
1377
1378 fn env_name(s: &str) -> EnvVarName {
1379 EnvVarName::try_new(s).unwrap()
1380 }
1381
1382 fn project_name(s: &str) -> ProjectName {
1383 ProjectName::try_new(s).unwrap()
1384 }
1385
1386 fn task_name(s: &str) -> TaskName {
1387 TaskName::try_new(s).unwrap()
1388 }
1389
1390 fn nested_project(name: &str, root: &str) -> Project {
1391 Project {
1392 name: project_name(name),
1393 root: ProjectRoot::Nested(
1394 CanonicalPath::from_absolute(&HazPath::parse(root).unwrap()).unwrap(),
1395 ),
1396 tags: BTreeSet::new(),
1397 tasks: BTreeMap::new(),
1398 }
1399 }
1400
1401 fn implicit_project(name: &str) -> Project {
1402 Project {
1403 name: project_name(name),
1404 root: ProjectRoot::WorkspaceRoot,
1405 tags: BTreeSet::new(),
1406 tasks: BTreeMap::new(),
1407 }
1408 }
1409
1410 fn task_with(action: TaskAction, env: EnvSettings) -> Task {
1411 Task {
1412 name: task_name("run"),
1413 action,
1414 inputs: Vec::<InputSpec>::new(),
1415 outputs: Vec::new(),
1416 deps: Vec::new(),
1417 weak_deps: Vec::new(),
1418 mutex: None,
1419 env,
1420 }
1421 }
1422
1423 fn command(argv: &[&str]) -> TaskAction {
1424 TaskAction::Command(
1425 NonEmpty::from_vec(argv.iter().map(|s| (*s).to_owned()).collect())
1426 .expect("non-empty argv"),
1427 )
1428 }
1429
1430 fn shell(script: &str, shell_type: ShellType) -> TaskAction {
1431 TaskAction::Shell {
1432 script: script.to_owned(),
1433 shell: shell_type,
1434 }
1435 }
1436
1437 // ---- shell_binary_name ----
1438
1439 #[test]
1440 fn shell_binary_name_for_sh() {
1441 assert_eq!(shell_binary_name(&ShellType::Sh), "sh");
1442 }
1443
1444 #[test]
1445 fn shell_binary_name_for_bash() {
1446 assert_eq!(shell_binary_name(&ShellType::Bash), "bash");
1447 }
1448
1449 #[test]
1450 fn shell_binary_name_for_other_uses_validated_name() {
1451 let name = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
1452 assert_eq!(shell_binary_name(&ShellType::Other(name)), "zsh");
1453 }
1454
1455 // ---- project_cwd ----
1456
1457 #[test]
1458 fn project_cwd_for_nested_appends_segments() {
1459 let p = nested_project("lib_core", "/lib_core");
1460 let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1461 assert_eq!(cwd, PathBuf::from("/abs/ws/lib_core"));
1462 }
1463
1464 #[test]
1465 fn project_cwd_for_deep_nested_walks_every_segment() {
1466 let p = nested_project("frontend", "/web/frontend");
1467 let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1468 assert_eq!(cwd, PathBuf::from("/abs/ws/web/frontend"));
1469 }
1470
1471 #[test]
1472 fn project_cwd_for_implicit_mode_is_workspace_host() {
1473 let p = implicit_project("root");
1474 let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1475 assert_eq!(cwd, PathBuf::from("/abs/ws"));
1476 }
1477
1478 // ---- build_env_vec ----
1479
1480 fn env_settings(from_host: &[&str], overrides: &[(&str, &str)]) -> EnvSettings {
1481 EnvSettings {
1482 from_host: from_host.iter().map(|s| env_name(s)).collect(),
1483 overrides: overrides
1484 .iter()
1485 .map(|(k, v)| (env_name(k), (*v).to_owned()))
1486 .collect(),
1487 }
1488 }
1489
1490 fn host_snapshot(entries: &[(&str, &str)]) -> BTreeMap<EnvVarName, String> {
1491 entries
1492 .iter()
1493 .map(|(k, v)| (env_name(k), (*v).to_owned()))
1494 .collect()
1495 }
1496
1497 fn osstr_pairs(slice: &[(&str, &str)]) -> Vec<(OsString, OsString)> {
1498 slice
1499 .iter()
1500 .map(|(k, v)| (OsString::from(*k), OsString::from(*v)))
1501 .collect()
1502 }
1503
1504 #[test]
1505 fn build_env_vec_from_host_present_name_propagates() {
1506 let settings = env_settings(&["PATH"], &[]);
1507 let host = host_snapshot(&[("PATH", "/usr/bin")]);
1508 let got = build_env_vec(&settings, &host);
1509 assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
1510 }
1511
1512 #[test]
1513 fn build_env_vec_from_host_absent_name_is_excluded() {
1514 // CACHE-008: a from_host name absent from the host
1515 // snapshot contributes the `0x00` absent marker to the
1516 // cache key, but does NOT appear in the child's runtime
1517 // env. The child simply does not see the variable.
1518 let settings = env_settings(&["NEVER_SET"], &[]);
1519 let host = host_snapshot(&[("OTHER", "v")]);
1520 let got = build_env_vec(&settings, &host);
1521 assert!(
1522 got.is_empty(),
1523 "absent from_host names must not enter the child env, got {got:?}",
1524 );
1525 }
1526
1527 #[test]
1528 fn build_env_vec_override_wins_on_collision() {
1529 // CACHE-008 "overrides wins on collision": at runtime the
1530 // child sees the override value, not the host value.
1531 let settings = env_settings(&["X"], &[("X", "override-val")]);
1532 let host = host_snapshot(&[("X", "host-val")]);
1533 let got = build_env_vec(&settings, &host);
1534 assert_eq!(got, osstr_pairs(&[("X", "override-val")]));
1535 }
1536
1537 #[test]
1538 fn build_env_vec_override_only_entries_propagate() {
1539 let settings = env_settings(&[], &[("HAZ_MODE", "ci")]);
1540 let host = host_snapshot(&[]);
1541 let got = build_env_vec(&settings, &host);
1542 assert_eq!(got, osstr_pairs(&[("HAZ_MODE", "ci")]));
1543 }
1544
1545 #[test]
1546 fn build_env_vec_unrelated_host_names_do_not_appear() {
1547 // Cache soundness corollary: a host variable NOT named in
1548 // either from_host or overrides MUST NOT enter the child's
1549 // env, even though it sits in the host snapshot.
1550 let settings = env_settings(&["PATH"], &[]);
1551 let host = host_snapshot(&[("PATH", "/usr/bin"), ("HOME", "/home/me")]);
1552 let got = build_env_vec(&settings, &host);
1553 assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
1554 }
1555
1556 #[test]
1557 fn build_env_vec_result_is_lexicographic() {
1558 let settings = env_settings(&["ZULU", "ALPHA", "BRAVO"], &[]);
1559 let host = host_snapshot(&[("ALPHA", "a"), ("BRAVO", "b"), ("ZULU", "z")]);
1560 let got = build_env_vec(&settings, &host);
1561 let names: Vec<&OsString> = got.iter().map(|(k, _)| k).collect();
1562 assert_eq!(
1563 names,
1564 vec![
1565 &OsString::from("ALPHA"),
1566 &OsString::from("BRAVO"),
1567 &OsString::from("ZULU"),
1568 ]
1569 );
1570 }
1571
1572 #[test]
1573 fn build_env_vec_empty_value_is_kept() {
1574 // An empty-string value is a legitimate POSIX env entry,
1575 // and distinct from absence (which CACHE-008 separates via
1576 // the 0x00 marker). Make sure we propagate the empty string
1577 // rather than collapsing it to "absent".
1578 let settings = env_settings(&["X"], &[]);
1579 let host = host_snapshot(&[("X", "")]);
1580 let got = build_env_vec(&settings, &host);
1581 assert_eq!(got, osstr_pairs(&[("X", "")]));
1582 }
1583
1584 // ---- build_spawn_plan ----
1585
1586 #[test]
1587 fn build_spawn_plan_command_maps_argv_head_and_tail() {
1588 let p = nested_project("proj", "/proj");
1589 let t = task_with(
1590 command(&["cargo", "build", "--release"]),
1591 EnvSettings::default(),
1592 );
1593 let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1594
1595 assert_eq!(plan.program, OsString::from("cargo"));
1596 assert_eq!(
1597 plan.args,
1598 vec![OsString::from("build"), OsString::from("--release")],
1599 );
1600 }
1601
1602 #[test]
1603 fn build_spawn_plan_command_with_single_arg_has_empty_args() {
1604 let p = nested_project("proj", "/proj");
1605 let t = task_with(command(&["true"]), EnvSettings::default());
1606 let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1607
1608 assert_eq!(plan.program, OsString::from("true"));
1609 assert!(plan.args.is_empty());
1610 }
1611
1612 #[test]
1613 fn build_spawn_plan_shell_uses_dash_c_and_script() {
1614 let p = nested_project("proj", "/proj");
1615 let t = task_with(shell("echo hi", ShellType::Sh), EnvSettings::default());
1616 let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1617
1618 assert_eq!(plan.program, OsString::from("sh"));
1619 assert_eq!(
1620 plan.args,
1621 vec![OsString::from("-c"), OsString::from("echo hi")],
1622 );
1623 }
1624
1625 #[test]
1626 fn build_spawn_plan_shell_other_uses_named_binary() {
1627 let p = nested_project("proj", "/proj");
1628 let other = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
1629 let t = task_with(
1630 shell("echo hi", ShellType::Other(other)),
1631 EnvSettings::default(),
1632 );
1633 let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1634
1635 assert_eq!(plan.program, OsString::from("zsh"));
1636 assert_eq!(
1637 plan.args,
1638 vec![OsString::from("-c"), OsString::from("echo hi")],
1639 );
1640 }
1641
1642 #[test]
1643 fn build_spawn_plan_sets_cwd_to_project_host_path_for_nested() {
1644 let p = nested_project("frontend", "/web/frontend");
1645 let t = task_with(command(&["true"]), EnvSettings::default());
1646 let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
1647
1648 assert_eq!(plan.cwd, PathBuf::from("/abs/ws/web/frontend"));
1649 }
1650
1651 #[test]
1652 fn build_spawn_plan_sets_cwd_to_workspace_host_for_implicit_project() {
1653 let p = implicit_project("root");
1654 let t = task_with(command(&["true"]), EnvSettings::default());
1655 let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
1656
1657 assert_eq!(plan.cwd, PathBuf::from("/abs/ws"));
1658 }
1659
/// End-to-end check of the helper's env handling: a representative
/// env configuration goes in, and `SpawnPlan.env` must equal the
/// CACHE-008 runtime view exactly.
#[test]
fn build_spawn_plan_env_reflects_from_host_overrides_and_excludes_unrelated() {
    // `PATH` is both forwarded from the host and overridden;
    // `MISSING` is requested from the host but absent there.
    let from_host = ["PATH", "MISSING"];
    let overrides = [("HAZ_MODE", "ci"), ("PATH", "/override/bin")];
    let settings = env_settings(&from_host, &overrides);

    let project = nested_project("proj", "/proj");
    let task = task_with(command(&["true"]), settings);
    let host = host_snapshot(&[("PATH", "/usr/bin"), ("UNRELATED", "should-not-appear")]);

    let spawn_plan = build_spawn_plan(Path::new("/ws"), &project, &task, &host);

    let expected_env = osstr_pairs(&[("HAZ_MODE", "ci"), ("PATH", "/override/bin")]);
    assert_eq!(
        spawn_plan.env, expected_env,
        "override wins over from_host; UNRELATED and MISSING (absent) are excluded",
    );
}
1681
1682 // ---- resolve_output_files ----
1683
1684 mod output_resolution {
1685 use std::collections::{BTreeMap, BTreeSet};
1686 use std::path::PathBuf;
1687
1688 use haz_domain::path::{
1689 CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
1690 };
1691 use haz_domain::project::Project;
1692 use haz_domain::settings::WorkspaceSettings;
1693 use haz_domain::workspace::Workspace;
1694 use haz_vfs::{MemFilesystem, WritableFilesystem};
1695
1696 use super::super::{OwnedOutputFile, RunTaskError, resolve_output_files};
1697
/// Host path used as the workspace root by the fixtures in this module.
const WORKSPACE_HOST: &str = "/ws";
/// Host path of the nested test project's directory, i.e. the `proj`
/// directory directly under the workspace root path.
const PROJECT_HOST: &str = "/ws/proj";
1700
1701 fn nested_project() -> Project {
1702 Project {
1703 name: haz_domain::name::ProjectName::try_new("proj").unwrap(),
1704 root: ProjectRoot::Nested(
1705 CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap(),
1706 ),
1707 tags: BTreeSet::new(),
1708 tasks: BTreeMap::new(),
1709 }
1710 }
1711
1712 fn implicit_project() -> Project {
1713 Project {
1714 name: haz_domain::name::ProjectName::try_new("root").unwrap(),
1715 root: ProjectRoot::WorkspaceRoot,
1716 tags: BTreeSet::new(),
1717 tasks: BTreeMap::new(),
1718 }
1719 }
1720
1721 fn workspace_with(project: &Project) -> Workspace {
1722 let mut projects = BTreeMap::new();
1723 projects.insert(project.name.clone(), project.clone());
1724 Workspace {
1725 root: WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap(),
1726 projects,
1727 overlays: BTreeMap::new(),
1728 settings: WorkspaceSettings::default(),
1729 }
1730 }
1731
1732 fn paths_of(files: &[OwnedOutputFile]) -> BTreeSet<String> {
1733 files
1734 .iter()
1735 .map(|f| f.workspace_absolute_path.clone())
1736 .collect()
1737 }
1738
1739 fn host_paths_of(files: &[OwnedOutputFile]) -> BTreeSet<PathBuf> {
1740 files.iter().map(|f| f.on_disk_path.clone()).collect()
1741 }
1742
1743 fn modes_by_workspace_path(files: &[OwnedOutputFile]) -> BTreeMap<String, u32> {
1744 files
1745 .iter()
1746 .map(|f| (f.workspace_absolute_path.clone(), f.mode))
1747 .collect()
1748 }
1749
/// A project-relative literal that exists on disk resolves to
/// exactly one output carrying both path forms and the file's
/// recorded permission bits.
#[test]
fn literal_hit_returns_one_output_with_recorded_mode() {
    let bundle_host_path = format!("{PROJECT_HOST}/bundle.js");
    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();
    fs.add_file_with_mode(bundle_host_path, b"data".to_vec(), 0o640)
        .unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("bundle.js").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 1);
    let file = &resolved[0];
    assert_eq!(file.workspace_absolute_path, "/proj/bundle.js");
    assert_eq!(file.on_disk_path, PathBuf::from("/ws/proj/bundle.js"));
    assert_eq!(file.mode & 0o7777, 0o640);
}
1767
/// A workspace-absolute literal (`/dist/main.js`) resolves against
/// the workspace root, not against the declaring project's
/// directory.
#[test]
fn literal_workspace_absolute_resolves_under_workspace_root() {
    let mut fs = MemFilesystem::new();
    fs.add_dir("/ws/dist").unwrap();
    fs.add_file_with_mode("/ws/dist/main.js", b"x".to_vec(), 0o755)
        .unwrap();
    fs.add_dir(PROJECT_HOST).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("/dist/main.js").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 1);
    let file = &resolved[0];
    assert_eq!(file.workspace_absolute_path, "/dist/main.js");
    assert_eq!(file.on_disk_path, PathBuf::from("/ws/dist/main.js"));
    assert_eq!(file.mode & 0o7777, 0o755);
}
1786
/// A declared literal output that does not exist must surface as
/// `OutputDeclaredButNotProduced` carrying the host path.
#[test]
fn literal_missing_surfaces_output_declared_but_not_produced() {
    // Declared outputs are a contract on the task. A missing
    // literal therefore gets its own error shape and MUST NOT be
    // folded into the input-side PatternResolutionFailed one.
    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("absent.txt").unwrap()];

    let resolution = resolve_output_files(&fs, &workspace, &project, &declared);
    let missing_path = match resolution {
        Err(RunTaskError::OutputDeclaredButNotProduced { path }) => path,
        other => panic!("expected OutputDeclaredButNotProduced, got {other:?}"),
    };

    assert_eq!(missing_path, PathBuf::from("/ws/proj/absent.txt"));
}
1806
/// A declared literal that names a directory must surface as
/// `OutputNotARegularFile` carrying the host path.
#[test]
fn literal_pointing_at_directory_surfaces_output_not_a_regular_file() {
    let subdir_host_path = format!("{PROJECT_HOST}/subdir");
    let mut fs = MemFilesystem::new();
    fs.add_dir(subdir_host_path).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("subdir").unwrap()];

    let resolution = resolve_output_files(&fs, &workspace, &project, &declared);
    let offending_path = match resolution {
        Err(RunTaskError::OutputNotARegularFile { path }) => path,
        other => panic!("expected OutputNotARegularFile, got {other:?}"),
    };

    assert_eq!(offending_path, PathBuf::from("/ws/proj/subdir"));
}
1823
/// A glob matching several files contributes every match, each
/// with its own mode, and nothing for non-matching neighbours.
#[test]
fn glob_multi_match_collects_every_matching_file_with_its_mode() {
    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();
    fs.add_file_with_mode(format!("{PROJECT_HOST}/a.js"), b"a".to_vec(), 0o644)
        .unwrap();
    fs.add_file_with_mode(format!("{PROJECT_HOST}/b.js"), b"b".to_vec(), 0o600)
        .unwrap();
    // Sits next to the matches but has the wrong extension, so it
    // MUST NOT show up in the result.
    let neighbour = format!("{PROJECT_HOST}/keep.txt");
    fs.add_file_with_mode(neighbour, b"ignored".to_vec(), 0o644)
        .unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("*.js").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 2);
    let expected_paths = BTreeSet::from(["/proj/a.js".to_owned(), "/proj/b.js".to_owned()]);
    assert_eq!(paths_of(&resolved), expected_paths);

    let modes = modes_by_workspace_path(&resolved);
    for (workspace_path, expected_mode) in [("/proj/a.js", 0o644), ("/proj/b.js", 0o600)] {
        let permission_bits = modes.get(workspace_path).copied().map(|m| m & 0o7777);
        assert_eq!(permission_bits, Some(expected_mode));
    }
}
1860
/// A glob with zero matches is not an error: the resolver succeeds
/// with an empty result.
#[test]
fn glob_no_match_returns_empty_contribution() {
    let only_file = format!("{PROJECT_HOST}/only.txt");
    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();
    fs.add_file(only_file, b"x".to_vec()).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("*.js").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert!(resolved.is_empty());
}
1875
/// `dist/**/*.js` matches `.js` files directly under `dist/` and
/// in nested subdirectories, and nothing outside `dist/`.
#[test]
fn glob_nested_double_star_recurses_into_subdirectories() {
    let mut fs = MemFilesystem::new();
    fs.add_dir(format!("{PROJECT_HOST}/dist")).unwrap();
    fs.add_dir(format!("{PROJECT_HOST}/dist/inner")).unwrap();

    let top = format!("{PROJECT_HOST}/dist/top.js");
    fs.add_file(top, b"top".to_vec()).unwrap();
    let deep = format!("{PROJECT_HOST}/dist/inner/deep.js");
    fs.add_file(deep, b"deep".to_vec()).unwrap();
    // Lives beside `dist/`, not inside it; MUST NOT be matched.
    let outside = format!("{PROJECT_HOST}/other.js");
    fs.add_file(outside, b"other".to_vec()).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("dist/**/*.js").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 2);

    let expected_workspace_paths = BTreeSet::from([
        "/proj/dist/top.js".to_owned(),
        "/proj/dist/inner/deep.js".to_owned(),
    ]);
    assert_eq!(paths_of(&resolved), expected_workspace_paths);

    let expected_host_paths = BTreeSet::from([
        PathBuf::from("/ws/proj/dist/top.js"),
        PathBuf::from("/ws/proj/dist/inner/deep.js"),
    ]);
    assert_eq!(host_paths_of(&resolved), expected_host_paths);
}
1913
/// A glob that matches both a regular file and a symlink pointing
/// at it yields two entries, both reporting the target's mode.
#[test]
fn glob_symlink_to_file_records_link_path_with_target_mode() {
    // Same contract as on the input side: the real file and the
    // symlink to it count as separate contributions. Modes come
    // from Filesystem::permissions, which follows symlinks, so
    // the link entry reports the target file's mode bits too.
    let real_host_path = format!("{PROJECT_HOST}/real.txt");
    let link_host_path = format!("{PROJECT_HOST}/link.txt");

    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();
    fs.add_file_with_mode(real_host_path, b"real bytes".to_vec(), 0o600)
        .unwrap();
    fs.add_symlink(link_host_path, format!("{PROJECT_HOST}/real.txt"))
        .unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("*.txt").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(
        resolved.len(),
        2,
        "both the real file and the symlink to it are distinct contributions",
    );
    let expected_paths = BTreeSet::from(["/proj/real.txt".to_owned(), "/proj/link.txt".to_owned()]);
    assert_eq!(paths_of(&resolved), expected_paths);

    // permissions follows the symlink, so every entry carries the
    // target's mode bits.
    resolved.iter().for_each(|file| {
        assert_eq!(
            file.mode & 0o7777,
            0o600,
            "{} should report target's mode",
            file.workspace_absolute_path,
        );
    });
}
1960
/// For a `ProjectRoot::WorkspaceRoot` project, a project-relative
/// literal resolves directly under the workspace root.
#[test]
fn implicit_mode_project_relative_literal_is_workspace_absolute() {
    // With the project rooted at the workspace root there is no
    // project prefix to add: `manifest.json` becomes
    // `/manifest.json`.
    let manifest_host_path = format!("{WORKSPACE_HOST}/manifest.json");
    let mut fs = MemFilesystem::new();
    fs.add_dir(WORKSPACE_HOST).unwrap();
    fs.add_file_with_mode(manifest_host_path, b"{}".to_vec(), 0o644)
        .unwrap();

    let project = implicit_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("manifest.json").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 1);
    let file = &resolved[0];
    assert_eq!(file.workspace_absolute_path, "/manifest.json");
    assert_eq!(file.on_disk_path, PathBuf::from("/ws/manifest.json"));
    assert_eq!(file.mode & 0o7777, 0o644);
}
1985
/// A glob whose walk root is missing on disk must surface as
/// `OutputPatternResolutionFailed` carrying the walk root.
#[test]
fn glob_walk_error_surfaces_output_pattern_resolution_failed() {
    // The filesystem is left empty, so the project root directory
    // is absent and the glob walk fails. That failure is reported
    // as OutputPatternResolutionFailed, NOT as the literal-only
    // OutputDeclaredButNotProduced shape.
    let fs = MemFilesystem::new();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("*.js").unwrap()];

    let resolution = resolve_output_files(&fs, &workspace, &project, &declared);
    let walk_root = match resolution {
        Err(RunTaskError::OutputPatternResolutionFailed { root, .. }) => root,
        other => panic!("expected OutputPatternResolutionFailed, got {other:?}"),
    };

    assert_eq!(walk_root, PathBuf::from("/ws/proj"));
}
2007
/// A mode applied with `set_permissions` after file creation is
/// the mode the output resolver reports.
#[test]
fn permissions_round_trip_through_set_permissions() {
    // Sanity guard against MemFilesystem regressions in which
    // set_permissions and permissions drift apart.
    let bin_host_path = std::path::Path::new("/ws/proj/bin");
    let mut fs = MemFilesystem::new();
    fs.add_dir(PROJECT_HOST).unwrap();
    fs.add_file(format!("{PROJECT_HOST}/bin"), b"!".to_vec())
        .unwrap();
    fs.set_permissions(bin_host_path, 0o751).unwrap();

    let project = nested_project();
    let workspace = workspace_with(&project);
    let declared = vec![OutputSpec::parse("bin").unwrap()];

    let resolved = resolve_output_files(&fs, &workspace, &project, &declared).unwrap();

    assert_eq!(resolved.len(), 1);
    let file = &resolved[0];
    assert_eq!(file.mode & 0o7777, 0o751);
}
2029 }
2030
2031 // ---- end-to-end run_task ----
2032
2033 mod end_to_end {
2034 use std::collections::{BTreeMap, BTreeSet};
2035 use std::path::PathBuf;
2036 use std::sync::Mutex;
2037
2038 use nonempty::NonEmpty;
2039
2040 use haz_cache::Cache;
2041 use haz_dag::graph::TaskGraph;
2042 use haz_domain::action::TaskAction;
2043 use haz_domain::env::{EnvSettings, EnvVarName};
2044 use haz_domain::name::{ProjectName, TaskName};
2045 use haz_domain::path::{
2046 CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
2047 };
2048 use haz_domain::project::Project;
2049 use haz_domain::settings::WorkspaceSettings;
2050 use haz_domain::settings::cache::HashAlgo;
2051 use haz_domain::task::Task;
2052 use haz_domain::task_id::TaskId;
2053 use haz_domain::workspace::Workspace;
2054 use haz_vfs::{Filesystem, FsError, MemFilesystem, WritableFilesystem};
2055
2056 use crate::cache_key::BuildKeyError;
2057 use crate::mock_impl::{MockProcessSpawner, MockSpec};
2058
2059 use super::super::{
2060 CancelledRecord, CompletedRecord, RunContext, RunObserver, RunSource, RunState,
2061 RunTaskError, SkipRecord, run_task,
2062 };
2063 use tokio_util::sync::CancellationToken;
2064
/// Host path used as the workspace root (and as the cache's base
/// path) by [`Fixture`].
const WORKSPACE_HOST: &str = "/ws";
/// Host path of the `proj` project's directory; [`Fixture::new`]
/// creates it in the in-memory filesystem.
const PROJECT_HOST: &str = "/ws/proj";
2067
2068 fn env_var(s: &str) -> EnvVarName {
2069 EnvVarName::try_new(s).unwrap()
2070 }
2071
2072 fn task_id_for(project: &str, task: &str) -> TaskId {
2073 TaskId {
2074 project: ProjectName::try_new(project).unwrap(),
2075 task: TaskName::try_new(task).unwrap(),
2076 }
2077 }
2078
2079 fn command(argv: &[&str]) -> TaskAction {
2080 TaskAction::Command(
2081 NonEmpty::from_vec(argv.iter().map(|s| (*s).to_owned()).collect())
2082 .expect("non-empty argv"),
2083 )
2084 }
2085
/// Event captured by [`Recorder`]. The cache-hit branch and
/// the fresh-run branch both flow through the same observer
/// surface, so the captured event stream is mode-agnostic.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RecordedEvent {
    /// Recorded by `on_task_started` with a clone of the task id.
    Started(TaskId),
    /// Recorded by `on_stdout` with a copy of the delivered bytes.
    Stdout(TaskId, Vec<u8>),
    /// Recorded by `on_stderr` with a copy of the delivered bytes.
    Stderr(TaskId, Vec<u8>),
    /// Recorded by `on_task_finished` with a clone of the record.
    Finished(TaskId, CompletedRecord),
}
2096
/// Test-double [`RunObserver`]: records every lifecycle
/// callback into an in-memory log the assertions inspect.
#[derive(Default)]
struct Recorder {
    /// Events in callback order. Kept behind a `Mutex` because the
    /// observer callbacks only receive `&self`.
    events: Mutex<Vec<RecordedEvent>>,
}
2103
2104 impl Recorder {
2105 fn events(&self) -> Vec<RecordedEvent> {
2106 self.events.lock().unwrap().clone()
2107 }
2108 }
2109
2110 impl RunObserver for Recorder {
2111 fn on_task_started(&self, task: &TaskId) {
2112 self.events
2113 .lock()
2114 .unwrap()
2115 .push(RecordedEvent::Started(task.clone()));
2116 }
2117 fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
2118 self.events
2119 .lock()
2120 .unwrap()
2121 .push(RecordedEvent::Stdout(task.clone(), bytes.to_vec()));
2122 }
2123 fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
2124 self.events
2125 .lock()
2126 .unwrap()
2127 .push(RecordedEvent::Stderr(task.clone(), bytes.to_vec()));
2128 }
2129 fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
2130 self.events
2131 .lock()
2132 .unwrap()
2133 .push(RecordedEvent::Finished(task.clone(), record.clone()));
2134 }
2135 fn on_task_skipped(&self, _task: &TaskId, _record: &SkipRecord) {
2136 // run_task never produces a Skipped, so this
2137 // recorder leaves the path unrecorded.
2138 }
2139 fn on_task_cancelled(&self, _task: &TaskId, _record: &CancelledRecord) {
2140 // run_task never produces a Cancelled until the
2141 // spawn-step cancellation flow lands; recorder
2142 // leaves the path unrecorded for now.
2143 }
2144 }
2145
/// Common end-to-end fixture: one `proj:build` task with the
/// supplied action/env/outputs, a workspace + graph + cache
/// backed by a fresh [`MemFilesystem`] that contains the `/ws`
/// and `/ws/proj` directories.
struct Fixture {
    /// Cache over the in-memory filesystem; tests also reach the
    /// filesystem itself through `cache.fs()`.
    cache: Cache<MemFilesystem>,
    /// Single-project workspace rooted at `WORKSPACE_HOST`.
    workspace: Workspace,
    /// Graph containing only `task_id`, with no edges.
    graph: TaskGraph,
    /// Id of the one task under test (`proj:build`).
    task_id: TaskId,
    /// Host environment snapshot handed to `run_task`; starts
    /// empty and may be filled in by individual tests.
    host_env: BTreeMap<EnvVarName, String>,
    /// Cancellation token handed to `run_task`; created fresh by
    /// `Fixture::new`.
    cancel: CancellationToken,
}
2157
2158 impl Fixture {
2159 fn new(action: TaskAction, env: EnvSettings, outputs: Vec<OutputSpec>) -> Self {
2160 let mut fs = MemFilesystem::new();
2161 fs.add_dir(WORKSPACE_HOST).unwrap();
2162 fs.add_dir(PROJECT_HOST).unwrap();
2163
2164 let task = Task {
2165 name: TaskName::try_new("build").unwrap(),
2166 action,
2167 inputs: Vec::new(),
2168 outputs,
2169 deps: Vec::new(),
2170 weak_deps: Vec::new(),
2171 mutex: None,
2172 env,
2173 };
2174 let project = Project {
2175 name: ProjectName::try_new("proj").unwrap(),
2176 root: ProjectRoot::Nested(
2177 CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap(),
2178 ),
2179 tags: BTreeSet::new(),
2180 tasks: BTreeMap::from([(task.name.clone(), task)]),
2181 };
2182 let task_id = task_id_for("proj", "build");
2183 let workspace = Workspace {
2184 root: WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap(),
2185 projects: BTreeMap::from([(project.name.clone(), project)]),
2186 overlays: BTreeMap::new(),
2187 settings: WorkspaceSettings::default(),
2188 };
2189 let graph = TaskGraph {
2190 nodes: BTreeSet::from([task_id.clone()]),
2191 edges: BTreeSet::new(),
2192 };
2193 let cache = Cache::new(fs, std::path::Path::new(WORKSPACE_HOST), HashAlgo::Blake3);
2194 Self {
2195 cache,
2196 workspace,
2197 graph,
2198 task_id,
2199 host_env: BTreeMap::new(),
2200 cancel: CancellationToken::new(),
2201 }
2202 }
2203 }
2204
2205 fn make_ctx<'a>(
2206 fixture: &'a Fixture,
2207 spawner: &'a MockProcessSpawner,
2208 observer: &'a Recorder,
2209 ) -> RunContext<'a, MemFilesystem, MockProcessSpawner, Recorder> {
2210 RunContext {
2211 fs: fixture.cache.fs(),
2212 cache: &fixture.cache,
2213 spawner,
2214 observer,
2215 workspace: &fixture.workspace,
2216 graph: &fixture.graph,
2217 host_env: &fixture.host_env,
2218 algo: HashAlgo::Blake3,
2219 cancel: &fixture.cancel,
2220 }
2221 }
2222
2223 #[tokio::test]
2224 async fn cache_miss_success_records_observer_events_and_persists_manifest() {
2225 let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2226 let spawner = MockProcessSpawner::new();
2227 spawner.push_spec(MockSpec {
2228 stdout: b"out\n".to_vec(),
2229 stderr: b"err\n".to_vec(),
2230 exit_code: 0,
2231 ..MockSpec::default()
2232 });
2233 let observer = Recorder::default();
2234 let ctx = make_ctx(&fixture, &spawner, &observer);
2235
2236 let outcome = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1_700_000_000)
2237 .await
2238 .expect("baseline run should succeed");
2239
2240 assert_eq!(outcome.source, RunSource::FreshRun);
2241 assert_eq!(outcome.state, RunState::Succeeded);
2242 assert!(outcome.exit_status.is_some_and(|s| s.success()));
2243 assert_eq!(outcome.task, fixture.task_id);
2244
2245 let events = observer.events();
2246 assert_eq!(events.len(), 4, "expected 4 events, got {events:?}");
2247 assert_eq!(events[0], RecordedEvent::Started(fixture.task_id.clone()));
2248 assert_eq!(
2249 events[1],
2250 RecordedEvent::Stdout(fixture.task_id.clone(), b"out\n".to_vec()),
2251 );
2252 assert_eq!(
2253 events[2],
2254 RecordedEvent::Stderr(fixture.task_id.clone(), b"err\n".to_vec()),
2255 );
2256 match &events[3] {
2257 RecordedEvent::Finished(id, finished_outcome) => {
2258 assert_eq!(id, &fixture.task_id);
2259 assert_eq!(finished_outcome, &outcome);
2260 }
2261 other => panic!("expected Finished event, got {other:?}"),
2262 }
2263 }
2264
2265 #[tokio::test]
2266 async fn cache_miss_then_second_call_hits_cache() {
2267 let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2268 let spawner = MockProcessSpawner::new();
2269 spawner.push_spec(MockSpec {
2270 stdout: b"out\n".to_vec(),
2271 stderr: b"err\n".to_vec(),
2272 exit_code: 0,
2273 ..MockSpec::default()
2274 });
2275
2276 let observer1 = Recorder::default();
2277 let ctx1 = make_ctx(&fixture, &spawner, &observer1);
2278 let first = run_task(&ctx1, &fixture.task_id, &BTreeMap::new(), 1)
2279 .await
2280 .unwrap();
2281 assert_eq!(first.source, RunSource::FreshRun);
2282
2283 // Second call: same key, manifest present, expect hit.
2284 let observer2 = Recorder::default();
2285 let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2286 let second = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2287 .await
2288 .unwrap();
2289
2290 assert_eq!(second.source, RunSource::CacheHit);
2291 assert_eq!(second.state, RunState::Succeeded);
2292 assert!(second.exit_status.is_none());
2293 assert_eq!(second.stdout_hash, first.stdout_hash);
2294 assert_eq!(second.stderr_hash, first.stderr_hash);
2295
2296 // The hit branch surfaces the recorded streams through
2297 // the observer (EXEC-017).
2298 let events = observer2.events();
2299 assert!(
2300 events.iter().any(|e| matches!(
2301 e,
2302 RecordedEvent::Stdout(_, bytes) if bytes == b"out\n"
2303 )),
2304 "missing recorded stdout in hit-path events: {events:?}",
2305 );
2306 assert!(
2307 events.iter().any(|e| matches!(
2308 e,
2309 RecordedEvent::Stderr(_, bytes) if bytes == b"err\n"
2310 )),
2311 "missing recorded stderr in hit-path events: {events:?}",
2312 );
2313
2314 // The mock was only called once; the second run did NOT
2315 // respawn.
2316 assert_eq!(
2317 spawner.spawns().len(),
2318 1,
2319 "second call must not respawn, got {} total spawns",
2320 spawner.spawns().len(),
2321 );
2322 }
2323
2324 #[tokio::test]
2325 async fn cache_018_failed_run_does_not_store_a_cache_entry() {
2326 // CACHE-018: a run whose process exit status is non-zero
2327 // (or signalled, or timed out) MUST NOT produce a cache
2328 // entry. The cache root MUST remain absent and a follow-up
2329 // invocation MUST observe a fresh run (not a hit) at the
2330 // same key.
2331 let fixture = Fixture::new(command(&["false"]), EnvSettings::default(), Vec::new());
2332 let spawner = MockProcessSpawner::new();
2333 spawner.push_spec(MockSpec {
2334 stdout: Vec::new(),
2335 stderr: b"error\n".to_vec(),
2336 exit_code: 1,
2337 ..MockSpec::default()
2338 });
2339 let observer = Recorder::default();
2340 let ctx = make_ctx(&fixture, &spawner, &observer);
2341
2342 let outcome = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2343 .await
2344 .unwrap();
2345
2346 assert_eq!(outcome.source, RunSource::FreshRun);
2347 assert_eq!(outcome.state, RunState::Failed);
2348 assert!(outcome.exit_status.is_some_and(|s| !s.success()));
2349
2350 // The captured failure-side stderr still flowed through
2351 // the observer.
2352 assert!(
2353 observer.events().iter().any(|e| matches!(
2354 e,
2355 RecordedEvent::Stderr(_, bytes) if bytes == b"error\n"
2356 )),
2357 "failure-side stderr must reach the observer",
2358 );
2359
2360 // Direct CACHE-018 check: the cache root MUST NOT have
2361 // been created. CACHE-010 mandates lazy creation on first
2362 // store, and a failed run is not a store; the cache
2363 // directory therefore stays absent.
2364 let cache_root_meta = fixture.cache.fs().metadata(fixture.cache.cache_root());
2365 assert!(
2366 matches!(cache_root_meta, Err(FsError::NotFound { .. })),
2367 "cache root must remain absent after a failed run; got {cache_root_meta:?}",
2368 );
2369
2370 // Second call: nothing stored, expect another miss.
2371 spawner.push_spec(MockSpec {
2372 stdout: Vec::new(),
2373 stderr: b"again\n".to_vec(),
2374 exit_code: 1,
2375 ..MockSpec::default()
2376 });
2377 let observer2 = Recorder::default();
2378 let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2379 let outcome2 = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2380 .await
2381 .unwrap();
2382 assert_eq!(outcome2.source, RunSource::FreshRun);
2383 assert_eq!(spawner.spawns().len(), 2);
2384 }
2385
2386 #[tokio::test]
2387 async fn cache_miss_success_stores_outputs_and_restore_overwrites_target() {
2388 // The task declares one output; pre-write it to
2389 // simulate the command producing it. After a fresh run,
2390 // the cache holds the output; a subsequent hit restores
2391 // the cached bytes even when the workspace copy has been
2392 // tampered with.
2393 let fixture = Fixture::new(
2394 command(&["true"]),
2395 EnvSettings::default(),
2396 vec![OutputSpec::parse("artifact.bin").unwrap()],
2397 );
2398 fixture
2399 .cache
2400 .fs()
2401 .write_file(
2402 std::path::Path::new("/ws/proj/artifact.bin"),
2403 b"artifact-bytes",
2404 )
2405 .unwrap();
2406
2407 let spawner = MockProcessSpawner::new();
2408 spawner.push_spec(MockSpec::default());
2409 let observer = Recorder::default();
2410 let ctx = make_ctx(&fixture, &spawner, &observer);
2411
2412 let first = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2413 .await
2414 .unwrap();
2415 assert_eq!(first.source, RunSource::FreshRun);
2416 assert_eq!(first.state, RunState::Succeeded);
2417
2418 // Tamper with the target. The hit branch's restore
2419 // (CACHE-019) should overwrite this with the cached
2420 // bytes.
2421 fixture
2422 .cache
2423 .fs()
2424 .write_file(std::path::Path::new("/ws/proj/artifact.bin"), b"garbage")
2425 .unwrap();
2426
2427 let observer2 = Recorder::default();
2428 let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2429 let second = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2430 .await
2431 .unwrap();
2432 assert_eq!(second.source, RunSource::CacheHit);
2433
2434 let restored_bytes = fixture
2435 .cache
2436 .fs()
2437 .read(std::path::Path::new("/ws/proj/artifact.bin"))
2438 .unwrap();
2439 assert_eq!(restored_bytes, b"artifact-bytes");
2440 }
2441
2442 #[tokio::test]
2443 async fn cache_023_hit_is_observationally_equivalent_to_fresh_run() {
2444 // CACHE-023: a cache hit MUST produce, from the
2445 // perspective of every external observer that consults
2446 // only declared outputs and captured streams, a state
2447 // indistinguishable from a successful fresh run.
2448 //
2449 // Setup: one task with one declared output and a mock
2450 // command that emits non-empty stdout AND stderr. Run
2451 // once (miss -> fresh). Delete the output file so the
2452 // second run cannot accidentally observe stale bytes.
2453 // Run again (hit -> restore). Compare the three
2454 // externally-visible byte streams.
2455 const OUTPUT_BYTES: &[u8] = b"\x00built\x01artifact\xFF";
2456 const STDOUT_BYTES: &[u8] = b"hello from the task\n";
2457 const STDERR_BYTES: &[u8] = b"warning: be careful\n";
2458
2459 let fixture = Fixture::new(
2460 command(&["true"]),
2461 EnvSettings::default(),
2462 vec![OutputSpec::parse("artifact.bin").unwrap()],
2463 );
2464
2465 // Pre-write the workspace file so the fresh run can
2466 // hash it into the cache as the task's "produced"
2467 // output.
2468 let artifact_path = std::path::Path::new("/ws/proj/artifact.bin");
2469 fixture
2470 .cache
2471 .fs()
2472 .write_file(artifact_path, OUTPUT_BYTES)
2473 .unwrap();
2474
2475 let spawner = MockProcessSpawner::new();
2476 spawner.push_spec(MockSpec {
2477 stdout: STDOUT_BYTES.to_vec(),
2478 stderr: STDERR_BYTES.to_vec(),
2479 exit_code: 0,
2480 ..MockSpec::default()
2481 });
2482
2483 // Fresh run.
2484 let observer_fresh = Recorder::default();
2485 let ctx_fresh = make_ctx(&fixture, &spawner, &observer_fresh);
2486 let fresh = run_task(&ctx_fresh, &fixture.task_id, &BTreeMap::new(), 1)
2487 .await
2488 .unwrap();
2489 assert_eq!(fresh.source, RunSource::FreshRun);
2490 assert_eq!(fresh.state, RunState::Succeeded);
2491
2492 let fresh_output = fixture.cache.fs().read(artifact_path).unwrap();
2493 let fresh_stdout = concat_stdout(&observer_fresh.events());
2494 let fresh_stderr = concat_stderr(&observer_fresh.events());
2495
2496 // Overwrite the workspace artefact with distinctive
2497 // garbage so the second run cannot accidentally
2498 // observe stale-but-equal bytes; the assertion below
2499 // proves the restore put the cached bytes back. The
2500 // cache entry under .haz/cache/ is untouched.
2501 fixture
2502 .cache
2503 .fs()
2504 .write_file(artifact_path, b"garbage-should-be-overwritten")
2505 .unwrap();
2506
2507 // Second run: cache hit, restoration only.
2508 let observer_hit = Recorder::default();
2509 let ctx_hit = make_ctx(&fixture, &spawner, &observer_hit);
2510 let hit = run_task(&ctx_hit, &fixture.task_id, &BTreeMap::new(), 2)
2511 .await
2512 .unwrap();
2513 assert_eq!(hit.source, RunSource::CacheHit);
2514 assert_eq!(hit.state, RunState::Succeeded);
2515
2516 let hit_output = fixture.cache.fs().read(artifact_path).unwrap();
2517 let hit_stdout = concat_stdout(&observer_hit.events());
2518 let hit_stderr = concat_stderr(&observer_hit.events());
2519
2520 // The three externally-visible byte sequences MUST
2521 // match the fresh-run values exactly.
2522 assert_eq!(
2523 fresh_output, hit_output,
2524 "CACHE-023: declared output bytes differ between fresh and hit",
2525 );
2526 assert_eq!(
2527 fresh_stdout, hit_stdout,
2528 "CACHE-023: captured stdout differs between fresh and hit",
2529 );
2530 assert_eq!(
2531 fresh_stderr, hit_stderr,
2532 "CACHE-023: captured stderr differs between fresh and hit",
2533 );
2534
2535 // And the bytes themselves agree with the source-of-
2536 // truth values, not some accidentally-equal corruption.
2537 assert_eq!(hit_output, OUTPUT_BYTES);
2538 assert_eq!(hit_stdout, STDOUT_BYTES);
2539 assert_eq!(hit_stderr, STDERR_BYTES);
2540
2541 // Exactly one spawn happened (the fresh run); the hit
2542 // did not re-spawn.
2543 assert_eq!(spawner.spawns().len(), 1);
2544 }
2545
2546 fn concat_stdout(events: &[RecordedEvent]) -> Vec<u8> {
2547 let mut acc = Vec::new();
2548 for event in events {
2549 if let RecordedEvent::Stdout(_, bytes) = event {
2550 acc.extend_from_slice(bytes);
2551 }
2552 }
2553 acc
2554 }
2555
2556 fn concat_stderr(events: &[RecordedEvent]) -> Vec<u8> {
2557 let mut acc = Vec::new();
2558 for event in events {
2559 if let RecordedEvent::Stderr(_, bytes) = event {
2560 acc.extend_from_slice(bytes);
2561 }
2562 }
2563 acc
2564 }
2565
2566 #[tokio::test]
2567 async fn task_not_in_workspace_surfaces_build_key_failed() {
2568 let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2569 let spawner = MockProcessSpawner::new();
2570 let observer = Recorder::default();
2571 let ctx = make_ctx(&fixture, &spawner, &observer);
2572
2573 let absent = task_id_for("absent_project", "build");
2574
2575 match run_task(&ctx, &absent, &BTreeMap::new(), 1).await {
2576 Err(RunTaskError::BuildKeyFailed {
2577 source: BuildKeyError::TaskNotInWorkspace { task },
2578 }) => assert_eq!(task, absent),
2579 other => panic!("expected BuildKeyFailed(TaskNotInWorkspace), got {other:?}"),
2580 }
2581
2582 // on_task_started fires before the lookup phase: the
2583 // wrapper marks the lifecycle as entered before any
2584 // workspace resolution attempt. on_task_finished does
2585 // NOT fire because no CompletedRecord was produced.
2586 let events = observer.events();
2587 assert_eq!(
2588 events.len(),
2589 1,
2590 "expected one Started event, got {events:?}"
2591 );
2592 assert!(
2593 matches!(events[0], RecordedEvent::Started(ref id) if id == &absent),
2594 "expected Started(absent), got {events:?}",
2595 );
2596 // No spawn either.
2597 assert!(spawner.spawns().is_empty());
2598 }
2599
2600 #[tokio::test]
2601 async fn spawn_plan_env_reflects_cache_008_runtime_view() {
2602 let env = EnvSettings {
2603 from_host: BTreeSet::from([env_var("PATH"), env_var("MISSING")]),
2604 overrides: BTreeMap::from([
2605 (env_var("HAZ_MODE"), "ci".to_owned()),
2606 (env_var("PATH"), "/override/bin".to_owned()),
2607 ]),
2608 };
2609 let mut fixture = Fixture::new(command(&["true"]), env, Vec::new());
2610 fixture
2611 .host_env
2612 .insert(env_var("PATH"), "/usr/bin".to_owned());
2613 fixture
2614 .host_env
2615 .insert(env_var("UNRELATED"), "should-not-appear".to_owned());
2616
2617 let spawner = MockProcessSpawner::new();
2618 spawner.push_spec(MockSpec::default());
2619 let observer = Recorder::default();
2620 let ctx = make_ctx(&fixture, &spawner, &observer);
2621
2622 let _ = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2623 .await
2624 .unwrap();
2625
2626 let records = spawner.spawns();
2627 assert_eq!(records.len(), 1);
2628 let env_vec = &records[0].plan.env;
2629 let env_names: BTreeSet<String> = env_vec
2630 .iter()
2631 .map(|(k, _)| k.to_str().unwrap().to_owned())
2632 .collect();
2633 // Override-only entries propagate; from_host present
2634 // names propagate (with override winning on
2635 // collision); from_host absent names are excluded;
2636 // unrelated host names are excluded.
2637 assert_eq!(
2638 env_names,
2639 BTreeSet::from(["HAZ_MODE".to_owned(), "PATH".to_owned()]),
2640 );
2641
2642 let path_value = env_vec
2643 .iter()
2644 .find(|(k, _)| k.to_str() == Some("PATH"))
2645 .map(|(_, v)| v.to_str().unwrap().to_owned());
2646 assert_eq!(path_value.as_deref(), Some("/override/bin"));
2647 }
2648 }
2649}