Skip to main content

haz_exec/
run_task.rs

1//! Single-task lifecycle: cache lookup, hit-path restoration, or
2//! miss-path spawn-and-capture with success-only store.
3//!
4//! [`run_task`] is the async entry point. It does no scheduling, no
5//! cap accounting, no mutex acquisition, no cancellation, and emits
6//! no presentation-layer bytes itself: captured streams flow to the
7//! caller through the [`RunObserver`] trait. Concrete observer
8//! implementations distinguish the two `EXEC-016` modes: `live`
9//! (per-line `[project:task] `-tag-prefixed writes to a shared
10//! parent sink) and `buffered` (one contiguous block per stream on
11//! task completion).
12//!
13//! The function composes the cache-key derivation
14//! ([`crate::cache_key::build_cache_key`]) with the
15//! [`crate::process::ProcessSpawner`] trait
16//! into the executor's smallest
17//! end-to-end unit. A scheduler layer combines many [`run_task`]
18//! calls with the workspace's concurrency caps and mutex hold-set
19//! (`EXEC-004..007`).
20//!
21//! # Spec coverage
22//!
23//! - `EXEC-007` step 1 -- key derivation and lookup precede every
24//!   other concern. Mutex acquisition is not implemented by this
25//!   function; that responsibility belongs to a scheduler layer.
26//! - `EXEC-007` step 2 -- cache-hit restoration runs mutex-free.
27//! - `EXEC-009` -- terminal classification: zero exit is
28//!   [`RunState::Succeeded`]; non-zero or signalled is
29//!   [`RunState::Failed`]. A cache hit is [`RunState::Succeeded`].
30//!   The cancelled state of `EXEC-009`'s third option is not
31//!   reachable from this function: cancellation is not implemented
32//!   here.
33//! - `EXEC-016` -- the full `stdout` and `stderr` byte streams of
34//!   every fresh run are captured (the cache stores them per
35//!   `CACHE-012`) and surfaced through [`RunObserver`].
36//! - `EXEC-017` -- on a cache hit, the recorded `stdout` and
37//!   `stderr` flow through [`RunObserver`] with the same
38//!   mode-agnostic signal as a fresh run.
39//! - `CACHE-008` (runtime) -- the spawned process sees only the
40//!   effective env: allow-listed host values plus task-level
41//!   overrides, override wins on collision. The spawn-plan
42//!   builder assembles that env vector; the std backend's
43//!   `env_clear()` ensures the child sees this set and nothing
44//!   else.
45//! - `CACHE-014..016` -- [`haz_cache::Cache::lookup`] folds every
46//!   failure into a clean miss, so no [`RunTaskError`] variant
47//!   covers it.
48//! - `CACHE-017..018` -- the cache store fires only when the fresh
49//!   run's exit status was zero. The store call itself surfaces
50//!   as [`RunTaskError::StoreFailed`].
51//! - `CACHE-019` -- restoration uses [`haz_cache::Cache::restore`],
52//!   surfacing [`haz_cache::RestoredStreams`] through the observer.
53//!
54//! # Out of scope for this function
55//!
56//! - Concurrency caps (`EXEC-004..006`) and the canonical-order
57//!   tie-breaking of `EXEC-003`: scheduler-level concerns.
58//! - Mutex acquisition (`EXEC-007` step 3) and `EXEC-006`
59//!   condition 3: scheduler-level concerns.
60//! - Failure cascade and the `skipped` state (`EXEC-009..011`):
61//!   scheduler-level concerns.
62//! - Cancellation (`EXEC-012..015`): not implemented in this
63//!   function.
64//! - Live-mode per-line tag prefixing and atomic sink writes
65//!   (`EXEC-016` presentation): implemented inside concrete
66//!   [`RunObserver`] types.
67//! - Runtime cycle (`EXEC-019`) and output-overlap (`EXEC-020`)
68//!   detection: scheduler-level concerns.
69
70use std::collections::BTreeMap;
71use std::ffi::OsString;
72use std::io;
73use std::path::{Path, PathBuf};
74
75use snafu::{ResultExt, Snafu};
76use tokio::io::AsyncReadExt;
77use tokio_util::sync::CancellationToken;
78
79use haz_cache::{Cache, Hasher, RestoreError, StoreError, StoreInputs, StoredOutput};
80use haz_dag::graph::TaskGraph;
81use haz_domain::action::{ShellType, TaskAction};
82use haz_domain::env::{EnvSettings, EnvVarName};
83use haz_domain::path::{CanonicalPath, OutputSpec, ParseAbsoluteError, PathPattern, ProjectRoot};
84use haz_domain::project::Project;
85use haz_domain::settings::cache::HashAlgo;
86use haz_domain::task::Task;
87use haz_domain::task_id::TaskId;
88use haz_domain::workspace::Workspace;
89use haz_vfs::{EntryKind, Filesystem, FsError, WritableFilesystem};
90
91use crate::cache_key::{BuildKeyError, PredecessorStreamHashes, build_cache_key};
92use crate::pattern_walk::{
93    GlobMatchAction, GlobWalk, glob_walk_origin, host_path_from_segments,
94    literal_workspace_segments, workspace_absolute_string_from_segments,
95};
96use crate::process::{
97    ExitStatus, Process, ProcessError, ProcessSpawner, Signal, SpawnPlan, Spawned,
98};
99
/// Origin of a [`CompletedRecord`]'s result: replayed from the cache or
/// produced by actually running the task.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RunSource {
    /// A valid entry already existed for the derived key (`CACHE-015`):
    /// restoration materialised its outputs and its recorded streams
    /// were replayed through the observer.
    CacheHit,
    /// No usable entry existed: the executor spawned the task's command,
    /// captured stdout and stderr, and, when the run succeeded, stored
    /// the result so later runs can hit.
    FreshRun,
}
112
/// `EXEC-009` terminal classification of one single-task run.
///
/// The three variants cover the spec's terminal classifications
/// exhaustively: succeeded, failed, and cancelled by the executor.
/// [`RunState::Cancelled`] is intended to come only from the spawn-step
/// future, when the run context's cancellation token fires while the
/// child is still running. Until that wiring lands, no [`run_task`]
/// call returns it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RunState {
    /// Either a cache hit (which vouches for an earlier successful run,
    /// per `CACHE-018`) or a fresh run that exited with status zero.
    Succeeded,
    /// A fresh run that exited with a non-zero status, or whose process
    /// was killed by a signal the executor did not send.
    Failed,
    /// A fresh run whose child the executor signalled because the run's
    /// cancellation token fired (`EXEC-009` cancelled state, `EXEC-013`
    /// step 2). Kept separate from [`Self::Failed`] so the run summary
    /// can distinguish executor-initiated termination from a task that
    /// failed on its own.
    Cancelled,
}
138
139/// Run-record fields produced by one [`run_task`] invocation: the
140/// observation of a task that the scheduler admitted into the
141/// lookup-then-spawn pipeline and that reached a run classification
142/// (`EXEC-009`).
143///
144/// Carries enough information for a scheduler layer to feed the
145/// per-`TaskId` `predecessor_streams` map that
146/// [`crate::cache_key::build_cache_key`] consumes: `stdout_hash` and
147/// `stderr_hash` are always present, whether the run was a cache
148/// hit (sourced from the manifest's recorded hashes per
149/// `CACHE-011`) or a fresh run (computed from the captured byte
150/// buffers under the cache's active [`HashAlgo`]).
151///
152/// A scheduler-level cascade decision produces
153/// [`RunOutcome::Skipped`] instead (`EXEC-010` / `EXEC-011`);
154/// [`run_task`] itself never produces a Skipped because skipped
155/// tasks never enter the pipeline.
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct CompletedRecord {
158    /// The task this record describes.
159    pub task: TaskId,
160    /// Whether the success came from a cache hit or a fresh run.
161    pub source: RunSource,
162    /// Terminal classification per `EXEC-009`.
163    pub state: RunState,
164    /// Process exit status for a fresh run, [`None`] for a cache
165    /// hit (no process ran).
166    pub exit_status: Option<ExitStatus>,
167    /// Hash of the run's captured stdout under the cache's
168    /// [`HashAlgo`]. For [`RunSource::CacheHit`] this is read from
169    /// the manifest; for [`RunSource::FreshRun`] it is computed
170    /// from the captured byte buffer.
171    pub stdout_hash: [u8; 32],
172    /// Hash of the run's captured stderr under the cache's
173    /// [`HashAlgo`].
174    pub stderr_hash: [u8; 32],
175    /// Workspace-absolute paths the task materialised on disk.
176    ///
177    /// Populated only for [`RunState::Succeeded`]; empty for
178    /// [`RunState::Failed`] and [`RunState::Cancelled`]. The
179    /// source is the executor's output-resolution pass for a
180    /// fresh run and [`haz_cache::Manifest::outputs`] for a
181    /// cache hit.
182    ///
183    /// Consumed by the scheduler's runtime DAG validation pass
184    /// (`EXEC-019` runtime cycle detection and `EXEC-020` runtime
185    /// output-overlap detection) to derive runtime producer-
186    /// matching edges and to populate the output-claim tracker.
187    pub materialised_outputs: Vec<CanonicalPath>,
188}
189
190/// Record of a task that the scheduler cascade-skipped because an
191/// upstream task failed (`EXEC-010` / `EXEC-011`).
192///
193/// A skipped task is never started by the executor and never
194/// produces a cache entry. The `cause` field names the root
195/// failing task (NOT an intermediate cascade-skipped predecessor),
196/// so the run summary can attribute every skip to a single
197/// actually-failed task.
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct SkipRecord {
200    /// The task that was skipped.
201    pub task: TaskId,
202    /// Reason the task was skipped, naming the originating
203    /// failure.
204    pub cause: SkipCause,
205}
206
207/// Reason a task was cascade-skipped per `EXEC-011`.
208///
209/// The `upstream` field on each variant carries the root failing
210/// task identity, not the immediate hard predecessor: when `A`
211/// fails and the cascade marks `B` (direct hard child) and `C`
212/// (hard child of `B`), both record `upstream = A`.
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub enum SkipCause {
215    /// A transitively-upstream task reached
216    /// [`RunState::Failed`] (per `EXEC-009`: non-zero exit,
217    /// non-executor signal, or executor-imposed timeout).
218    UpstreamFailed {
219        /// Identity of the originally-failed upstream task.
220        upstream: TaskId,
221    },
222    /// A transitively-upstream task surfaced a [`RunTaskError`]
223    /// (executor-level failure: cache derive, spawn, wait,
224    /// stream-read, output-resolution, or store). Distinct from
225    /// [`Self::UpstreamFailed`] so the run summary can tell
226    /// "the upstream task ran and failed" from "the executor
227    /// could not even run the upstream task".
228    UpstreamErrored {
229        /// Identity of the upstream task whose [`run_task`] call
230        /// returned an [`Err`].
231        upstream: TaskId,
232    },
233    /// The task was identified as a member of a runtime cycle
234    /// detected by `EXEC-019`. The scheduler stops admitting
235    /// further tasks once the cycle is observed; cycle members
236    /// still in the ready set never start. The cycle's full node
237    /// set and the offending edge appear in the run's
238    /// [`crate::run_graph::RunGraphOutcome::invariant_violations`]
239    /// diagnostic; this per-task cause attributes the skip to
240    /// that workspace-level event without rewriting the cycle
241    /// node's outcome to a misleading "failed" classification.
242    RuntimeCycle,
243}
244
245/// Record of a task that ended in the executor-initiated
246/// cancelled state per `EXEC-009` / `EXEC-013`.
247///
248/// The three variants distinguish the three structural shapes a
249/// cancellation can take in the run-graph view:
250///
251/// - [`Self::SignaledInFlight`] is the only shape a single-task
252///   spawn-step future can produce: the cancellation token fired
253///   while the child was running, and the executor sent SIGTERM
254///   (and possibly SIGKILL after `EXEC-014`'s grace period).
255/// - [`Self::UpstreamCancelled`] is the cascade counterpart to
256///   [`SkipCause::UpstreamFailed`]: a hard descendant of a
257///   cancelled task is itself cancelled. `upstream` carries the
258///   root cancelled task, not the immediate hard predecessor
259///   (same root-cause attribution as [`SkipRecord`]).
260/// - [`Self::RunCancelled`] covers tasks that were still in
261///   `state.ready` (or whose lookup-step was in flight) when the
262///   scheduler observed the cancellation signal and drained the
263///   admission front. These tasks never made it to the spawn
264///   step; no process was started or signalled.
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub enum CancelledRecord {
267    /// The task was admitted, spawned, and signalled by the
268    /// executor in response to the cancellation token firing.
269    /// `exit_status` records the child's exit (typically
270    /// signal-terminated, but a polite child may exit cleanly
271    /// after SIGTERM); the captured stream hashes are present
272    /// for diagnostic purposes even though `EXEC-015` blocks
273    /// the cache store.
274    SignaledInFlight {
275        /// The task that was signalled.
276        task: TaskId,
277        /// The child's exit status after the signal flow ran to
278        /// completion.
279        exit_status: ExitStatus,
280        /// Hash of captured stdout under the cache's
281        /// [`HashAlgo`], for diagnostic / predecessor-streams use.
282        stdout_hash: [u8; 32],
283        /// Hash of captured stderr under the cache's
284        /// [`HashAlgo`], for diagnostic / predecessor-streams use.
285        stderr_hash: [u8; 32],
286    },
287    /// The task was cascade-cancelled because a transitively-
288    /// upstream task entered the cancelled state. Mirrors
289    /// [`SkipCause::UpstreamFailed`] for the cancellation flow.
290    UpstreamCancelled {
291        /// The task that was cascade-cancelled.
292        task: TaskId,
293        /// The root cancelled task that triggered the cascade.
294        upstream: TaskId,
295    },
296    /// The task was in `state.ready` (or its lookup-step was in
297    /// flight) when the scheduler observed the cancellation
298    /// signal. No process was spawned; the task simply never
299    /// entered the spawn-step pipeline.
300    RunCancelled {
301        /// The task whose admission was drained on cancel.
302        task: TaskId,
303    },
304}
305
306impl CancelledRecord {
307    /// The task this record describes, available on every
308    /// variant.
309    #[must_use]
310    pub fn task(&self) -> &TaskId {
311        match self {
312            Self::SignaledInFlight { task, .. }
313            | Self::UpstreamCancelled { task, .. }
314            | Self::RunCancelled { task } => task,
315        }
316    }
317}
318
319/// Terminal state of one task in a [`crate::run_graph`] invocation.
320///
321/// `Completed` wraps a [`CompletedRecord`] for any task that the
322/// scheduler admitted into the lookup-then-spawn pipeline and
323/// that reached a run classification (`EXEC-009`); `Skipped`
324/// wraps a [`SkipRecord`] for any task the cascade marked
325/// do-not-schedule before admission (`EXEC-010` / `EXEC-011`);
326/// `Cancelled` wraps a [`CancelledRecord`] for any task that
327/// the cancellation flow (`EXEC-012..015`) reached, either
328/// directly (signalled in flight, drained from the ready set)
329/// or via cascade.
330///
331/// [`run_task`] itself returns the bare [`CompletedRecord`]
332/// (never wrapped in this enum) because a single-task lifecycle
333/// has no cascade to inspect.
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub enum RunOutcome {
336    /// A task that ran to a [`CompletedRecord`] (succeeded or
337    /// failed per `EXEC-009`).
338    Completed(CompletedRecord),
339    /// A task the scheduler cascade-skipped before admission.
340    Skipped(SkipRecord),
341    /// A task the cancellation flow caught at any point (in
342    /// flight, on the ready set, or via cascade).
343    Cancelled(CancelledRecord),
344}
345
346impl RunOutcome {
347    /// The task this outcome describes, available on every
348    /// variant.
349    #[must_use]
350    pub fn task(&self) -> &TaskId {
351        match self {
352            Self::Completed(record) => &record.task,
353            Self::Skipped(record) => &record.task,
354            Self::Cancelled(record) => record.task(),
355        }
356    }
357}
358
359/// Observation surface for one [`run_task`] invocation.
360///
361/// The four callbacks define a mode-agnostic surface: byte chunks
362/// flow through [`RunObserver::on_stdout`] and
363/// [`RunObserver::on_stderr`] for both fresh runs and cache hits.
364/// Concrete implementations distinguish the `EXEC-016` presentation
365/// modes:
366///
367/// - A `buffered` observer collects the chunks per task and emits
368///   one contiguous block to the parent process on
369///   [`RunObserver::on_task_finished`].
370/// - A `live` observer prefixes each line with `[project:task] `
371///   and writes it to a shared sink with per-line atomicity.
372///
373/// Implementations MAY use interior mutability; the trait methods
374/// take `&self` so that one observer can serve many concurrent
375/// [`run_task`] calls (the scheduler exercises this via concurrent
376/// invocations).
377pub trait RunObserver {
378    /// Called once at the start of the run, before any cache work.
379    fn on_task_started(&self, task: &TaskId);
380
381    /// One chunk of captured stdout bytes. For a cache hit, called
382    /// exactly once with the recorded stdout in full. For a fresh
383    /// run, the call shape depends on the observer's presentation
384    /// strategy: a buffered observer typically receives a single
385    /// call with the whole captured stdout; a live observer may
386    /// receive many sub-line chunks as bytes flow from the pipe.
387    fn on_stdout(&self, task: &TaskId, bytes: &[u8]);
388
389    /// One chunk of captured stderr bytes. Same call shape as
390    /// [`Self::on_stdout`].
391    fn on_stderr(&self, task: &TaskId, bytes: &[u8]);
392
393    /// Called once at the end of the run with the run-record
394    /// classification. Fires only for tasks the scheduler
395    /// admitted into the lookup-then-spawn pipeline; cascade-
396    /// skipped tasks surface through
397    /// [`Self::on_task_skipped`] instead.
398    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord);
399
400    /// Called once at the moment the scheduler marks `task`
401    /// cascade-skipped per `EXEC-010` / `EXEC-011`. Fires before
402    /// the next admission round; NEVER paired with
403    /// [`Self::on_task_started`] or [`Self::on_task_finished`]
404    /// for the same task.
405    fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord);
406
407    /// Called once at the moment the cancellation flow records a
408    /// terminal state for `task` per `EXEC-012..015`. The
409    /// [`CancelledRecord`] variant distinguishes the three
410    /// shapes: an in-flight task that was signalled by the
411    /// executor; a cascade descendant of a cancelled task; or a
412    /// task drained from `state.ready` on cancel-fire. For an
413    /// in-flight task, [`Self::on_task_started`] has already
414    /// fired and [`Self::on_task_finished`] does NOT fire; for
415    /// the other two shapes, no other lifecycle callback fires
416    /// for the same task.
417    fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord);
418}
419
420/// Failure modes of [`run_task`].
421///
422/// The variants enumerate only the *executor-level* failures: cases
423/// where the lifecycle could not be carried through to a meaningful
424/// [`CompletedRecord`]. Task-level failures (non-zero exit, signalled
425/// exit) surface as [`Ok(CompletedRecord { state: Failed, .. })`]
426/// per `EXEC-009`, not as an [`Err`] here.
427#[derive(Debug, Snafu)]
428#[snafu(visibility(pub(crate)))]
429pub enum RunTaskError {
430    /// Cache-key derivation failed before lookup could be attempted.
431    #[snafu(display("failed to derive cache key: {source}"))]
432    BuildKeyFailed {
433        /// Originating [`BuildKeyError`].
434        source: BuildKeyError,
435    },
436
437    /// [`haz_cache::Cache::restore`] failed during the cache-hit
438    /// path (`CACHE-019`). Cache misses are not errors per
439    /// `CACHE-016`, so this variant covers only the successful-
440    /// lookup-then-restore-failed shape.
441    #[snafu(display("failed to restore cache entry: {source}"))]
442    RestoreFailed {
443        /// Originating [`RestoreError`].
444        source: RestoreError,
445    },
446
447    /// The [`ProcessSpawner`] refused to start the child (executable
448    /// not found, permission denied, fork failure, etc.). Distinct
449    /// from `EXEC-009`'s failed state: that rule covers commands
450    /// that *did* run; this variant is for commands that never did.
451    #[snafu(display("failed to spawn process: {source}"))]
452    SpawnFailed {
453        /// Originating [`ProcessError`].
454        source: ProcessError,
455    },
456
457    /// [`crate::process::Process::wait`] surfaced an I/O failure
458    /// while reaping the child. Rare on healthy hosts; usually indicates
459    /// that the child was reaped out from under us by a parent
460    /// signal handler or that the OS lost the descriptor.
461    #[snafu(display("failed to wait for spawned process: {source}"))]
462    WaitFailed {
463        /// Originating [`ProcessError`].
464        source: ProcessError,
465    },
466
467    /// Reading the child's stdout or stderr pipe to EOF failed
468    /// during stream capture. The reader task that returned this
469    /// error is the one whose stream did NOT reach EOF; the other
470    /// stream's capture may or may not have completed.
471    #[snafu(display("failed to read captured stream: {source}"))]
472    CapturedStreamReadFailed {
473        /// Which of the two streams the failure came from, for
474        /// diagnostic precision (the underlying [`io::Error`] often
475        /// looks the same on both pipes).
476        stream: CapturedStream,
477        /// Underlying I/O error.
478        source: io::Error,
479    },
480
481    /// Walking the filesystem to resolve a task's `outputs`
482    /// patterns failed (output-side parallel of
483    /// [`BuildKeyError::InputPatternResolutionFailed`]). Distinct
484    /// from [`Self::OutputNotARegularFile`] and
485    /// [`Self::OutputDeclaredButNotProduced`] so the run summary
486    /// can tell "walk failed" from "the task did not honour its
487    /// declared outputs".
488    #[snafu(display(
489        "failed to resolve output patterns under: {}: {source}",
490        root.display()
491    ))]
492    OutputPatternResolutionFailed {
493        /// The absolute path being walked when the failure occurred.
494        root: PathBuf,
495        /// Underlying filesystem error.
496        source: FsError,
497    },
498
499    /// Reading the Unix permission bits of a matched output file
500    /// failed. The bytes themselves are read inside
501    /// [`haz_cache::Cache::store`]; this variant covers only the
502    /// mode lookup the executor performs to build
503    /// [`haz_cache::StoredOutput`].
504    #[snafu(display("failed to read mode of output file: {}: {source}", path.display()))]
505    OutputModeReadFailed {
506        /// The output file whose mode could not be read.
507        path: PathBuf,
508        /// Underlying filesystem error.
509        source: FsError,
510    },
511
512    /// A matched output path is not a regular file (a directory,
513    /// symlink to a non-file, socket, FIFO, etc.). The cache only
514    /// stores regular-file blobs (`CACHE-013`); non-file matches
515    /// cannot be ingested.
516    #[snafu(display("output path is not a regular file: {}", path.display()))]
517    OutputNotARegularFile {
518        /// Absolute path of the offending entry.
519        path: PathBuf,
520    },
521
522    /// A literal output pattern named a path the task did not
523    /// produce (the file does not exist on disk after a successful
524    /// run). Distinct from the input-side
525    /// [`BuildKeyError::InputPatternResolutionFailed`] because the
526    /// caller's intent is the inverse: outputs are a contract the
527    /// task is supposed to honour; a missing literal output is a
528    /// task-level bug, not a filesystem accident.
529    #[snafu(display("task declared output but did not produce it: {}", path.display()))]
530    OutputDeclaredButNotProduced {
531        /// Absolute path of the declared-but-missing output.
532        path: PathBuf,
533    },
534
535    /// [`haz_cache::Cache::store`] failed while persisting a
536    /// successful run as a cache entry (`CACHE-017`).
537    #[snafu(display("failed to store cache entry: {source}"))]
538    StoreFailed {
539        /// Originating [`StoreError`].
540        source: StoreError,
541    },
542
543    /// A workspace-absolute path produced by the executor's
544    /// output-resolution pass failed to parse as a
545    /// [`CanonicalPath`].
546    ///
547    /// Indicates an internal invariant violation: the resolver
548    /// builds each path string from already-validated
549    /// [`haz_domain::path::segment::PathSegment`]s, so the parse
550    /// is expected to succeed unconditionally. The variant exists
551    /// so the failure surfaces typed rather than panicking; in
552    /// practice it should never fire.
553    #[snafu(display("materialised output path is not workspace-absolute: {path}: {source}"))]
554    MaterialisedOutputPathInvalid {
555        /// The offending path string.
556        path: String,
557        /// Originating parse error.
558        source: ParseAbsoluteError,
559    },
560}
561
/// Names the captured byte stream that a
/// [`RunTaskError::CapturedStreamReadFailed`] refers to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum CapturedStream {
    /// Standard-output pipe of the child process.
    Stdout,
    /// Standard-error pipe of the child process.
    Stderr,
}
571
572/// Borrowed bundle of long-lived state that every [`run_task`]
573/// invocation in a given `haz` run shares.
574///
575/// The split between this context and the per-task arguments
576/// matches a scheduler's natural shape: one `RunContext` covers
577/// the entire scheduling loop, while `task`, `predecessor_streams`,
578/// and `created_at_unix` change between individual [`run_task`]
579/// calls.
580pub struct RunContext<'a, F, S, O>
581where
582    F: WritableFilesystem,
583    S: ProcessSpawner,
584    O: RunObserver,
585{
586    /// Filesystem the executor reads inputs / outputs / cache state
587    /// through.
588    pub fs: &'a F,
589    /// Cache handle (lookup, restore, store).
590    pub cache: &'a Cache<F>,
591    /// Process spawner (a test-double mock under `cfg(test)`,
592    /// [`crate::std_impl::StdProcessSpawner`] in production).
593    pub spawner: &'a S,
594    /// Observer that receives lifecycle events and captured streams.
595    pub observer: &'a O,
596    /// Validated workspace state.
597    pub workspace: &'a Workspace,
598    /// Validated dependency graph.
599    pub graph: &'a TaskGraph,
600    /// Host-environment snapshot. Names not present here are
601    /// treated as absent per `CACHE-008`.
602    pub host_env: &'a BTreeMap<EnvVarName, String>,
603    /// Active hash algorithm for cache-key derivation, input /
604    /// output content hashing, and captured-stream hashing.
605    pub algo: HashAlgo,
606    /// Cancellation signal for the run (`EXEC-012` triggers).
607    /// The scheduler and per-task spawn-step futures observe
608    /// this token: when it fires, the scheduler stops admitting
609    /// new tasks and in-flight futures send SIGTERM to their
610    /// children. The token is long-lived ambient state owned by
611    /// the run's caller (typically `haz-cli` installing an OS
612    /// signal handler); tests pass a never-cancelled token.
613    pub cancel: &'a CancellationToken,
614}
615
616/// Run one task per its workspace declaration, cache-aware.
617///
618/// Lifecycle:
619///
620/// 1. [`RunObserver::on_task_started`] fires.
621/// 2. [`crate::cache_key::build_cache_key`] derives the key from
622///    `task`'s declared inputs, env contribution, and hard-edge
623///    predecessors' captured stream hashes.
624/// 3. [`haz_cache::Cache::lookup`] is consulted:
625///    - On a hit, [`haz_cache::Cache::restore`] materialises the
626///      manifest's outputs at their workspace-absolute paths;
627///      `stdout` and `stderr` flow through the observer; the
628///      function returns [`RunSource::CacheHit`] with the
629///      manifest's recorded stream hashes.
630///    - On a miss, the function builds a [`crate::process::SpawnPlan`]
631///      from the task's [`haz_domain::action::TaskAction`] and the
632///      effective env (`CACHE-008` runtime view: allow-listed host
633///      values plus task overrides, override wins), spawns through
634///      `spawner`, captures both streams to memory, awaits exit,
635///      emits the streams through the observer, classifies per
636///      `EXEC-009`. On success the outputs are enumerated and
637///      [`haz_cache::Cache::store`] persists the entry. On
638///      failure no store fires.
639/// 4. [`RunObserver::on_task_finished`] fires with the terminal
640///    [`CompletedRecord`].
641///
642/// The function is async because spawning, waiting, and stream
643/// capture all use the tokio runtime. Filesystem and cache calls
644/// are sync and run on the calling worker thread; callers that want
645/// to off-load the cache-key derivation (which hashes input file
646/// bytes) and the store phase (which hashes output file bytes) from
647/// an async context can wrap the function in
648/// [`tokio::task::spawn_blocking`] -- both phases dominate the
649/// runtime cost of a single-task lifecycle.
650///
651/// `created_at_unix` is caller-supplied so the function stays a
652/// pure function of its arguments: tests pin the value, the
653/// scheduler passes the current wall-clock time.
654///
655/// # Errors
656///
657/// Returns a [`RunTaskError`] for any executor-level failure
658/// (cache-key derivation, restore, spawn, wait, stream read,
659/// output resolution, store). Task-level failures (non-zero exit
660/// or signalled exit) are reported as
661/// `Ok(CompletedRecord { state: Failed, .. })` per `EXEC-009`;
662/// the cache is not consulted for store in that branch
663/// (`CACHE-018`).
664/// Carrier for the borrows + computed values
665/// [`cache_lookup_phase`] hands back to the caller (typically the
666/// scheduler in [`crate::run_graph::run_graph`]).
667///
668/// The struct lets `EXEC-007` step 1 (cache lookup) be performed
669/// independently of `EXEC-007` step 2 (restore) and step 3 (spawn);
670/// the scheduler interposes its mutex-compatibility check between
671/// the lookup and the spawn branch.
672///
673/// Lifetime `'ws` ties the `project` and `task_def` borrows to the
674/// workspace borrow held inside the supplying [`RunContext`].
675#[derive(Debug)]
676pub struct TaskLookup<'ws> {
677    /// Bearing project of `task` (looked up once so neither
678    /// downstream branch has to repeat the search).
679    pub project: &'ws Project,
680    /// Bearing task definition (action, mutex, env, etc.).
681    pub task_def: &'ws Task,
682    /// Computed cache key.
683    pub key: haz_cache::CacheKey,
684    /// Manifest of the matching cache entry, if any. `Some(_)`
685    /// drives the restore branch; `None` drives the spawn branch.
686    pub manifest: Option<haz_cache::Manifest>,
687}
688
689/// `EXEC-007` step 1 of the single-task lifecycle: resolve the
690/// bearing project + task, derive the cache key, and consult
691/// the cache. No mutex hold is taken; no observer event fires.
692///
693/// Returns a [`TaskLookup`] the caller drives forward into either
694/// [`restore_from_hit`] (manifest present) or [`run_fresh`]
695/// (manifest absent). The scheduler interposes its `EXEC-006`
696/// condition-3 mutex check between this function and
697/// [`run_fresh`].
698///
699/// # Errors
700///
701/// - [`RunTaskError::BuildKeyFailed`] when the task is missing from
702///   the workspace, or when cache-key derivation fails for any
703///   reason (input pattern resolution, file read, etc.).
704pub fn cache_lookup_phase<'ws, F, S, O>(
705    ctx: &RunContext<'ws, F, S, O>,
706    task: &TaskId,
707    predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
708) -> Result<TaskLookup<'ws>, RunTaskError>
709where
710    F: WritableFilesystem,
711    S: ProcessSpawner,
712    O: RunObserver,
713{
714    let project =
715        ctx.workspace
716            .projects
717            .get(&task.project)
718            .ok_or_else(|| RunTaskError::BuildKeyFailed {
719                source: BuildKeyError::TaskNotInWorkspace { task: task.clone() },
720            })?;
721    let task_def = project
722        .tasks
723        .get(&task.task)
724        .ok_or_else(|| RunTaskError::BuildKeyFailed {
725            source: BuildKeyError::TaskNotInWorkspace { task: task.clone() },
726        })?;
727
728    let key = build_cache_key(
729        ctx.fs,
730        ctx.workspace,
731        ctx.graph,
732        task,
733        ctx.host_env,
734        predecessor_streams,
735        ctx.algo,
736    )
737    .context(BuildKeyFailedSnafu)?;
738
739    let manifest = ctx.cache.lookup(&key);
740    Ok(TaskLookup {
741        project,
742        task_def,
743        key,
744        manifest,
745    })
746}
747
748/// Single-task lifecycle entry point: composes
749/// [`cache_lookup_phase`], [`restore_from_hit`], and
750/// [`run_fresh`] without any mutex orchestration.
751///
752/// Treats every task as if its mutex were always compatible.
753/// Callers that need `EXEC-006` condition 3 / `EXEC-007` mutex
754/// semantics (i.e., the scheduler) MUST drive the three phases
755/// directly.
756///
757/// Fires [`RunObserver::on_task_started`] before the lookup step
758/// and [`RunObserver::on_task_finished`] after the record is in
759/// hand.
760///
761/// # Errors
762///
763/// Surfaces the first error encountered across the composed
764/// phases; see the docs on each underlying function for the
765/// variants reachable from each phase.
766pub async fn run_task<F, S, O>(
767    ctx: &RunContext<'_, F, S, O>,
768    task: &TaskId,
769    predecessor_streams: &BTreeMap<TaskId, PredecessorStreamHashes>,
770    created_at_unix: u64,
771) -> Result<CompletedRecord, RunTaskError>
772where
773    F: WritableFilesystem,
774    S: ProcessSpawner,
775    O: RunObserver,
776{
777    ctx.observer.on_task_started(task);
778
779    let lookup = cache_lookup_phase(ctx, task, predecessor_streams)?;
780
781    let record = if let Some(manifest) = lookup.manifest.as_ref() {
782        restore_from_hit(ctx, task, manifest)?
783    } else {
784        run_fresh(
785            ctx,
786            task,
787            lookup.project,
788            lookup.task_def,
789            &lookup.key,
790            created_at_unix,
791        )
792        .await?
793    };
794
795    ctx.observer.on_task_finished(task, &record);
796    Ok(record)
797}
798
799/// Cache-hit branch: restore the manifest's outputs, emit the
800/// recorded streams to the observer, and build the matching
801/// [`CompletedRecord`] (`EXEC-007` step 2, `EXEC-017`,
802/// `CACHE-019`).
803///
804/// Per `MUTEX-007`, this function MUST NOT touch any mutex hold
805/// set: a cache hit reproduces the recorded effect via file
806/// writes and stream emission and never touches the resource the
807/// mutex protects.
808///
809/// Does NOT fire [`RunObserver::on_task_started`] or
810/// [`RunObserver::on_task_finished`]; the caller drives the
811/// observer lifecycle so the start/finish pair stays balanced
812/// across all three phases.
813///
814/// # Errors
815///
816/// - [`RunTaskError::RestoreFailed`] when the cache restore step
817///   fails (a `manifest` field references a blob that cannot be
818///   read, a destination cannot be written, etc.).
819pub fn restore_from_hit<F, S, O>(
820    ctx: &RunContext<'_, F, S, O>,
821    task: &TaskId,
822    manifest: &haz_cache::Manifest,
823) -> Result<CompletedRecord, RunTaskError>
824where
825    F: WritableFilesystem,
826    S: ProcessSpawner,
827    O: RunObserver,
828{
829    let restored = ctx.cache.restore(manifest).context(RestoreFailedSnafu)?;
830    ctx.observer.on_stdout(task, &restored.stdout);
831    ctx.observer.on_stderr(task, &restored.stderr);
832    let materialised_outputs = manifest
833        .outputs
834        .iter()
835        .map(|blob| blob.workspace_absolute_path.clone())
836        .collect();
837    Ok(CompletedRecord {
838        task: task.clone(),
839        source: RunSource::CacheHit,
840        state: RunState::Succeeded,
841        exit_status: None,
842        stdout_hash: manifest.stdout_hash,
843        stderr_hash: manifest.stderr_hash,
844        materialised_outputs,
845    })
846}
847
848/// Cache-miss branch (`EXEC-007` step 3, compatible mutex path):
849/// spawn the task's command, capture both streams, classify per
850/// `EXEC-009`, and on success persist the run to the cache.
851///
852/// Reader futures run concurrently with `process.wait()` so a child
853/// that fills its stdout/stderr pipe buffer never blocks waiting for
854/// the parent to drain (the classic pipe-deadlock shape). Stream
855/// hashes are computed under [`RunContext::algo`] from the captured
856/// byte buffers (`CACHE-011` corollary for fresh runs).
857///
858/// Per `MUTEX-006`, the caller MUST acquire the task's mutex
859/// before invoking this function and MUST release it after the
860/// returned future resolves (regardless of success or failure).
861/// This function does not consult any hold set itself.
862///
863/// Does NOT fire [`RunObserver::on_task_started`] or
864/// [`RunObserver::on_task_finished`]; the caller drives the
865/// observer lifecycle.
866///
867/// # Errors
868///
869/// - [`RunTaskError::SpawnFailed`] when the process spawner
870///   rejects the [`SpawnPlan`].
871/// - [`RunTaskError::WaitFailed`] when waiting on the spawned
872///   process surfaces an OS error.
873/// - [`RunTaskError::CapturedStreamReadFailed`] when reading
874///   either captured stream errors.
875/// - The output-resolution and store error families when the run
876///   succeeded and the cache-store step fails.
877pub async fn run_fresh<F, S, O>(
878    ctx: &RunContext<'_, F, S, O>,
879    task: &TaskId,
880    project: &Project,
881    task_def: &Task,
882    key: &haz_cache::CacheKey,
883    created_at_unix: u64,
884) -> Result<CompletedRecord, RunTaskError>
885where
886    F: WritableFilesystem,
887    S: ProcessSpawner,
888    O: RunObserver,
889{
890    let plan = build_spawn_plan(
891        ctx.workspace.root.as_path(),
892        project,
893        task_def,
894        ctx.host_env,
895    );
896
897    let Spawned {
898        mut process,
899        mut stdout,
900        mut stderr,
901    } = ctx.spawner.spawn(&plan).await.context(SpawnFailedSnafu)?;
902
903    let mut stdout_bytes = Vec::new();
904    let mut stderr_bytes = Vec::new();
905    let grace = ctx.workspace.settings.execution.cancel_grace.as_duration();
906    let ((wait_result, was_cancelled), stdout_result, stderr_result) = tokio::join!(
907        await_exit_with_cancel(&mut process, ctx.cancel, grace),
908        AsyncReadExt::read_to_end(&mut stdout, &mut stdout_bytes),
909        AsyncReadExt::read_to_end(&mut stderr, &mut stderr_bytes),
910    );
911
912    let exit_status = wait_result.context(WaitFailedSnafu)?;
913    stdout_result.context(CapturedStreamReadFailedSnafu {
914        stream: CapturedStream::Stdout,
915    })?;
916    stderr_result.context(CapturedStreamReadFailedSnafu {
917        stream: CapturedStream::Stderr,
918    })?;
919
920    ctx.observer.on_stdout(task, &stdout_bytes);
921    ctx.observer.on_stderr(task, &stderr_bytes);
922
923    let stdout_hash = hash_bytes(ctx.algo, &stdout_bytes);
924    let stderr_hash = hash_bytes(ctx.algo, &stderr_bytes);
925
926    let (state, materialised_outputs) = if was_cancelled {
927        // `EXEC-015`: cancelled runs never hit the store branch
928        // even on a zero exit code; cancellation is neither
929        // success nor a half-success.
930        (RunState::Cancelled, Vec::new())
931    } else if exit_status.success() {
932        let outputs_owned =
933            resolve_output_files(ctx.fs, ctx.workspace, project, &task_def.outputs)?;
934        let materialised = canonical_paths_from_owned(&outputs_owned)?;
935        store_successful_run(
936            ctx,
937            key,
938            &outputs_owned,
939            &stdout_bytes,
940            &stderr_bytes,
941            created_at_unix,
942        )?;
943        (RunState::Succeeded, materialised)
944    } else {
945        (RunState::Failed, Vec::new())
946    };
947
948    Ok(CompletedRecord {
949        task: task.clone(),
950        source: RunSource::FreshRun,
951        state,
952        exit_status: Some(exit_status),
953        stdout_hash,
954        stderr_hash,
955        materialised_outputs,
956    })
957}
958
959/// Wait for the spawned child to exit, observing the run's
960/// cancellation token.
961///
962/// On the normal path the function delegates to [`Process::wait`]
963/// and returns `(Ok(status), false)`. When the cancellation token
964/// fires while the child is still running, the function:
965///
966/// 1. Sends [`Signal::Terminate`] (best-effort; the error is
967///    swallowed so Windows hosts that surface
968///    [`std::io::ErrorKind::Unsupported`] still escalate).
969/// 2. Races a fresh [`Process::wait`] against
970///    [`tokio::time::sleep`] of `grace`.
971/// 3. If the sleep wins, sends [`Signal::Kill`] (best-effort) and
972///    awaits the child.
973///
974/// In both cancellation paths the second element of the returned
975/// tuple is `true`; the caller classifies the run as
976/// [`RunState::Cancelled`]. A `grace` of [`Duration::ZERO`]
977/// collapses the SIGTERM-to-SIGKILL window: SIGKILL fires
978/// immediately after the SIGTERM (`EXEC-014`).
979async fn await_exit_with_cancel<P>(
980    process: &mut P,
981    cancel: &CancellationToken,
982    grace: std::time::Duration,
983) -> (Result<ExitStatus, ProcessError>, bool)
984where
985    P: Process,
986{
987    tokio::select! {
988        biased;
989        () = cancel.cancelled() => {
990            let _ = process.send_signal(Signal::Terminate);
991            tokio::select! {
992                result = process.wait() => (result, true),
993                () = tokio::time::sleep(grace) => {
994                    let _ = process.send_signal(Signal::Kill);
995                    (process.wait().await, true)
996                }
997            }
998        }
999        result = process.wait() => (result, false),
1000    }
1001}
1002
1003/// Persist the successful run as a cache entry under `key`
1004/// (`CACHE-017`, `CACHE-018`).
1005///
1006/// Called only when the fresh run's exit status was zero; the
1007/// caller has already classified the run as succeeded and has
1008/// already resolved the task's declared outputs (so the same
1009/// resolved Vec can drive both the cache store and the
1010/// scheduler's materialised-outputs view).
1011fn store_successful_run<F, S, O>(
1012    ctx: &RunContext<'_, F, S, O>,
1013    key: &haz_cache::CacheKey,
1014    outputs_owned: &[OwnedOutputFile],
1015    stdout: &[u8],
1016    stderr: &[u8],
1017    created_at_unix: u64,
1018) -> Result<(), RunTaskError>
1019where
1020    F: WritableFilesystem,
1021    S: ProcessSpawner,
1022    O: RunObserver,
1023{
1024    let stored_outputs: Vec<StoredOutput<'_>> = outputs_owned
1025        .iter()
1026        .map(|f| StoredOutput {
1027            workspace_absolute_path: &f.workspace_absolute_path,
1028            on_disk_path: &f.on_disk_path,
1029            mode: f.mode,
1030        })
1031        .collect();
1032    let inputs = StoreInputs {
1033        outputs: &stored_outputs,
1034        stdout,
1035        stderr,
1036        created_at_unix,
1037    };
1038    ctx.cache.store(key, &inputs).context(StoreFailedSnafu)
1039}
1040
1041/// Parse the workspace-absolute path string of each
1042/// [`OwnedOutputFile`] into a [`CanonicalPath`] for the
1043/// scheduler's runtime DAG validation pass.
1044///
1045/// The strings come from
1046/// [`pattern_walk::workspace_absolute_string_from_segments`]
1047/// where every segment was validated upstream by
1048/// [`PathSegment`]'s constructors, so the parse should never
1049/// fail in practice; the [`Result`] exists so a hypothetical
1050/// invariant violation surfaces as a typed
1051/// [`RunTaskError::MaterialisedOutputPathInvalid`] rather than
1052/// a panic.
1053fn canonical_paths_from_owned(
1054    files: &[OwnedOutputFile],
1055) -> Result<Vec<CanonicalPath>, RunTaskError> {
1056    files
1057        .iter()
1058        .map(|f| {
1059            CanonicalPath::parse_workspace_absolute(&f.workspace_absolute_path).context(
1060                MaterialisedOutputPathInvalidSnafu {
1061                    path: f.workspace_absolute_path.clone(),
1062                },
1063            )
1064        })
1065        .collect()
1066}
1067
1068/// Hash `bytes` under the active [`HashAlgo`]. Used to compute the
1069/// captured-stream hashes recorded on a fresh-run
1070/// [`CompletedRecord`].
1071fn hash_bytes(algo: HashAlgo, bytes: &[u8]) -> [u8; 32] {
1072    let mut hasher = Hasher::new(algo);
1073    hasher.update(bytes);
1074    hasher.finalize()
1075}
1076
1077/// Assemble the [`SpawnPlan`] that [`run_task`]'s miss branch hands
1078/// to the [`ProcessSpawner`].
1079///
1080/// Three pieces are derived from `task_def`:
1081///
1082/// - `program` and `args` from `task_def.action`. A
1083///   [`TaskAction::Command`] splits as `program = argv.head`,
1084///   `args = argv.tail`. A [`TaskAction::Shell`] becomes
1085///   `program = <shell-binary-name>`, `args = ["-c", script]`.
1086/// - `cwd` is the project's host path: `workspace_host` walked
1087///   through the project root's segments. An implicit-mode
1088///   project ([`ProjectRoot::WorkspaceRoot`], per `DISC-003`) lands
1089///   at `workspace_host` itself with no further pushing.
1090/// - `env` is the effective env per `CACHE-008` runtime view:
1091///   names in `task_def.env.from_host` propagate the value the
1092///   host snapshot recorded for them, names absent from the host
1093///   simply do not appear in the env vector; `task_def.env.overrides`
1094///   entries are layered on top, override winning on collision.
1095///   The std backend's `env_clear()` ensures the child sees this
1096///   set and nothing else.
1097fn build_spawn_plan(
1098    workspace_host: &Path,
1099    project: &Project,
1100    task_def: &Task,
1101    host_env: &BTreeMap<EnvVarName, String>,
1102) -> SpawnPlan {
1103    let (program, args) = program_and_args(&task_def.action);
1104    SpawnPlan {
1105        program,
1106        args,
1107        env: build_env_vec(&task_def.env, host_env),
1108        cwd: project_cwd(workspace_host, &project.root),
1109    }
1110}
1111
1112/// Walk `workspace_host` through the project root's segments. For
1113/// [`ProjectRoot::WorkspaceRoot`] (implicit-mode project per
1114/// `DISC-003`) returns the workspace host path verbatim.
1115fn project_cwd(workspace_host: &Path, project_root: &ProjectRoot) -> PathBuf {
1116    let mut p = workspace_host.to_path_buf();
1117    if let ProjectRoot::Nested(canonical) = project_root {
1118        for seg in canonical.segments() {
1119            p.push(seg.as_str());
1120        }
1121    }
1122    p
1123}
1124
1125/// Convert a [`TaskAction`] into the
1126/// (`program`, `args`) pair the spawner consumes.
1127fn program_and_args(action: &TaskAction) -> (OsString, Vec<OsString>) {
1128    match action {
1129        TaskAction::Command(nonempty_argv) => {
1130            let program = OsString::from(nonempty_argv.head.as_str());
1131            let args = nonempty_argv.tail.iter().map(OsString::from).collect();
1132            (program, args)
1133        }
1134        TaskAction::Shell { script, shell } => {
1135            let program = OsString::from(shell_binary_name(shell));
1136            let args = vec![OsString::from("-c"), OsString::from(script)];
1137            (program, args)
1138        }
1139    }
1140}
1141
1142/// Resolve [`ShellType`] to the binary name passed as `argv[0]` to
1143/// `exec`.
1144///
1145/// First-class variants emit their canonical identifiers (`sh`,
1146/// `bash`); [`ShellType::Other`] emits the validated name verbatim.
1147/// The shell is resolved through `PATH` at exec time, matching how
1148/// `system(3)` and `/bin/sh -c` behave on POSIX hosts.
1149fn shell_binary_name(shell: &ShellType) -> &str {
1150    match shell {
1151        ShellType::Sh => "sh",
1152        ShellType::Bash => "bash",
1153        ShellType::Other(name) => AsRef::<str>::as_ref(name.as_ref()),
1154    }
1155}
1156
1157/// Assemble the effective env vector per `CACHE-008` runtime view.
1158///
1159/// Steps:
1160///
1161/// 1. For each name in `env.from_host`, look it up in `host_env`.
1162///    Present names contribute their host value; absent names
1163///    contribute nothing (the child simply will not see that name).
1164/// 2. For each entry in `env.overrides`, set the name to the
1165///    override value, replacing any same-name entry from step 1
1166///    (`CACHE-008` "overrides wins on collision").
1167///
1168/// Names not in either map do not appear; the std spawner's
1169/// `env_clear()` ensures the child's env is exactly the returned
1170/// set.
1171///
1172/// Order in the returned vector is lexicographic by name (the
1173/// [`BTreeMap`] keeps it so). The std spawner preserves order on
1174/// `Command::env`; either order works on POSIX hosts because the
1175/// child sees a set, not a sequence, but lexicographic ordering
1176/// makes test assertions deterministic.
1177fn build_env_vec(
1178    env: &EnvSettings,
1179    host_env: &BTreeMap<EnvVarName, String>,
1180) -> Vec<(OsString, OsString)> {
1181    let mut effective: BTreeMap<OsString, OsString> = BTreeMap::new();
1182    for name in &env.from_host {
1183        if let Some(value) = host_env.get(name) {
1184            effective.insert(
1185                OsString::from(AsRef::<str>::as_ref(name.as_ref())),
1186                OsString::from(value),
1187            );
1188        }
1189    }
1190    for (name, value) in &env.overrides {
1191        effective.insert(
1192            OsString::from(AsRef::<str>::as_ref(name.as_ref())),
1193            OsString::from(value),
1194        );
1195    }
1196    effective.into_iter().collect()
1197}
1198
1199/// One file resolved from a task's `outputs` patterns, paired with
1200/// the host filesystem path it lives at and the Unix permission bits
1201/// recorded for it.
1202///
1203/// `workspace_absolute_path` is the workspace-anchored string (rooted
1204/// at `/`) the cache library records in the manifest;
1205/// `on_disk_path` is the real host path the cache reads to ingest the
1206/// blob; `mode` is the value [`haz_cache::StoredOutput::mode`]
1207/// receives on store. Borrowed views of these fields drop straight
1208/// into [`haz_cache::StoredOutput`].
1209#[derive(Debug, Clone, PartialEq, Eq)]
1210struct OwnedOutputFile {
1211    workspace_absolute_path: String,
1212    on_disk_path: PathBuf,
1213    mode: u32,
1214}
1215
1216/// Resolve every [`OutputSpec`] declared on a task against `fs` and
1217/// record each match's host path and Unix mode.
1218///
1219/// Output-side mirror of the input resolver in [`crate::cache_key`]:
1220/// a [`PathPattern::Literal`] contributes exactly one entry on a
1221/// regular-file match and a typed failure otherwise; a
1222/// [`PathPattern::Glob`] contributes zero or more entries through
1223/// the [`GlobWalk`] machinery. Errors surface the first failure
1224/// encountered while walking, reading metadata, or reading the
1225/// permission bits; later outputs in the same task are not
1226/// attempted.
1227///
1228/// Diagnostic divergence from the input resolver: a literal output
1229/// that resolves to [`FsError::NotFound`] surfaces as
1230/// [`RunTaskError::OutputDeclaredButNotProduced`] rather than
1231/// [`RunTaskError::OutputPatternResolutionFailed`]. Outputs are a
1232/// contract the task is supposed to honour, so a missing literal
1233/// output is a task-level failure, not a filesystem accident.
1234///
1235/// Symlink semantics follow the input resolver: a symlink-to-file
1236/// contributes the link's own workspace-absolute path, while the
1237/// mode bits come from the canonical target via
1238/// [`Filesystem::permissions`] (which follows symlinks).
1239fn resolve_output_files<F: Filesystem>(
1240    fs: &F,
1241    workspace: &Workspace,
1242    project: &Project,
1243    outputs: &[OutputSpec],
1244) -> Result<Vec<OwnedOutputFile>, RunTaskError> {
1245    let workspace_host = workspace.root.as_path();
1246    let action = OutputAction;
1247    let mut out = Vec::new();
1248
1249    for spec in outputs {
1250        match spec.pattern() {
1251            PathPattern::Literal(haz_path) => {
1252                resolve_literal_output(
1253                    fs,
1254                    workspace_host,
1255                    &project.root,
1256                    haz_path,
1257                    &action,
1258                    &mut out,
1259                )?;
1260            }
1261            PathPattern::Glob(glob_pattern) => {
1262                let glob = glob_pattern.compile();
1263                let matcher = glob.compile_matcher();
1264                let (walk_host, workspace_prefix, candidate_prefix) =
1265                    glob_walk_origin(workspace_host, &project.root, glob_pattern.anchor());
1266                let walker = GlobWalk {
1267                    fs,
1268                    matcher: &matcher,
1269                    candidate_prefix,
1270                    workspace_prefix,
1271                    action: &action,
1272                };
1273                let mut walk_rel: Vec<String> = Vec::new();
1274                walker.walk(&walk_host, &mut walk_rel, &mut out)?;
1275            }
1276        }
1277    }
1278
1279    Ok(out)
1280}
1281
1282/// Resolve a literal [`OutputSpec`] under the supplied project root
1283/// and dispatch to the per-match action on a regular-file match.
1284///
1285/// Splits the failure space three ways:
1286///
1287/// - [`FsError::NotFound`] becomes
1288///   [`RunTaskError::OutputDeclaredButNotProduced`].
1289/// - Any other [`FsError`] from the metadata read becomes
1290///   [`RunTaskError::OutputPatternResolutionFailed`].
1291/// - A non-regular-file kind becomes
1292///   [`RunTaskError::OutputNotARegularFile`].
1293fn resolve_literal_output<F: Filesystem>(
1294    fs: &F,
1295    workspace_host: &Path,
1296    project_root: &ProjectRoot,
1297    haz_path: &haz_domain::path::HazPath,
1298    action: &OutputAction,
1299    out: &mut Vec<OwnedOutputFile>,
1300) -> Result<(), RunTaskError> {
1301    let ws_segments = literal_workspace_segments(haz_path, project_root);
1302    let host = host_path_from_segments(workspace_host, &ws_segments);
1303
1304    let meta = match fs.metadata(&host) {
1305        Ok(m) => m,
1306        Err(FsError::NotFound { path }) => {
1307            return Err(RunTaskError::OutputDeclaredButNotProduced { path });
1308        }
1309        Err(source) => {
1310            return Err(RunTaskError::OutputPatternResolutionFailed { root: host, source });
1311        }
1312    };
1313    if meta.kind != EntryKind::File {
1314        return Err(RunTaskError::OutputNotARegularFile { path: host });
1315    }
1316
1317    let workspace_absolute_path = workspace_absolute_string_from_segments(&ws_segments);
1318    action.on_match(fs, &host, workspace_absolute_path, out)
1319}
1320
1321/// Per-match action for output resolution: record the matched file's
1322/// host path and Unix mode. Walk-level [`FsError`]s surface as
1323/// [`RunTaskError::OutputPatternResolutionFailed`]; mode-read failures
1324/// surface as [`RunTaskError::OutputModeReadFailed`].
1325///
1326/// The action carries no state: the host snapshot, project root,
1327/// and workspace path projection are all threaded through the
1328/// [`GlobWalk`] machinery.
1329struct OutputAction;
1330
1331impl<F: Filesystem> GlobMatchAction<F> for OutputAction {
1332    type Output = OwnedOutputFile;
1333    type Error = RunTaskError;
1334
1335    fn map_walk_error(&self, root: PathBuf, source: FsError) -> RunTaskError {
1336        RunTaskError::OutputPatternResolutionFailed { root, source }
1337    }
1338
1339    fn on_match(
1340        &self,
1341        fs: &F,
1342        host_path: &Path,
1343        workspace_absolute_path: String,
1344        out: &mut Vec<OwnedOutputFile>,
1345    ) -> Result<(), RunTaskError> {
1346        let mode = fs
1347            .permissions(host_path)
1348            .context(OutputModeReadFailedSnafu {
1349                path: host_path.to_path_buf(),
1350            })?;
1351        out.push(OwnedOutputFile {
1352            workspace_absolute_path,
1353            on_disk_path: host_path.to_path_buf(),
1354            mode,
1355        });
1356        Ok(())
1357    }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362    use std::collections::{BTreeMap, BTreeSet};
1363    use std::ffi::OsString;
1364    use std::path::{Path, PathBuf};
1365    use std::str::FromStr;
1366
1367    use nonempty::NonEmpty;
1368
1369    use haz_domain::action::{ShellType, TaskAction};
1370    use haz_domain::env::{EnvSettings, EnvVarName};
1371    use haz_domain::name::{ProjectName, TaskName};
1372    use haz_domain::path::{CanonicalPath, HazPath, InputSpec, ProjectRoot};
1373    use haz_domain::project::Project;
1374    use haz_domain::task::Task;
1375
1376    use super::{build_env_vec, build_spawn_plan, project_cwd, shell_binary_name};
1377
1378    fn env_name(s: &str) -> EnvVarName {
1379        EnvVarName::try_new(s).unwrap()
1380    }
1381
1382    fn project_name(s: &str) -> ProjectName {
1383        ProjectName::try_new(s).unwrap()
1384    }
1385
1386    fn task_name(s: &str) -> TaskName {
1387        TaskName::try_new(s).unwrap()
1388    }
1389
1390    fn nested_project(name: &str, root: &str) -> Project {
1391        Project {
1392            name: project_name(name),
1393            root: ProjectRoot::Nested(
1394                CanonicalPath::from_absolute(&HazPath::parse(root).unwrap()).unwrap(),
1395            ),
1396            tags: BTreeSet::new(),
1397            tasks: BTreeMap::new(),
1398        }
1399    }
1400
1401    fn implicit_project(name: &str) -> Project {
1402        Project {
1403            name: project_name(name),
1404            root: ProjectRoot::WorkspaceRoot,
1405            tags: BTreeSet::new(),
1406            tasks: BTreeMap::new(),
1407        }
1408    }
1409
1410    fn task_with(action: TaskAction, env: EnvSettings) -> Task {
1411        Task {
1412            name: task_name("run"),
1413            action,
1414            inputs: Vec::<InputSpec>::new(),
1415            outputs: Vec::new(),
1416            deps: Vec::new(),
1417            weak_deps: Vec::new(),
1418            mutex: None,
1419            env,
1420        }
1421    }
1422
1423    fn command(argv: &[&str]) -> TaskAction {
1424        TaskAction::Command(
1425            NonEmpty::from_vec(argv.iter().map(|s| (*s).to_owned()).collect())
1426                .expect("non-empty argv"),
1427        )
1428    }
1429
1430    fn shell(script: &str, shell_type: ShellType) -> TaskAction {
1431        TaskAction::Shell {
1432            script: script.to_owned(),
1433            shell: shell_type,
1434        }
1435    }
1436
1437    // ---- shell_binary_name ----
1438
1439    #[test]
1440    fn shell_binary_name_for_sh() {
1441        assert_eq!(shell_binary_name(&ShellType::Sh), "sh");
1442    }
1443
1444    #[test]
1445    fn shell_binary_name_for_bash() {
1446        assert_eq!(shell_binary_name(&ShellType::Bash), "bash");
1447    }
1448
1449    #[test]
1450    fn shell_binary_name_for_other_uses_validated_name() {
1451        let name = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
1452        assert_eq!(shell_binary_name(&ShellType::Other(name)), "zsh");
1453    }
1454
1455    // ---- project_cwd ----
1456
1457    #[test]
1458    fn project_cwd_for_nested_appends_segments() {
1459        let p = nested_project("lib_core", "/lib_core");
1460        let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1461        assert_eq!(cwd, PathBuf::from("/abs/ws/lib_core"));
1462    }
1463
1464    #[test]
1465    fn project_cwd_for_deep_nested_walks_every_segment() {
1466        let p = nested_project("frontend", "/web/frontend");
1467        let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1468        assert_eq!(cwd, PathBuf::from("/abs/ws/web/frontend"));
1469    }
1470
1471    #[test]
1472    fn project_cwd_for_implicit_mode_is_workspace_host() {
1473        let p = implicit_project("root");
1474        let cwd = project_cwd(Path::new("/abs/ws"), &p.root);
1475        assert_eq!(cwd, PathBuf::from("/abs/ws"));
1476    }
1477
1478    // ---- build_env_vec ----
1479
1480    fn env_settings(from_host: &[&str], overrides: &[(&str, &str)]) -> EnvSettings {
1481        EnvSettings {
1482            from_host: from_host.iter().map(|s| env_name(s)).collect(),
1483            overrides: overrides
1484                .iter()
1485                .map(|(k, v)| (env_name(k), (*v).to_owned()))
1486                .collect(),
1487        }
1488    }
1489
1490    fn host_snapshot(entries: &[(&str, &str)]) -> BTreeMap<EnvVarName, String> {
1491        entries
1492            .iter()
1493            .map(|(k, v)| (env_name(k), (*v).to_owned()))
1494            .collect()
1495    }
1496
1497    fn osstr_pairs(slice: &[(&str, &str)]) -> Vec<(OsString, OsString)> {
1498        slice
1499            .iter()
1500            .map(|(k, v)| (OsString::from(*k), OsString::from(*v)))
1501            .collect()
1502    }
1503
1504    #[test]
1505    fn build_env_vec_from_host_present_name_propagates() {
1506        let settings = env_settings(&["PATH"], &[]);
1507        let host = host_snapshot(&[("PATH", "/usr/bin")]);
1508        let got = build_env_vec(&settings, &host);
1509        assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
1510    }
1511
1512    #[test]
1513    fn build_env_vec_from_host_absent_name_is_excluded() {
1514        // CACHE-008: a from_host name absent from the host
1515        // snapshot contributes the `0x00` absent marker to the
1516        // cache key, but does NOT appear in the child's runtime
1517        // env. The child simply does not see the variable.
1518        let settings = env_settings(&["NEVER_SET"], &[]);
1519        let host = host_snapshot(&[("OTHER", "v")]);
1520        let got = build_env_vec(&settings, &host);
1521        assert!(
1522            got.is_empty(),
1523            "absent from_host names must not enter the child env, got {got:?}",
1524        );
1525    }
1526
1527    #[test]
1528    fn build_env_vec_override_wins_on_collision() {
1529        // CACHE-008 "overrides wins on collision": at runtime the
1530        // child sees the override value, not the host value.
1531        let settings = env_settings(&["X"], &[("X", "override-val")]);
1532        let host = host_snapshot(&[("X", "host-val")]);
1533        let got = build_env_vec(&settings, &host);
1534        assert_eq!(got, osstr_pairs(&[("X", "override-val")]));
1535    }
1536
1537    #[test]
1538    fn build_env_vec_override_only_entries_propagate() {
1539        let settings = env_settings(&[], &[("HAZ_MODE", "ci")]);
1540        let host = host_snapshot(&[]);
1541        let got = build_env_vec(&settings, &host);
1542        assert_eq!(got, osstr_pairs(&[("HAZ_MODE", "ci")]));
1543    }
1544
1545    #[test]
1546    fn build_env_vec_unrelated_host_names_do_not_appear() {
1547        // Cache soundness corollary: a host variable NOT named in
1548        // either from_host or overrides MUST NOT enter the child's
1549        // env, even though it sits in the host snapshot.
1550        let settings = env_settings(&["PATH"], &[]);
1551        let host = host_snapshot(&[("PATH", "/usr/bin"), ("HOME", "/home/me")]);
1552        let got = build_env_vec(&settings, &host);
1553        assert_eq!(got, osstr_pairs(&[("PATH", "/usr/bin")]));
1554    }
1555
1556    #[test]
1557    fn build_env_vec_result_is_lexicographic() {
1558        let settings = env_settings(&["ZULU", "ALPHA", "BRAVO"], &[]);
1559        let host = host_snapshot(&[("ALPHA", "a"), ("BRAVO", "b"), ("ZULU", "z")]);
1560        let got = build_env_vec(&settings, &host);
1561        let names: Vec<&OsString> = got.iter().map(|(k, _)| k).collect();
1562        assert_eq!(
1563            names,
1564            vec![
1565                &OsString::from("ALPHA"),
1566                &OsString::from("BRAVO"),
1567                &OsString::from("ZULU"),
1568            ]
1569        );
1570    }
1571
1572    #[test]
1573    fn build_env_vec_empty_value_is_kept() {
1574        // An empty-string value is a legitimate POSIX env entry,
1575        // and distinct from absence (which CACHE-008 separates via
1576        // the 0x00 marker). Make sure we propagate the empty string
1577        // rather than collapsing it to "absent".
1578        let settings = env_settings(&["X"], &[]);
1579        let host = host_snapshot(&[("X", "")]);
1580        let got = build_env_vec(&settings, &host);
1581        assert_eq!(got, osstr_pairs(&[("X", "")]));
1582    }
1583
1584    // ---- build_spawn_plan ----
1585
1586    #[test]
1587    fn build_spawn_plan_command_maps_argv_head_and_tail() {
1588        let p = nested_project("proj", "/proj");
1589        let t = task_with(
1590            command(&["cargo", "build", "--release"]),
1591            EnvSettings::default(),
1592        );
1593        let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1594
1595        assert_eq!(plan.program, OsString::from("cargo"));
1596        assert_eq!(
1597            plan.args,
1598            vec![OsString::from("build"), OsString::from("--release")],
1599        );
1600    }
1601
1602    #[test]
1603    fn build_spawn_plan_command_with_single_arg_has_empty_args() {
1604        let p = nested_project("proj", "/proj");
1605        let t = task_with(command(&["true"]), EnvSettings::default());
1606        let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1607
1608        assert_eq!(plan.program, OsString::from("true"));
1609        assert!(plan.args.is_empty());
1610    }
1611
1612    #[test]
1613    fn build_spawn_plan_shell_uses_dash_c_and_script() {
1614        let p = nested_project("proj", "/proj");
1615        let t = task_with(shell("echo hi", ShellType::Sh), EnvSettings::default());
1616        let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1617
1618        assert_eq!(plan.program, OsString::from("sh"));
1619        assert_eq!(
1620            plan.args,
1621            vec![OsString::from("-c"), OsString::from("echo hi")],
1622        );
1623    }
1624
1625    #[test]
1626    fn build_spawn_plan_shell_other_uses_named_binary() {
1627        let p = nested_project("proj", "/proj");
1628        let other = haz_domain::action::NonEmptyAsciiName::from_str("zsh").unwrap();
1629        let t = task_with(
1630            shell("echo hi", ShellType::Other(other)),
1631            EnvSettings::default(),
1632        );
1633        let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &BTreeMap::new());
1634
1635        assert_eq!(plan.program, OsString::from("zsh"));
1636        assert_eq!(
1637            plan.args,
1638            vec![OsString::from("-c"), OsString::from("echo hi")],
1639        );
1640    }
1641
1642    #[test]
1643    fn build_spawn_plan_sets_cwd_to_project_host_path_for_nested() {
1644        let p = nested_project("frontend", "/web/frontend");
1645        let t = task_with(command(&["true"]), EnvSettings::default());
1646        let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
1647
1648        assert_eq!(plan.cwd, PathBuf::from("/abs/ws/web/frontend"));
1649    }
1650
1651    #[test]
1652    fn build_spawn_plan_sets_cwd_to_workspace_host_for_implicit_project() {
1653        let p = implicit_project("root");
1654        let t = task_with(command(&["true"]), EnvSettings::default());
1655        let plan = build_spawn_plan(Path::new("/abs/ws"), &p, &t, &BTreeMap::new());
1656
1657        assert_eq!(plan.cwd, PathBuf::from("/abs/ws"));
1658    }
1659
1660    #[test]
1661    fn build_spawn_plan_env_reflects_from_host_overrides_and_excludes_unrelated() {
1662        // End-to-end: feed a representative env config into the
1663        // whole helper and confirm the SpawnPlan.env matches the
1664        // CACHE-008 runtime view exactly.
1665        let settings = env_settings(
1666            &["PATH", "MISSING"],
1667            &[("HAZ_MODE", "ci"), ("PATH", "/override/bin")],
1668        );
1669        let p = nested_project("proj", "/proj");
1670        let t = task_with(command(&["true"]), settings);
1671
1672        let host = host_snapshot(&[("PATH", "/usr/bin"), ("UNRELATED", "should-not-appear")]);
1673        let plan = build_spawn_plan(Path::new("/ws"), &p, &t, &host);
1674
1675        assert_eq!(
1676            plan.env,
1677            osstr_pairs(&[("HAZ_MODE", "ci"), ("PATH", "/override/bin"),]),
1678            "override wins over from_host; UNRELATED and MISSING (absent) are excluded",
1679        );
1680    }
1681
1682    // ---- resolve_output_files ----
1683
1684    mod output_resolution {
1685        use std::collections::{BTreeMap, BTreeSet};
1686        use std::path::PathBuf;
1687
1688        use haz_domain::path::{
1689            CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
1690        };
1691        use haz_domain::project::Project;
1692        use haz_domain::settings::WorkspaceSettings;
1693        use haz_domain::workspace::Workspace;
1694        use haz_vfs::{MemFilesystem, WritableFilesystem};
1695
1696        use super::super::{OwnedOutputFile, RunTaskError, resolve_output_files};
1697
1698        const WORKSPACE_HOST: &str = "/ws";
1699        const PROJECT_HOST: &str = "/ws/proj";
1700
1701        fn nested_project() -> Project {
1702            Project {
1703                name: haz_domain::name::ProjectName::try_new("proj").unwrap(),
1704                root: ProjectRoot::Nested(
1705                    CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap(),
1706                ),
1707                tags: BTreeSet::new(),
1708                tasks: BTreeMap::new(),
1709            }
1710        }
1711
1712        fn implicit_project() -> Project {
1713            Project {
1714                name: haz_domain::name::ProjectName::try_new("root").unwrap(),
1715                root: ProjectRoot::WorkspaceRoot,
1716                tags: BTreeSet::new(),
1717                tasks: BTreeMap::new(),
1718            }
1719        }
1720
1721        fn workspace_with(project: &Project) -> Workspace {
1722            let mut projects = BTreeMap::new();
1723            projects.insert(project.name.clone(), project.clone());
1724            Workspace {
1725                root: WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap(),
1726                projects,
1727                overlays: BTreeMap::new(),
1728                settings: WorkspaceSettings::default(),
1729            }
1730        }
1731
1732        fn paths_of(files: &[OwnedOutputFile]) -> BTreeSet<String> {
1733            files
1734                .iter()
1735                .map(|f| f.workspace_absolute_path.clone())
1736                .collect()
1737        }
1738
1739        fn host_paths_of(files: &[OwnedOutputFile]) -> BTreeSet<PathBuf> {
1740            files.iter().map(|f| f.on_disk_path.clone()).collect()
1741        }
1742
1743        fn modes_by_workspace_path(files: &[OwnedOutputFile]) -> BTreeMap<String, u32> {
1744            files
1745                .iter()
1746                .map(|f| (f.workspace_absolute_path.clone(), f.mode))
1747                .collect()
1748        }
1749
1750        #[test]
1751        fn literal_hit_returns_one_output_with_recorded_mode() {
1752            let mut fs = MemFilesystem::new();
1753            fs.add_dir(PROJECT_HOST).unwrap();
1754            fs.add_file_with_mode(format!("{PROJECT_HOST}/bundle.js"), b"data".to_vec(), 0o640)
1755                .unwrap();
1756
1757            let project = nested_project();
1758            let workspace = workspace_with(&project);
1759            let outputs = vec![OutputSpec::parse("bundle.js").unwrap()];
1760
1761            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1762            assert_eq!(result.len(), 1);
1763            assert_eq!(result[0].workspace_absolute_path, "/proj/bundle.js");
1764            assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/proj/bundle.js"));
1765            assert_eq!(result[0].mode & 0o7777, 0o640);
1766        }
1767
1768        #[test]
1769        fn literal_workspace_absolute_resolves_under_workspace_root() {
1770            let mut fs = MemFilesystem::new();
1771            fs.add_dir("/ws/dist").unwrap();
1772            fs.add_file_with_mode("/ws/dist/main.js", b"x".to_vec(), 0o755)
1773                .unwrap();
1774            fs.add_dir(PROJECT_HOST).unwrap();
1775
1776            let project = nested_project();
1777            let workspace = workspace_with(&project);
1778            let outputs = vec![OutputSpec::parse("/dist/main.js").unwrap()];
1779
1780            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1781            assert_eq!(result.len(), 1);
1782            assert_eq!(result[0].workspace_absolute_path, "/dist/main.js");
1783            assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/dist/main.js"));
1784            assert_eq!(result[0].mode & 0o7777, 0o755);
1785        }
1786
1787        #[test]
1788        fn literal_missing_surfaces_output_declared_but_not_produced() {
1789            // Outputs are a contract the task is supposed to honour;
1790            // a missing literal must NOT collapse into the input-side
1791            // PatternResolutionFailed shape.
1792            let mut fs = MemFilesystem::new();
1793            fs.add_dir(PROJECT_HOST).unwrap();
1794
1795            let project = nested_project();
1796            let workspace = workspace_with(&project);
1797            let outputs = vec![OutputSpec::parse("absent.txt").unwrap()];
1798
1799            match resolve_output_files(&fs, &workspace, &project, &outputs) {
1800                Err(RunTaskError::OutputDeclaredButNotProduced { path }) => {
1801                    assert_eq!(path, PathBuf::from("/ws/proj/absent.txt"));
1802                }
1803                other => panic!("expected OutputDeclaredButNotProduced, got {other:?}"),
1804            }
1805        }
1806
1807        #[test]
1808        fn literal_pointing_at_directory_surfaces_output_not_a_regular_file() {
1809            let mut fs = MemFilesystem::new();
1810            fs.add_dir(format!("{PROJECT_HOST}/subdir")).unwrap();
1811
1812            let project = nested_project();
1813            let workspace = workspace_with(&project);
1814            let outputs = vec![OutputSpec::parse("subdir").unwrap()];
1815
1816            match resolve_output_files(&fs, &workspace, &project, &outputs) {
1817                Err(RunTaskError::OutputNotARegularFile { path }) => {
1818                    assert_eq!(path, PathBuf::from("/ws/proj/subdir"));
1819                }
1820                other => panic!("expected OutputNotARegularFile, got {other:?}"),
1821            }
1822        }
1823
1824        #[test]
1825        fn glob_multi_match_collects_every_matching_file_with_its_mode() {
1826            let mut fs = MemFilesystem::new();
1827            fs.add_dir(PROJECT_HOST).unwrap();
1828            fs.add_file_with_mode(format!("{PROJECT_HOST}/a.js"), b"a".to_vec(), 0o644)
1829                .unwrap();
1830            fs.add_file_with_mode(format!("{PROJECT_HOST}/b.js"), b"b".to_vec(), 0o600)
1831                .unwrap();
1832            // A non-matching neighbour MUST NOT contribute.
1833            fs.add_file_with_mode(
1834                format!("{PROJECT_HOST}/keep.txt"),
1835                b"ignored".to_vec(),
1836                0o644,
1837            )
1838            .unwrap();
1839
1840            let project = nested_project();
1841            let workspace = workspace_with(&project);
1842            let outputs = vec![OutputSpec::parse("*.js").unwrap()];
1843
1844            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1845            assert_eq!(result.len(), 2);
1846            assert_eq!(
1847                paths_of(&result),
1848                BTreeSet::from(["/proj/a.js".to_owned(), "/proj/b.js".to_owned()]),
1849            );
1850            let modes = modes_by_workspace_path(&result);
1851            assert_eq!(
1852                modes.get("/proj/a.js").copied().map(|m| m & 0o7777),
1853                Some(0o644)
1854            );
1855            assert_eq!(
1856                modes.get("/proj/b.js").copied().map(|m| m & 0o7777),
1857                Some(0o600)
1858            );
1859        }
1860
1861        #[test]
1862        fn glob_no_match_returns_empty_contribution() {
1863            let mut fs = MemFilesystem::new();
1864            fs.add_dir(PROJECT_HOST).unwrap();
1865            fs.add_file(format!("{PROJECT_HOST}/only.txt"), b"x".to_vec())
1866                .unwrap();
1867
1868            let project = nested_project();
1869            let workspace = workspace_with(&project);
1870            let outputs = vec![OutputSpec::parse("*.js").unwrap()];
1871
1872            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1873            assert!(result.is_empty());
1874        }
1875
1876        #[test]
1877        fn glob_nested_double_star_recurses_into_subdirectories() {
1878            let mut fs = MemFilesystem::new();
1879            fs.add_dir(format!("{PROJECT_HOST}/dist")).unwrap();
1880            fs.add_dir(format!("{PROJECT_HOST}/dist/inner")).unwrap();
1881            fs.add_file(format!("{PROJECT_HOST}/dist/top.js"), b"top".to_vec())
1882                .unwrap();
1883            fs.add_file(
1884                format!("{PROJECT_HOST}/dist/inner/deep.js"),
1885                b"deep".to_vec(),
1886            )
1887            .unwrap();
1888            // Outside the `dist/` prefix; MUST NOT be matched.
1889            fs.add_file(format!("{PROJECT_HOST}/other.js"), b"other".to_vec())
1890                .unwrap();
1891
1892            let project = nested_project();
1893            let workspace = workspace_with(&project);
1894            let outputs = vec![OutputSpec::parse("dist/**/*.js").unwrap()];
1895
1896            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1897            assert_eq!(result.len(), 2);
1898            assert_eq!(
1899                paths_of(&result),
1900                BTreeSet::from([
1901                    "/proj/dist/top.js".to_owned(),
1902                    "/proj/dist/inner/deep.js".to_owned(),
1903                ]),
1904            );
1905            assert_eq!(
1906                host_paths_of(&result),
1907                BTreeSet::from([
1908                    PathBuf::from("/ws/proj/dist/top.js"),
1909                    PathBuf::from("/ws/proj/dist/inner/deep.js"),
1910                ]),
1911            );
1912        }
1913
1914        #[test]
1915        fn glob_symlink_to_file_records_link_path_with_target_mode() {
1916            // Mirrors the input-side symlink contract: both the
1917            // real file and the symlink that points at it are
1918            // distinct contributions; each one's mode comes from
1919            // Filesystem::permissions (which follows symlinks),
1920            // so both entries report the target file's mode bits.
1921            let mut fs = MemFilesystem::new();
1922            fs.add_dir(PROJECT_HOST).unwrap();
1923            fs.add_file_with_mode(
1924                format!("{PROJECT_HOST}/real.txt"),
1925                b"real bytes".to_vec(),
1926                0o600,
1927            )
1928            .unwrap();
1929            fs.add_symlink(
1930                format!("{PROJECT_HOST}/link.txt"),
1931                format!("{PROJECT_HOST}/real.txt"),
1932            )
1933            .unwrap();
1934
1935            let project = nested_project();
1936            let workspace = workspace_with(&project);
1937            let outputs = vec![OutputSpec::parse("*.txt").unwrap()];
1938
1939            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1940            assert_eq!(
1941                result.len(),
1942                2,
1943                "both the real file and the symlink to it are distinct contributions",
1944            );
1945            assert_eq!(
1946                paths_of(&result),
1947                BTreeSet::from(["/proj/real.txt".to_owned(), "/proj/link.txt".to_owned(),]),
1948            );
1949            // Both entries see the target's mode bits, because
1950            // permissions follows the symlink.
1951            for file in &result {
1952                assert_eq!(
1953                    file.mode & 0o7777,
1954                    0o600,
1955                    "{} should report target's mode",
1956                    file.workspace_absolute_path,
1957                );
1958            }
1959        }
1960
1961        #[test]
1962        fn implicit_mode_project_relative_literal_is_workspace_absolute() {
1963            // ProjectRoot::WorkspaceRoot: project-relative output
1964            // lands at the workspace root directly, with no extra
1965            // prefix.
1966            let mut fs = MemFilesystem::new();
1967            fs.add_dir(WORKSPACE_HOST).unwrap();
1968            fs.add_file_with_mode(
1969                format!("{WORKSPACE_HOST}/manifest.json"),
1970                b"{}".to_vec(),
1971                0o644,
1972            )
1973            .unwrap();
1974
1975            let project = implicit_project();
1976            let workspace = workspace_with(&project);
1977            let outputs = vec![OutputSpec::parse("manifest.json").unwrap()];
1978
1979            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
1980            assert_eq!(result.len(), 1);
1981            assert_eq!(result[0].workspace_absolute_path, "/manifest.json");
1982            assert_eq!(result[0].on_disk_path, PathBuf::from("/ws/manifest.json"));
1983            assert_eq!(result[0].mode & 0o7777, 0o644);
1984        }
1985
1986        #[test]
1987        fn glob_walk_error_surfaces_output_pattern_resolution_failed() {
1988            // The project root directory does not exist on disk;
1989            // the glob walker's read_dir fails on entry and the
1990            // action's map_walk_error wraps it as
1991            // OutputPatternResolutionFailed (NOT the literal-side
1992            // OutputDeclaredButNotProduced shape, which only
1993            // applies to literal patterns).
1994            let fs = MemFilesystem::new();
1995
1996            let project = nested_project();
1997            let workspace = workspace_with(&project);
1998            let outputs = vec![OutputSpec::parse("*.js").unwrap()];
1999
2000            match resolve_output_files(&fs, &workspace, &project, &outputs) {
2001                Err(RunTaskError::OutputPatternResolutionFailed { root, .. }) => {
2002                    assert_eq!(root, PathBuf::from("/ws/proj"));
2003                }
2004                other => panic!("expected OutputPatternResolutionFailed, got {other:?}"),
2005            }
2006        }
2007
2008        #[test]
2009        fn permissions_round_trip_through_set_permissions() {
2010            // Sanity check: a freshly-set mode is faithfully
2011            // surfaced via the output resolver. Guards against
2012            // future MemFilesystem regressions where set_permissions
2013            // and permissions get out of sync.
2014            let mut fs = MemFilesystem::new();
2015            fs.add_dir(PROJECT_HOST).unwrap();
2016            fs.add_file(format!("{PROJECT_HOST}/bin"), b"!".to_vec())
2017                .unwrap();
2018            fs.set_permissions(std::path::Path::new("/ws/proj/bin"), 0o751)
2019                .unwrap();
2020
2021            let project = nested_project();
2022            let workspace = workspace_with(&project);
2023            let outputs = vec![OutputSpec::parse("bin").unwrap()];
2024
2025            let result = resolve_output_files(&fs, &workspace, &project, &outputs).unwrap();
2026            assert_eq!(result.len(), 1);
2027            assert_eq!(result[0].mode & 0o7777, 0o751);
2028        }
2029    }
2030
2031    // ---- end-to-end run_task ----
2032
2033    mod end_to_end {
2034        use std::collections::{BTreeMap, BTreeSet};
2035        use std::path::PathBuf;
2036        use std::sync::Mutex;
2037
2038        use nonempty::NonEmpty;
2039
2040        use haz_cache::Cache;
2041        use haz_dag::graph::TaskGraph;
2042        use haz_domain::action::TaskAction;
2043        use haz_domain::env::{EnvSettings, EnvVarName};
2044        use haz_domain::name::{ProjectName, TaskName};
2045        use haz_domain::path::{
2046            CanonicalPath, HazPath, OutputSpec, ProjectRoot, WorkspaceRootPath,
2047        };
2048        use haz_domain::project::Project;
2049        use haz_domain::settings::WorkspaceSettings;
2050        use haz_domain::settings::cache::HashAlgo;
2051        use haz_domain::task::Task;
2052        use haz_domain::task_id::TaskId;
2053        use haz_domain::workspace::Workspace;
2054        use haz_vfs::{Filesystem, FsError, MemFilesystem, WritableFilesystem};
2055
2056        use crate::cache_key::BuildKeyError;
2057        use crate::mock_impl::{MockProcessSpawner, MockSpec};
2058
2059        use super::super::{
2060            CancelledRecord, CompletedRecord, RunContext, RunObserver, RunSource, RunState,
2061            RunTaskError, SkipRecord, run_task,
2062        };
2063        use tokio_util::sync::CancellationToken;
2064
        /// Host path of the workspace root; the fixture's cache is
        /// rooted here and the directory is pre-created.
        const WORKSPACE_HOST: &str = "/ws";
        /// Host path of the nested `proj` project directory, also
        /// pre-created by the fixture.
        const PROJECT_HOST: &str = "/ws/proj";
2067
2068        fn env_var(s: &str) -> EnvVarName {
2069            EnvVarName::try_new(s).unwrap()
2070        }
2071
2072        fn task_id_for(project: &str, task: &str) -> TaskId {
2073            TaskId {
2074                project: ProjectName::try_new(project).unwrap(),
2075                task: TaskName::try_new(task).unwrap(),
2076            }
2077        }
2078
2079        fn command(argv: &[&str]) -> TaskAction {
2080            TaskAction::Command(
2081                NonEmpty::from_vec(argv.iter().map(|s| (*s).to_owned()).collect())
2082                    .expect("non-empty argv"),
2083            )
2084        }
2085
        /// Event captured by [`Recorder`]. The cache-hit branch and
        /// the fresh-run branch both flow through the same observer
        /// surface, so the captured event stream is mode-agnostic.
        #[derive(Debug, Clone, PartialEq, Eq)]
        enum RecordedEvent {
            /// An `on_task_started` callback for the task.
            Started(TaskId),
            /// One `on_stdout` callback, with the bytes it delivered.
            Stdout(TaskId, Vec<u8>),
            /// One `on_stderr` callback, with the bytes it delivered.
            Stderr(TaskId, Vec<u8>),
            /// An `on_task_finished` callback, with the terminal record.
            Finished(TaskId, CompletedRecord),
        }
2096
        /// Test-double [`RunObserver`]: records every lifecycle
        /// callback into an in-memory log the assertions inspect.
        /// Skip and cancel callbacks are accepted but not recorded.
        #[derive(Default)]
        struct Recorder {
            /// Events in the order the callbacks fired. Wrapped in a
            /// `Mutex` because the observer callbacks only get `&self`.
            events: Mutex<Vec<RecordedEvent>>,
        }
2103
2104        impl Recorder {
2105            fn events(&self) -> Vec<RecordedEvent> {
2106                self.events.lock().unwrap().clone()
2107            }
2108        }
2109
2110        impl RunObserver for Recorder {
2111            fn on_task_started(&self, task: &TaskId) {
2112                self.events
2113                    .lock()
2114                    .unwrap()
2115                    .push(RecordedEvent::Started(task.clone()));
2116            }
2117            fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
2118                self.events
2119                    .lock()
2120                    .unwrap()
2121                    .push(RecordedEvent::Stdout(task.clone(), bytes.to_vec()));
2122            }
2123            fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
2124                self.events
2125                    .lock()
2126                    .unwrap()
2127                    .push(RecordedEvent::Stderr(task.clone(), bytes.to_vec()));
2128            }
2129            fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
2130                self.events
2131                    .lock()
2132                    .unwrap()
2133                    .push(RecordedEvent::Finished(task.clone(), record.clone()));
2134            }
2135            fn on_task_skipped(&self, _task: &TaskId, _record: &SkipRecord) {
2136                // run_task never produces a Skipped, so this
2137                // recorder leaves the path unrecorded.
2138            }
2139            fn on_task_cancelled(&self, _task: &TaskId, _record: &CancelledRecord) {
2140                // run_task never produces a Cancelled until the
2141                // spawn-step cancellation flow lands; recorder
2142                // leaves the path unrecorded for now.
2143            }
2144        }
2145
        /// Common end-to-end fixture: one `proj:build` task with the
        /// supplied action/env/outputs, plus the workspace, single-node
        /// graph and cache that `run_task` needs. Everything lives on a
        /// fresh [`MemFilesystem`] with `/ws` and `/ws/proj` pre-created;
        /// the cache is rooted at `/ws`.
        struct Fixture {
            /// Cache over the fixture's in-memory filesystem; `make_ctx`
            /// also takes the context's `fs` from it.
            cache: Cache<MemFilesystem>,
            /// Workspace containing the single `proj` project.
            workspace: Workspace,
            /// Graph with the one `proj:build` node and no edges.
            graph: TaskGraph,
            /// Id of the `proj:build` task under test.
            task_id: TaskId,
            /// Host environment snapshot; empty as constructed.
            host_env: BTreeMap<EnvVarName, String>,
            /// Token handed to the run context; the fixture itself
            /// never cancels it.
            cancel: CancellationToken,
        }
2157
2158        impl Fixture {
2159            fn new(action: TaskAction, env: EnvSettings, outputs: Vec<OutputSpec>) -> Self {
2160                let mut fs = MemFilesystem::new();
2161                fs.add_dir(WORKSPACE_HOST).unwrap();
2162                fs.add_dir(PROJECT_HOST).unwrap();
2163
2164                let task = Task {
2165                    name: TaskName::try_new("build").unwrap(),
2166                    action,
2167                    inputs: Vec::new(),
2168                    outputs,
2169                    deps: Vec::new(),
2170                    weak_deps: Vec::new(),
2171                    mutex: None,
2172                    env,
2173                };
2174                let project = Project {
2175                    name: ProjectName::try_new("proj").unwrap(),
2176                    root: ProjectRoot::Nested(
2177                        CanonicalPath::from_absolute(&HazPath::parse("/proj").unwrap()).unwrap(),
2178                    ),
2179                    tags: BTreeSet::new(),
2180                    tasks: BTreeMap::from([(task.name.clone(), task)]),
2181                };
2182                let task_id = task_id_for("proj", "build");
2183                let workspace = Workspace {
2184                    root: WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap(),
2185                    projects: BTreeMap::from([(project.name.clone(), project)]),
2186                    overlays: BTreeMap::new(),
2187                    settings: WorkspaceSettings::default(),
2188                };
2189                let graph = TaskGraph {
2190                    nodes: BTreeSet::from([task_id.clone()]),
2191                    edges: BTreeSet::new(),
2192                };
2193                let cache = Cache::new(fs, std::path::Path::new(WORKSPACE_HOST), HashAlgo::Blake3);
2194                Self {
2195                    cache,
2196                    workspace,
2197                    graph,
2198                    task_id,
2199                    host_env: BTreeMap::new(),
2200                    cancel: CancellationToken::new(),
2201                }
2202            }
2203        }
2204
2205        fn make_ctx<'a>(
2206            fixture: &'a Fixture,
2207            spawner: &'a MockProcessSpawner,
2208            observer: &'a Recorder,
2209        ) -> RunContext<'a, MemFilesystem, MockProcessSpawner, Recorder> {
2210            RunContext {
2211                fs: fixture.cache.fs(),
2212                cache: &fixture.cache,
2213                spawner,
2214                observer,
2215                workspace: &fixture.workspace,
2216                graph: &fixture.graph,
2217                host_env: &fixture.host_env,
2218                algo: HashAlgo::Blake3,
2219                cancel: &fixture.cancel,
2220            }
2221        }
2222
2223        #[tokio::test]
2224        async fn cache_miss_success_records_observer_events_and_persists_manifest() {
2225            let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2226            let spawner = MockProcessSpawner::new();
2227            spawner.push_spec(MockSpec {
2228                stdout: b"out\n".to_vec(),
2229                stderr: b"err\n".to_vec(),
2230                exit_code: 0,
2231                ..MockSpec::default()
2232            });
2233            let observer = Recorder::default();
2234            let ctx = make_ctx(&fixture, &spawner, &observer);
2235
2236            let outcome = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1_700_000_000)
2237                .await
2238                .expect("baseline run should succeed");
2239
2240            assert_eq!(outcome.source, RunSource::FreshRun);
2241            assert_eq!(outcome.state, RunState::Succeeded);
2242            assert!(outcome.exit_status.is_some_and(|s| s.success()));
2243            assert_eq!(outcome.task, fixture.task_id);
2244
2245            let events = observer.events();
2246            assert_eq!(events.len(), 4, "expected 4 events, got {events:?}");
2247            assert_eq!(events[0], RecordedEvent::Started(fixture.task_id.clone()));
2248            assert_eq!(
2249                events[1],
2250                RecordedEvent::Stdout(fixture.task_id.clone(), b"out\n".to_vec()),
2251            );
2252            assert_eq!(
2253                events[2],
2254                RecordedEvent::Stderr(fixture.task_id.clone(), b"err\n".to_vec()),
2255            );
2256            match &events[3] {
2257                RecordedEvent::Finished(id, finished_outcome) => {
2258                    assert_eq!(id, &fixture.task_id);
2259                    assert_eq!(finished_outcome, &outcome);
2260                }
2261                other => panic!("expected Finished event, got {other:?}"),
2262            }
2263        }
2264
2265        #[tokio::test]
2266        async fn cache_miss_then_second_call_hits_cache() {
2267            let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2268            let spawner = MockProcessSpawner::new();
2269            spawner.push_spec(MockSpec {
2270                stdout: b"out\n".to_vec(),
2271                stderr: b"err\n".to_vec(),
2272                exit_code: 0,
2273                ..MockSpec::default()
2274            });
2275
2276            let observer1 = Recorder::default();
2277            let ctx1 = make_ctx(&fixture, &spawner, &observer1);
2278            let first = run_task(&ctx1, &fixture.task_id, &BTreeMap::new(), 1)
2279                .await
2280                .unwrap();
2281            assert_eq!(first.source, RunSource::FreshRun);
2282
2283            // Second call: same key, manifest present, expect hit.
2284            let observer2 = Recorder::default();
2285            let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2286            let second = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2287                .await
2288                .unwrap();
2289
2290            assert_eq!(second.source, RunSource::CacheHit);
2291            assert_eq!(second.state, RunState::Succeeded);
2292            assert!(second.exit_status.is_none());
2293            assert_eq!(second.stdout_hash, first.stdout_hash);
2294            assert_eq!(second.stderr_hash, first.stderr_hash);
2295
2296            // The hit branch surfaces the recorded streams through
2297            // the observer (EXEC-017).
2298            let events = observer2.events();
2299            assert!(
2300                events.iter().any(|e| matches!(
2301                    e,
2302                    RecordedEvent::Stdout(_, bytes) if bytes == b"out\n"
2303                )),
2304                "missing recorded stdout in hit-path events: {events:?}",
2305            );
2306            assert!(
2307                events.iter().any(|e| matches!(
2308                    e,
2309                    RecordedEvent::Stderr(_, bytes) if bytes == b"err\n"
2310                )),
2311                "missing recorded stderr in hit-path events: {events:?}",
2312            );
2313
2314            // The mock was only called once; the second run did NOT
2315            // respawn.
2316            assert_eq!(
2317                spawner.spawns().len(),
2318                1,
2319                "second call must not respawn, got {} total spawns",
2320                spawner.spawns().len(),
2321            );
2322        }
2323
2324        #[tokio::test]
2325        async fn cache_018_failed_run_does_not_store_a_cache_entry() {
2326            // CACHE-018: a run whose process exit status is non-zero
2327            // (or signalled, or timed out) MUST NOT produce a cache
2328            // entry. The cache root MUST remain absent and a follow-up
2329            // invocation MUST observe a fresh run (not a hit) at the
2330            // same key.
2331            let fixture = Fixture::new(command(&["false"]), EnvSettings::default(), Vec::new());
2332            let spawner = MockProcessSpawner::new();
2333            spawner.push_spec(MockSpec {
2334                stdout: Vec::new(),
2335                stderr: b"error\n".to_vec(),
2336                exit_code: 1,
2337                ..MockSpec::default()
2338            });
2339            let observer = Recorder::default();
2340            let ctx = make_ctx(&fixture, &spawner, &observer);
2341
2342            let outcome = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2343                .await
2344                .unwrap();
2345
2346            assert_eq!(outcome.source, RunSource::FreshRun);
2347            assert_eq!(outcome.state, RunState::Failed);
2348            assert!(outcome.exit_status.is_some_and(|s| !s.success()));
2349
2350            // The captured failure-side stderr still flowed through
2351            // the observer.
2352            assert!(
2353                observer.events().iter().any(|e| matches!(
2354                    e,
2355                    RecordedEvent::Stderr(_, bytes) if bytes == b"error\n"
2356                )),
2357                "failure-side stderr must reach the observer",
2358            );
2359
2360            // Direct CACHE-018 check: the cache root MUST NOT have
2361            // been created. CACHE-010 mandates lazy creation on first
2362            // store, and a failed run is not a store; the cache
2363            // directory therefore stays absent.
2364            let cache_root_meta = fixture.cache.fs().metadata(fixture.cache.cache_root());
2365            assert!(
2366                matches!(cache_root_meta, Err(FsError::NotFound { .. })),
2367                "cache root must remain absent after a failed run; got {cache_root_meta:?}",
2368            );
2369
2370            // Second call: nothing stored, expect another miss.
2371            spawner.push_spec(MockSpec {
2372                stdout: Vec::new(),
2373                stderr: b"again\n".to_vec(),
2374                exit_code: 1,
2375                ..MockSpec::default()
2376            });
2377            let observer2 = Recorder::default();
2378            let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2379            let outcome2 = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2380                .await
2381                .unwrap();
2382            assert_eq!(outcome2.source, RunSource::FreshRun);
2383            assert_eq!(spawner.spawns().len(), 2);
2384        }
2385
2386        #[tokio::test]
2387        async fn cache_miss_success_stores_outputs_and_restore_overwrites_target() {
2388            // The task declares one output; pre-write it to
2389            // simulate the command producing it. After a fresh run,
2390            // the cache holds the output; a subsequent hit restores
2391            // the cached bytes even when the workspace copy has been
2392            // tampered with.
2393            let fixture = Fixture::new(
2394                command(&["true"]),
2395                EnvSettings::default(),
2396                vec![OutputSpec::parse("artifact.bin").unwrap()],
2397            );
2398            fixture
2399                .cache
2400                .fs()
2401                .write_file(
2402                    std::path::Path::new("/ws/proj/artifact.bin"),
2403                    b"artifact-bytes",
2404                )
2405                .unwrap();
2406
2407            let spawner = MockProcessSpawner::new();
2408            spawner.push_spec(MockSpec::default());
2409            let observer = Recorder::default();
2410            let ctx = make_ctx(&fixture, &spawner, &observer);
2411
2412            let first = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2413                .await
2414                .unwrap();
2415            assert_eq!(first.source, RunSource::FreshRun);
2416            assert_eq!(first.state, RunState::Succeeded);
2417
2418            // Tamper with the target. The hit branch's restore
2419            // (CACHE-019) should overwrite this with the cached
2420            // bytes.
2421            fixture
2422                .cache
2423                .fs()
2424                .write_file(std::path::Path::new("/ws/proj/artifact.bin"), b"garbage")
2425                .unwrap();
2426
2427            let observer2 = Recorder::default();
2428            let ctx2 = make_ctx(&fixture, &spawner, &observer2);
2429            let second = run_task(&ctx2, &fixture.task_id, &BTreeMap::new(), 2)
2430                .await
2431                .unwrap();
2432            assert_eq!(second.source, RunSource::CacheHit);
2433
2434            let restored_bytes = fixture
2435                .cache
2436                .fs()
2437                .read(std::path::Path::new("/ws/proj/artifact.bin"))
2438                .unwrap();
2439            assert_eq!(restored_bytes, b"artifact-bytes");
2440        }
2441
2442        #[tokio::test]
2443        async fn cache_023_hit_is_observationally_equivalent_to_fresh_run() {
2444            // CACHE-023: a cache hit MUST produce, from the
2445            // perspective of every external observer that consults
2446            // only declared outputs and captured streams, a state
2447            // indistinguishable from a successful fresh run.
2448            //
2449            // Setup: one task with one declared output and a mock
2450            // command that emits non-empty stdout AND stderr. Run
2451            // once (miss -> fresh). Delete the output file so the
2452            // second run cannot accidentally observe stale bytes.
2453            // Run again (hit -> restore). Compare the three
2454            // externally-visible byte streams.
2455            const OUTPUT_BYTES: &[u8] = b"\x00built\x01artifact\xFF";
2456            const STDOUT_BYTES: &[u8] = b"hello from the task\n";
2457            const STDERR_BYTES: &[u8] = b"warning: be careful\n";
2458
2459            let fixture = Fixture::new(
2460                command(&["true"]),
2461                EnvSettings::default(),
2462                vec![OutputSpec::parse("artifact.bin").unwrap()],
2463            );
2464
2465            // Pre-write the workspace file so the fresh run can
2466            // hash it into the cache as the task's "produced"
2467            // output.
2468            let artifact_path = std::path::Path::new("/ws/proj/artifact.bin");
2469            fixture
2470                .cache
2471                .fs()
2472                .write_file(artifact_path, OUTPUT_BYTES)
2473                .unwrap();
2474
2475            let spawner = MockProcessSpawner::new();
2476            spawner.push_spec(MockSpec {
2477                stdout: STDOUT_BYTES.to_vec(),
2478                stderr: STDERR_BYTES.to_vec(),
2479                exit_code: 0,
2480                ..MockSpec::default()
2481            });
2482
2483            // Fresh run.
2484            let observer_fresh = Recorder::default();
2485            let ctx_fresh = make_ctx(&fixture, &spawner, &observer_fresh);
2486            let fresh = run_task(&ctx_fresh, &fixture.task_id, &BTreeMap::new(), 1)
2487                .await
2488                .unwrap();
2489            assert_eq!(fresh.source, RunSource::FreshRun);
2490            assert_eq!(fresh.state, RunState::Succeeded);
2491
2492            let fresh_output = fixture.cache.fs().read(artifact_path).unwrap();
2493            let fresh_stdout = concat_stdout(&observer_fresh.events());
2494            let fresh_stderr = concat_stderr(&observer_fresh.events());
2495
2496            // Overwrite the workspace artefact with distinctive
2497            // garbage so the second run cannot accidentally
2498            // observe stale-but-equal bytes; the assertion below
2499            // proves the restore put the cached bytes back. The
2500            // cache entry under .haz/cache/ is untouched.
2501            fixture
2502                .cache
2503                .fs()
2504                .write_file(artifact_path, b"garbage-should-be-overwritten")
2505                .unwrap();
2506
2507            // Second run: cache hit, restoration only.
2508            let observer_hit = Recorder::default();
2509            let ctx_hit = make_ctx(&fixture, &spawner, &observer_hit);
2510            let hit = run_task(&ctx_hit, &fixture.task_id, &BTreeMap::new(), 2)
2511                .await
2512                .unwrap();
2513            assert_eq!(hit.source, RunSource::CacheHit);
2514            assert_eq!(hit.state, RunState::Succeeded);
2515
2516            let hit_output = fixture.cache.fs().read(artifact_path).unwrap();
2517            let hit_stdout = concat_stdout(&observer_hit.events());
2518            let hit_stderr = concat_stderr(&observer_hit.events());
2519
2520            // The three externally-visible byte sequences MUST
2521            // match the fresh-run values exactly.
2522            assert_eq!(
2523                fresh_output, hit_output,
2524                "CACHE-023: declared output bytes differ between fresh and hit",
2525            );
2526            assert_eq!(
2527                fresh_stdout, hit_stdout,
2528                "CACHE-023: captured stdout differs between fresh and hit",
2529            );
2530            assert_eq!(
2531                fresh_stderr, hit_stderr,
2532                "CACHE-023: captured stderr differs between fresh and hit",
2533            );
2534
2535            // And the bytes themselves agree with the source-of-
2536            // truth values, not some accidentally-equal corruption.
2537            assert_eq!(hit_output, OUTPUT_BYTES);
2538            assert_eq!(hit_stdout, STDOUT_BYTES);
2539            assert_eq!(hit_stderr, STDERR_BYTES);
2540
2541            // Exactly one spawn happened (the fresh run); the hit
2542            // did not re-spawn.
2543            assert_eq!(spawner.spawns().len(), 1);
2544        }
2545
2546        fn concat_stdout(events: &[RecordedEvent]) -> Vec<u8> {
2547            let mut acc = Vec::new();
2548            for event in events {
2549                if let RecordedEvent::Stdout(_, bytes) = event {
2550                    acc.extend_from_slice(bytes);
2551                }
2552            }
2553            acc
2554        }
2555
2556        fn concat_stderr(events: &[RecordedEvent]) -> Vec<u8> {
2557            let mut acc = Vec::new();
2558            for event in events {
2559                if let RecordedEvent::Stderr(_, bytes) = event {
2560                    acc.extend_from_slice(bytes);
2561                }
2562            }
2563            acc
2564        }
2565
2566        #[tokio::test]
2567        async fn task_not_in_workspace_surfaces_build_key_failed() {
2568            let fixture = Fixture::new(command(&["true"]), EnvSettings::default(), Vec::new());
2569            let spawner = MockProcessSpawner::new();
2570            let observer = Recorder::default();
2571            let ctx = make_ctx(&fixture, &spawner, &observer);
2572
2573            let absent = task_id_for("absent_project", "build");
2574
2575            match run_task(&ctx, &absent, &BTreeMap::new(), 1).await {
2576                Err(RunTaskError::BuildKeyFailed {
2577                    source: BuildKeyError::TaskNotInWorkspace { task },
2578                }) => assert_eq!(task, absent),
2579                other => panic!("expected BuildKeyFailed(TaskNotInWorkspace), got {other:?}"),
2580            }
2581
2582            // on_task_started fires before the lookup phase: the
2583            // wrapper marks the lifecycle as entered before any
2584            // workspace resolution attempt. on_task_finished does
2585            // NOT fire because no CompletedRecord was produced.
2586            let events = observer.events();
2587            assert_eq!(
2588                events.len(),
2589                1,
2590                "expected one Started event, got {events:?}"
2591            );
2592            assert!(
2593                matches!(events[0], RecordedEvent::Started(ref id) if id == &absent),
2594                "expected Started(absent), got {events:?}",
2595            );
2596            // No spawn either.
2597            assert!(spawner.spawns().is_empty());
2598        }
2599
2600        #[tokio::test]
2601        async fn spawn_plan_env_reflects_cache_008_runtime_view() {
2602            let env = EnvSettings {
2603                from_host: BTreeSet::from([env_var("PATH"), env_var("MISSING")]),
2604                overrides: BTreeMap::from([
2605                    (env_var("HAZ_MODE"), "ci".to_owned()),
2606                    (env_var("PATH"), "/override/bin".to_owned()),
2607                ]),
2608            };
2609            let mut fixture = Fixture::new(command(&["true"]), env, Vec::new());
2610            fixture
2611                .host_env
2612                .insert(env_var("PATH"), "/usr/bin".to_owned());
2613            fixture
2614                .host_env
2615                .insert(env_var("UNRELATED"), "should-not-appear".to_owned());
2616
2617            let spawner = MockProcessSpawner::new();
2618            spawner.push_spec(MockSpec::default());
2619            let observer = Recorder::default();
2620            let ctx = make_ctx(&fixture, &spawner, &observer);
2621
2622            let _ = run_task(&ctx, &fixture.task_id, &BTreeMap::new(), 1)
2623                .await
2624                .unwrap();
2625
2626            let records = spawner.spawns();
2627            assert_eq!(records.len(), 1);
2628            let env_vec = &records[0].plan.env;
2629            let env_names: BTreeSet<String> = env_vec
2630                .iter()
2631                .map(|(k, _)| k.to_str().unwrap().to_owned())
2632                .collect();
2633            // Override-only entries propagate; from_host present
2634            // names propagate (with override winning on
2635            // collision); from_host absent names are excluded;
2636            // unrelated host names are excluded.
2637            assert_eq!(
2638                env_names,
2639                BTreeSet::from(["HAZ_MODE".to_owned(), "PATH".to_owned()]),
2640            );
2641
2642            let path_value = env_vec
2643                .iter()
2644                .find(|(k, _)| k.to_str() == Some("PATH"))
2645                .map(|(_, v)| v.to_str().unwrap().to_owned());
2646            assert_eq!(path_value.as_deref(), Some("/override/bin"));
2647        }
2648    }
2649}