Skip to main content

rivet/pipeline/
validate_manifest.rs

1//! **Layer: Observability**
2//!
3//! Manifest-aware verification for `--validate` (ADR-0012 §M5 / §M6,
4//! constrained by the ADR-0013 trust-flag contract that says: no new flags;
5//! manifest-aware checks live under the existing `--validate`).
6//!
7//! Although this module reads from a `Destination` (technically L2 surface),
8//! it makes **no execution decisions**: it does not write data, advance
9//! cursors, mutate state, or change the pipeline path.  Its only output is
10//! a structured `ManifestVerification` verdict the run report renders.
11//! Per ADR-0003, that places it firmly in L4 Observability — the
12//! destination read surface is just the carrier.
13//!
14//! Inputs (read-only):
15//! - the destination's `manifest.json` body
16//! - the destination's `_SUCCESS` body, if present
17//! - the listing of every object under the destination prefix
18//!
19//! Outputs:
20//! - [`ManifestVerification`] — a structured verdict the run report renders
21//!   into the operator-facing "Verdicts" section.
22//!
23//! Out of scope here:
24//! - per-file row-count check (that runs *during* the export, against the
25//!   local temp file before upload — see `pipeline::validate::validate_output`).
//! - source-side reconciliation (lives in [`crate::pipeline::reconcile_cmd`] and
27//!   is what `--reconcile` adds on top of this).
28//! - re-fingerprinting parts (`--validate --deep`, future).
29//!
30//! Failure modes are explicit: each check produces a `Failure` enum variant
31//! that is rendered verbatim in `summary.json` so an Airflow / CI consumer
32//! can branch on the kind, not parse strings.
33
34use serde::{Deserialize, Serialize};
35
36use crate::destination::Destination;
37use crate::error::Result;
38use crate::manifest::{
39    MANIFEST_FILENAME, RunManifest, SUCCESS_FILENAME, join_key, parse_success_marker,
40    success_marker_body,
41};
42use crate::pipeline::manifest_reconcile::{PartPresence, reconcile_manifest_against_listing};
43
/// Largest destination control artifact (`manifest.json`) the read path will
/// load into memory: 64 MiB.
///
/// A `manifest.json` is metadata — a few KB, at most low single-digit MB even
/// for very large datasets — so 64 MiB leaves ample headroom for any
/// legitimate body while still bounding how much memory a read can consume.
///
/// Security (V21, CWE-400): the manifest readers `head()` an object and then
/// load its entire body into a `Vec<u8>`.  Anyone able to write the
/// destination prefix (a shared bucket prefix, a world-writable export dir)
/// could plant a multi-GB `manifest.json`, and an unbounded read would OOM the
/// next `--resume`, `--validate`, or `rivet repair`.  [`read_capped`] compares
/// the size `head()` reports against this cap and refuses before reading.
pub(crate) const MANIFEST_MAX_BYTES: u64 = 64 << 20;
56
57/// How deep a `rivet validate` pass goes — a graded verify layer over the
58/// same checks, letting an operator trade thoroughness for latency / cost.
59///
60/// The variants are a strict superset chain: `Light ⊂ Sample ⊂ Full`.  Each
61/// level runs every check the level below it does, plus more.  Defined here
62/// (the pipeline layer) and re-exported for the CLI grammar so the **same**
63/// enum gates the checks in [`verify_at_destination`] and parses on the
64/// `--depth` flag — no CLI→pipeline back-dependency.
65///
66/// - **Light**: manifest read + self-consistency + `_SUCCESS` only.  Skips the
67///   `list_prefix` reconcile (no per-part presence/size/checksum) and the
68///   untracked-surplus scan, leaving `parts_verified = 0`.  One `head` + one
69///   `read` of `manifest.json` and `_SUCCESS` — a fast "is this prefix a
70///   complete, marked run?" poll with no prefix listing.
71/// - **Sample**: everything Light does **plus** the part reconcile and
72///   untracked surplus (one `list_prefix`).  This is the pre-graded behaviour
73///   minus the Form B value re-read — full structural verification with no
74///   part downloads.
75/// - **Full** (default): everything Sample does **plus** the Form B value-
76///   checksum re-read (re-reads parts, re-derives per-column checksums).  The
77///   most thorough and the only level that downloads part bodies.  Equivalent
78///   to the pre-graded behaviour, so existing callers are unchanged.
79#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq, Default)]
80pub enum ValidateDepth {
81    /// Manifest read + self-consistency + `_SUCCESS` only (no prefix listing).
82    Light,
83    /// Light + part reconcile + untracked surplus (one `list_prefix`).
84    Sample,
85    /// Sample + the Form B value-checksum re-read (downloads parts).
86    #[default]
87    Full,
88}
89
90impl ValidateDepth {
91    /// True iff this level runs the `list_prefix` reconcile (part presence,
92    /// size, checksum) and the untracked-surplus scan — i.e. anything above
93    /// `Light`.  The single predicate the section-3/5 depth gating keys off.
94    fn runs_part_reconcile(self) -> bool {
95        !matches!(self, ValidateDepth::Light)
96    }
97
98    /// True iff this level downloads part *bodies* — the CDC `__pos` continuity
99    /// check and the Form-B value-checksum re-read, both `Full`-only (the
100    /// `part_reconcile` level above only reads listing metadata). The single
101    /// predicate the section-3/5 part-download gating keys off, parallel to
102    /// [`Self::runs_part_reconcile`] — so adding a depth level edits the enum,
103    /// not the call sites in `validate_cmd`.
104    pub(crate) fn runs_part_download(self) -> bool {
105        matches!(self, ValidateDepth::Full)
106    }
107
108    /// Stable operator-/wire-facing label for the depth a verdict was produced
109    /// at, surfaced in `summary.json` (`depth_level`) and `rivet validate`
110    /// output.
111    pub fn label(self) -> &'static str {
112        match self {
113            ValidateDepth::Light => "light",
114            ValidateDepth::Sample => "sample",
115            ValidateDepth::Full => "full",
116        }
117    }
118}
119
120/// Read `key` into memory only if its `head()`-reported size is within
121/// `max_bytes`; otherwise bail without reading a single byte.
122///
123/// The single enforcement point for the V21 (CWE-400) manifest-read cap shared
124/// by the three control-artifact readers (`--resume` M8 preamble, `--validate`,
125/// `rivet repair`).  Each previously did `head()` then an uncapped `read()`,
126/// discarding the size `head()` already returned; routing through here closes
127/// that gap in one place.
128///
129/// Behaviour:
130/// - object absent (`head` → `None`): `Err` — callers invoke this only after
131///   establishing the object exists, so an absent object here is a hard error,
132///   not the benign "no manifest / legacy prefix" case (which the callers
133///   detect with their own `head()` first).
134/// - oversized (`size_bytes > max_bytes`): `Err` naming the cap, **before** any
135///   body is materialised.
136/// - otherwise: the full body via [`Destination::read`].
137pub(crate) fn read_capped(dest: &dyn Destination, key: &str, max_bytes: u64) -> Result<Vec<u8>> {
138    match dest.head(key)? {
139        None => anyhow::bail!("'{key}' not found at the destination"),
140        Some(meta) => {
141            if meta.size_bytes > max_bytes {
142                anyhow::bail!(
143                    "'{key}' is {} bytes, exceeding the {max_bytes}-byte control-artifact \
144                     read cap — refusing to load it into memory (possible tampering)",
145                    meta.size_bytes
146                );
147            }
148            dest.read(key)
149        }
150    }
151}
152
153/// Outcome of a single `--validate` pass over a destination prefix.
154///
155/// Stable enough to be embedded in `summary.json` directly (see
156/// `pipeline::report::ValidationOutcome`).  Forward-compat: consumers MUST
157/// ignore unknown fields (no `deny_unknown_fields`).
158#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
159pub struct ManifestVerification {
160    /// True iff a `manifest.json` was found at the destination and parsed.
161    /// `false` triggers ADR-0012 M6 fallback (legacy run); higher-level
162    /// check results below are then "skipped" rather than "passed".
163    pub manifest_found: bool,
164    /// Mirrors ADR-0012 M6's required `legacy_run` operator-facing label.
165    pub legacy_run: bool,
166    /// Manifest parts whose presence and recorded `size_bytes` were
167    /// confirmed at the destination.  0 when no manifest was found.
168    pub parts_verified: usize,
169    /// Subset of `parts_verified` whose **content** was confirmed via an MD5
170    /// the store surfaced in its listing (no download) — the rest are size-only.
171    /// Lets `passed: true` say how much of the dataset was content-checked
172    /// rather than implying all of it was.  `#[serde(default)]` for back-compat.
173    #[serde(default)]
174    pub parts_md5_verified: usize,
175    /// Manifest parts that were declared `committed` but not actually
176    /// present, present at a different size, or otherwise mismatched.
177    pub parts_failed: usize,
178    /// True iff `_SUCCESS` exists at the destination AND its body matches
179    /// the fingerprint of the bytes we read for `manifest.json`.  An
180    /// existing `_SUCCESS` whose body diverges from the manifest is itself
181    /// an integrity failure — surfaced via `failures`.
182    pub success_marker_consistent: bool,
183    /// Self-consistency of the manifest (`row_count`, `part_count`,
184    /// duplicate `part_id`s).  Skipped when `manifest_found = false`.
185    pub manifest_self_consistent: bool,
186    /// Final verdict, **derived** (not hand-maintained) — `manifest_found` and
187    /// no *fatal* failure ([`Failure::is_fatal`]).  Stored so it stays in the
188    /// `summary.json` contract, but computed in one place
189    /// ([`ManifestVerification::recompute_passed`]) so a new failure variant is
190    /// fatal by default rather than relying on every site to flip a bool.
191    pub passed: bool,
192    /// Per-failure detail.  May be non-empty with `passed = true` for advisory
193    /// (non-fatal) failures like [`Failure::UntrackedObject`].  Stable variant
194    /// set; new variants land under a new manifest version per ADR-0012.
195    pub failures: Vec<Failure>,
196    /// The graded depth this verdict was produced at: `"light"`, `"sample"`,
197    /// or `"full"` (see [`ValidateDepth`]).  Lets a consumer of `summary.json`
198    /// tell **how much** was actually checked — a `passed: true` at `"light"`
199    /// asserts far less than at `"full"` (no part presence was verified).
200    /// `#[serde(default)]` (→ `"full"`) for back-compat: pre-graded verdicts
201    /// always ran the full pass.
202    #[serde(default = "default_depth_level")]
203    pub depth_level: String,
204}
205
206/// `serde(default)` for [`ManifestVerification::depth_level`]: a verdict that
207/// predates the graded layer always ran the full pass, so an absent field
208/// deserializes to `"full"`.
209fn default_depth_level() -> String {
210    ValidateDepth::Full.label().to_string()
211}
212
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214#[serde(tag = "kind", rename_all = "snake_case")]
215pub enum Failure {
216    /// Manifest declared a part that does not exist at the destination.
217    PartMissing { part_id: u32, path: String },
218    /// Manifest declared a part whose actual size differs from `size_bytes`.
219    PartSizeMismatch {
220        part_id: u32,
221        path: String,
222        expected: u64,
223        actual: u64,
224    },
225    /// Part present at the recorded size but its content MD5 (from the store's
226    /// listing metadata) differs from the manifest's — transit / at-rest
227    /// corruption, caught with no download.
228    PartChecksumMismatch {
229        part_id: u32,
230        path: String,
231        expected: String,
232        actual: String,
233    },
234    /// `_SUCCESS` exists but its body is malformed (not `xxh3:<16-hex>` after
235    /// trim).  ADR-0012 M2 — orchestrators rely on this format being strict.
236    SuccessMarkerMalformed { body_preview: String },
237    /// `_SUCCESS` body parsed but does not match `xxh3(manifest.json bytes)`.
238    /// Two legitimate sources: (a) someone overwrote `_SUCCESS` after the
239    /// manifest was rewritten — orchestrator bug; (b) the manifest was
240    /// edited in place after the run — operator bug.  Either way the
241    /// manifest is no longer trustworthy.
242    SuccessMarkerStale {
243        marker_fingerprint: String,
244        manifest_fingerprint: String,
245    },
246    /// `RunManifest::validate_self_consistency` rejected the manifest.
247    /// Usually a writer bug (declared row_count != sum of committed parts'
248    /// rows); blocks the rest of the verification because the manifest
249    /// itself is unreliable.
250    ManifestSelfInconsistent { detail: String },
251    /// Reading `manifest.json` returned an I/O error other than "absent".
252    ManifestReadError { detail: String },
253    /// Reading `_SUCCESS` returned an I/O error other than "absent".
254    SuccessMarkerReadError { detail: String },
255    /// Listing the destination prefix returned an I/O error.  Reduces the
256    /// untracked-parts check (M5 surplus) to a no-op for this run.
257    ListPrefixError { detail: String },
258    /// A file is present at the destination prefix but no manifest entry
259    /// references it.  M9-adjacent: `--validate` only flags it; quarantine
260    /// belongs to `--resume`.
261    UntrackedObject { key: String, size_bytes: u64 },
262    /// The export declared `verify: content` but some parts could only be
263    /// size-verified (no comparable content checksum from the store) — the
264    /// declared integrity contract was not met.
265    ContentVerificationUnmet { size_only: usize, total: usize },
266    /// A manifest was *required* at this prefix (the operator pinned a literal
267    /// `--prefix`, asserting a real dataset lives here) but none was found.
268    /// Without this, an absent manifest at an operator-pinned prefix maps to
269    /// the M6 legacy-run label and exits 0 — indistinguishable from a verified
270    /// run, so a CI gate `rivet validate && deploy` sails past a destination
271    /// that was never written.  Fatal: a required-but-missing manifest is a
272    /// refusal reason, not a "cannot certify" advisory.
273    ManifestRequiredButAbsent { prefix: String },
274}
275
276impl Failure {
277    /// Whether this failure invalidates the dataset (flips `passed` to false).
278    ///
279    /// Every variant is fatal **except** [`Failure::UntrackedObject`]: surplus
280    /// objects are an audit signal whose cleanup is `--resume`'s job (ADR-0012
281    /// M9), not a corruption of the manifest-listed parts.  New variants are
282    /// fatal by default — opt out here explicitly, so a forgotten case fails
283    /// closed (safe) rather than silently passing.
284    pub fn is_fatal(&self) -> bool {
285        !matches!(self, Failure::UntrackedObject { .. })
286    }
287
288    /// Stable `RIVET_VERIFY_*` error code for this failure variant.
289    ///
290    /// One code per variant, intended for orchestrators / CI to branch on
291    /// without parsing the human `Display` string or the per-variant JSON
292    /// fields.  The code is part of the wire contract: it is emitted next to
293    /// `kind` in the JSON report and prefixed in brackets on each pretty line.
294    /// Codes are append-only — never renamed once shipped (a renamed code is a
295    /// silent break for any consumer keying off it).
296    pub fn error_code(&self) -> &'static str {
297        match self {
298            Failure::PartMissing { .. } => "RIVET_VERIFY_PART_MISSING",
299            Failure::PartSizeMismatch { .. } => "RIVET_VERIFY_PART_SIZE_MISMATCH",
300            Failure::PartChecksumMismatch { .. } => "RIVET_VERIFY_PART_CHECKSUM_MISMATCH",
301            Failure::SuccessMarkerMalformed { .. } => "RIVET_VERIFY_SUCCESS_MALFORMED",
302            Failure::SuccessMarkerStale { .. } => "RIVET_VERIFY_SUCCESS_STALE",
303            Failure::ManifestSelfInconsistent { .. } => "RIVET_VERIFY_MANIFEST_INCONSISTENT",
304            Failure::ManifestReadError { .. } => "RIVET_VERIFY_MANIFEST_READ_ERROR",
305            Failure::SuccessMarkerReadError { .. } => "RIVET_VERIFY_SUCCESS_READ_ERROR",
306            Failure::ListPrefixError { .. } => "RIVET_VERIFY_LIST_ERROR",
307            Failure::UntrackedObject { .. } => "RIVET_VERIFY_UNTRACKED_OBJECT",
308            Failure::ContentVerificationUnmet { .. } => "RIVET_VERIFY_CONTENT_UNMET",
309            Failure::ManifestRequiredButAbsent { .. } => "RIVET_VERIFY_MANIFEST_REQUIRED",
310        }
311    }
312}
313
314impl std::fmt::Display for Failure {
315    /// One operator-facing line per failure variant.  Used by:
316    /// - `pipeline::report::render_markdown` (summary.md "failure:" lines)
317    /// - `pipeline::validate_cmd::render_pretty` (`rivet validate` stdout)
318    /// - any future consumer that wants a human-readable failure label
319    ///
320    /// The wire format (`failures[].kind` + per-variant fields) lives in
321    /// the `Serialize` derive above and is the contract Airflow / CI
322    /// consumers branch on.  This `Display` impl is for humans only.
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        match self {
325            Failure::PartMissing { part_id, path } => {
326                write!(f, "part {} missing at {}", part_id, path)
327            }
328            Failure::PartSizeMismatch {
329                part_id,
330                path,
331                expected,
332                actual,
333            } => write!(
334                f,
335                "part {} size mismatch at {}: manifest {}, dest {}",
336                part_id, path, expected, actual
337            ),
338            Failure::PartChecksumMismatch {
339                part_id,
340                path,
341                expected,
342                actual,
343            } => write!(
344                f,
345                "part {} content mismatch at {}: manifest md5 {}, dest {}",
346                part_id, path, expected, actual
347            ),
348            Failure::SuccessMarkerMalformed { body_preview } => {
349                write!(f, "_SUCCESS body malformed: {body_preview:?}")
350            }
351            Failure::SuccessMarkerStale {
352                marker_fingerprint,
353                manifest_fingerprint,
354            } => write!(
355                f,
356                "_SUCCESS body {} != manifest fingerprint {} (stale marker)",
357                marker_fingerprint, manifest_fingerprint
358            ),
359            Failure::ManifestSelfInconsistent { detail } => {
360                write!(f, "manifest self-consistency: {detail}")
361            }
362            Failure::ManifestReadError { detail } => {
363                write!(f, "manifest read error: {detail}")
364            }
365            Failure::SuccessMarkerReadError { detail } => {
366                write!(f, "_SUCCESS read error: {detail}")
367            }
368            Failure::ListPrefixError { detail } => {
369                write!(f, "destination listing error: {detail}")
370            }
371            Failure::UntrackedObject { key, size_bytes } => {
372                write!(f, "untracked object: {} ({} bytes)", key, size_bytes)
373            }
374            Failure::ContentVerificationUnmet { size_only, total } => write!(
375                f,
376                "verify: content not met — {size_only} of {total} part(s) only \
377                 size-verified (no store checksum); lower max_file_size so parts \
378                 upload as a single PUT, or the backend exposes no checksum"
379            ),
380            Failure::ManifestRequiredButAbsent { prefix } => write!(
381                f,
382                "no manifest at {prefix}: a manifest was required here (operator \
383                 pinned --prefix) but none was found — this prefix was never \
384                 written, or the data was relocated. Run the export first, or \
385                 drop --prefix to validate the config-resolved destination."
386            ),
387        }
388    }
389}
390
391impl ManifestVerification {
392    /// Base verdict: nothing checked yet (no manifest, all counts zero, all
393    /// sub-checks false, `passed = false`).  Every constructor builds on this
394    /// and overrides only what differs, so a new field lands in **one** place
395    /// rather than several near-identical literals.
396    fn empty() -> Self {
397        Self {
398            manifest_found: false,
399            legacy_run: false,
400            parts_verified: 0,
401            parts_md5_verified: 0,
402            parts_failed: 0,
403            success_marker_consistent: false,
404            manifest_self_consistent: false,
405            passed: false,
406            failures: Vec::new(),
407            // Base level; `verify_at_destination` overwrites this with the
408            // depth it was actually called at before returning any verdict.
409            depth_level: default_depth_level(),
410        }
411    }
412
413    /// Recompute `passed` from the verdict's facts: a manifest was found and no
414    /// **fatal** failure was recorded (advisory failures like `UntrackedObject`
415    /// don't count).  The single source of truth — callers set failures and
416    /// call this once, rather than flipping `passed` by hand at every site.
417    fn recompute_passed(&mut self) {
418        self.passed = self.manifest_found && !self.failures.iter().any(Failure::is_fatal);
419    }
420
421    /// Apply the export's `verify` policy (ADR-0013 / review D).  When content
422    /// verification is required but some parts were only size-verified, record
423    /// a fatal [`Failure::ContentVerificationUnmet`] and re-derive `passed`.
424    /// Policy lives here (one place); the composers — run finalize and the
425    /// `rivet validate` command — just call it with their export's intent.
426    pub fn enforce_content_policy(&mut self, require_content: bool) {
427        if require_content && self.manifest_found {
428            let size_only = self.parts_verified.saturating_sub(self.parts_md5_verified);
429            if size_only > 0 {
430                self.failures.push(Failure::ContentVerificationUnmet {
431                    size_only,
432                    total: self.parts_verified,
433                });
434                self.recompute_passed();
435            }
436        }
437    }
438
439    /// Apply the "a manifest must exist here" policy (finding #20).  When the
440    /// operator pinned a literal `--prefix`, an absent manifest is no longer the
441    /// benign M6 legacy-run case — it almost always means the prefix was never
442    /// written (a misconfigured CI gate). Convert that exact verdict — no
443    /// manifest, no other failure (i.e. the [`ManifestVerification::legacy`]
444    /// shape) — into a fatal [`Failure::ManifestRequiredButAbsent`] so the exit
445    /// gate refuses it loudly instead of silently passing.
446    ///
447    /// Deliberately a no-op for every other shape: a real manifest (passed or
448    /// failed), or an absent manifest that already carries a `ManifestReadError`
449    /// / head failure, is left untouched — those are already classified.  Only
450    /// the "legacy / cannot certify" case is escalated, and only when required.
451    pub fn require_manifest_present(&mut self, prefix: &str) {
452        if !self.manifest_found && !self.has_failures() {
453            self.legacy_run = false;
454            self.failures.push(Failure::ManifestRequiredButAbsent {
455                prefix: prefix.to_string(),
456            });
457            self.recompute_passed();
458        }
459    }
460
461    /// Construct the M6 (legacy run) verdict for a destination that has no
462    /// manifest at all.  Caller composes this with the existing per-file
463    /// row-count check; together they form the legacy `--validate` result.
464    pub fn legacy() -> Self {
465        // `passed = false` is intentional — not "validation failed" but "this
466        // verifier cannot certify"; the caller layers per-file row counts on
467        // top and composes the final verdict.
468        Self {
469            legacy_run: true,
470            ..Self::empty()
471        }
472    }
473
474    /// True iff this verification surfaced any explicit failure (i.e. a
475    /// reason an orchestrator should refuse the run).  Distinct from
476    /// `!passed`, which can also mean "legacy / not applicable".
477    pub fn has_failures(&self) -> bool {
478        !self.failures.is_empty()
479    }
480}
481
482/// Run the manifest-aware verification at `manifest_dir` (the destination-
483/// relative directory containing `manifest.json` and `_SUCCESS`).
484///
485/// `manifest_dir` is the same key shape `Destination::write` was called with
486/// for the manifest itself — typically empty (`""`) for prefix-rooted runs,
487/// or the per-export sub-directory.  Trailing `/` is optional.
488///
489/// This function does not panic on any expected I/O outcome — every read
490/// failure becomes a `Failure::*ReadError` so the caller can render a
491/// useful message instead of bailing.
492///
493/// `depth` selects the graded verify layer (see [`ValidateDepth`]):
494/// - [`ValidateDepth::Light`] skips section 3 (the `list_prefix` part
495///   reconcile) and section 5 (untracked surplus), leaving `parts_verified`
496///   at 0 — a fast manifest + `_SUCCESS` poll with no prefix listing.
497/// - [`ValidateDepth::Sample`] and [`ValidateDepth::Full`] run all five
498///   sections here.  The Form B value re-read is **not** in this function;
499///   it is the caller's concern (`run_validate_command`), gated on `Full`.
500///
501/// Regardless of depth, `depth_level` on the returned verdict records the
502/// level this pass ran at.
503pub fn verify_at_destination(
504    dest: &dyn Destination,
505    manifest_dir: &str,
506    depth: ValidateDepth,
507) -> Result<ManifestVerification> {
508    let manifest_key = join_key(manifest_dir, MANIFEST_FILENAME);
509    let success_key = join_key(manifest_dir, SUCCESS_FILENAME);
510
511    // Stamp the depth this pass ran at onto every verdict before it leaves the
512    // function — including the early-return error/legacy shapes — so a consumer
513    // always sees *how much* was checked.  Each `return Ok(v)` below routes
514    // through `with_depth` (or sets `out.depth_level` for the main path).
515    let with_depth = |mut v: ManifestVerification| -> ManifestVerification {
516        v.depth_level = depth.label().to_string();
517        v
518    };
519
520    // ── 1. Manifest read ───────────────────────────────────────────────
521    //
522    // Error-consistency contract: every I/O outcome here surfaces as a
523    // structured `Failure` variant rather than as `Err`.  An operator gets
524    // one verdict shape regardless of whether the destination is missing,
525    // permission-denied, or temporarily unreachable.  The bubbled `Err`
526    // path is reserved for *programmer* errors (caller passes a malformed
527    // `manifest_dir`, a future destination breaks an internal invariant).
528    let manifest_bytes = match dest.head(&manifest_key) {
529        Ok(None) => return Ok(with_depth(ManifestVerification::legacy())),
530        Ok(Some(_)) => match read_capped(dest, &manifest_key, MANIFEST_MAX_BYTES) {
531            Ok(b) => b,
532            Err(e) => {
533                let mut v = ManifestVerification::legacy();
534                v.legacy_run = false;
535                v.failures.push(Failure::ManifestReadError {
536                    detail: format!("{e:#}"),
537                });
538                v.passed = false;
539                return Ok(with_depth(v));
540            }
541        },
542        Err(e) => {
543            // `head` failure is symmetric to a `read` failure — same kind
544            // (`ManifestReadError`) so consumers don't have to branch on
545            // which method tripped.  Distinct from "manifest absent"
546            // (Ok(None) above) which legitimately means "legacy prefix".
547            let mut v = ManifestVerification::legacy();
548            v.legacy_run = false;
549            v.failures.push(Failure::ManifestReadError {
550                detail: format!("manifest head failed: {e:#}"),
551            });
552            v.passed = false;
553            return Ok(with_depth(v));
554        }
555    };
556
557    let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
558        Ok(m) => m,
559        Err(e) => {
560            // A malformed manifest is treated as a self-inconsistency —
561            // semantically equivalent for the operator (the manifest can't
562            // be trusted) but kept distinct in `failures` so the kind is
563            // explicit on the wire.
564            return Ok(with_depth(ManifestVerification {
565                manifest_found: true,
566                failures: vec![Failure::ManifestSelfInconsistent {
567                    detail: format!("manifest.json parse failed: {e}"),
568                }],
569                ..ManifestVerification::empty()
570            }));
571        }
572    };
573
574    // Optimistic base: a found, self-consistent manifest that passes until a
575    // check below flips it.  Overrides only what differs from `empty()`.
576    // Stamp the depth here so the two early `return Ok(out)` paths in section
577    // 4 (success-marker head error, non-utf8 body) carry the right level too.
578    let mut out = ManifestVerification {
579        manifest_found: true,
580        manifest_self_consistent: true,
581        passed: true,
582        depth_level: depth.label().to_string(),
583        ..ManifestVerification::empty()
584    };
585
586    // ── 2. Self-consistency ─────────────────────────────────────────────
587    if let Err(e) = manifest.validate_self_consistency() {
588        out.manifest_self_consistent = false;
589        out.failures.push(Failure::ManifestSelfInconsistent {
590            detail: format!("{e}"),
591        });
592        // Don't short-circuit — we still want to surface part-presence
593        // failures because the operator may want to know both classes at
594        // once rather than fix-then-rerun.
595    }
596
597    // ── 3. Reconcile parts + surplus against ONE prefix listing ────────
598    //
599    // Presence and untracked-surplus both fall out of a single
600    // `reconcile_manifest_against_listing` over one `list_prefix` — the same
601    // pure walk chunked resume uses (`build_resume_plan`).  This replaces the
602    // old per-part `HEAD` loop (N round-trips) and its separate untracked
603    // listing.  Per-part failures are emitted here (step 3); untracked is
604    // emitted at step 5 so the failure ordering an operator reads is stable.
605    //
606    // Trade-off: presence now rides the listing, not per-part `HEAD`.  If the
607    // listing cannot be read, an audit cannot certify the parts — so a list
608    // failure flips `passed = false` (a `ListPrefixError`), rather than the
609    // old behaviour where per-part HEAD still "verified" parts a failed
610    // listing couldn't enumerate.  Every Rivet destination backend offers
611    // strong read-after-write list consistency, so the happy path is one call.
612    //
613    // Graded depth: `Light` skips this `list_prefix` entirely — no part
614    // reconcile, no `ListPrefixError`, `parts_verified` stays 0, and section 5
615    // (untracked) is a no-op since `reconciliation` is `None`.  `Sample` and
616    // `Full` run it.  A `Light` pass therefore certifies only that the
617    // manifest reads, is self-consistent, and `_SUCCESS` matches — never that
618    // the parts are physically present.
619    let reconciliation = if depth.runs_part_reconcile() {
620        match dest.list_prefix(manifest_dir) {
621            Ok(listing) => Some(reconcile_manifest_against_listing(
622                &manifest,
623                &listing,
624                manifest_dir,
625            )),
626            Err(e) => {
627                out.failures.push(Failure::ListPrefixError {
628                    detail: format!("{e:#}"),
629                });
630                None
631            }
632        }
633    } else {
634        None
635    };
636    if let Some(rec) = &reconciliation {
637        for check in &rec.per_part {
638            match &check.presence {
639                PartPresence::Present { md5_verified } => {
640                    out.parts_verified += 1;
641                    if *md5_verified {
642                        out.parts_md5_verified += 1;
643                    }
644                }
645                PartPresence::SizeMismatch { expected, actual } => {
646                    out.parts_failed += 1;
647                    out.failures.push(Failure::PartSizeMismatch {
648                        part_id: check.part_id,
649                        path: check.path.clone(),
650                        expected: *expected,
651                        actual: *actual,
652                    });
653                }
654                PartPresence::Missing => {
655                    out.parts_failed += 1;
656                    out.failures.push(Failure::PartMissing {
657                        part_id: check.part_id,
658                        path: check.path.clone(),
659                    });
660                }
661                PartPresence::ChecksumMismatch { expected, actual } => {
662                    out.parts_failed += 1;
663                    out.failures.push(Failure::PartChecksumMismatch {
664                        part_id: check.part_id,
665                        path: check.path.clone(),
666                        expected: expected.clone(),
667                        actual: actual.clone(),
668                    });
669                }
670            }
671        }
672    }
673
674    // ── 4. _SUCCESS marker consistency ─────────────────────────────────
675    //
676    // Same error-consistency contract as step 1: head/read failures become
677    // `Failure::SuccessMarkerReadError`, not bubbled `Err`.  Absent marker
678    // (Ok(None)) stays informational, not a failure (M2: only successful
679    // runs land _SUCCESS, so its absence on a failed manifest is correct).
680    let success_head = match dest.head(&success_key) {
681        Ok(h) => h,
682        Err(e) => {
683            out.failures.push(Failure::SuccessMarkerReadError {
684                detail: format!("_SUCCESS head failed: {e:#}"),
685            });
686            out.recompute_passed();
687            return Ok(out);
688        }
689    };
690    match success_head {
691        None => {
692            // Absent _SUCCESS is informational, not a failure: per ADR-0012
693            // M2, only successful runs land it.  A failed-then-rewritten
694            // manifest legitimately lacks _SUCCESS.  Leave
695            // `success_marker_consistent = false` (this is a "no signal"
696            // bool, not a "broken" bool) and let the caller decide.
697        }
698        Some(_) => match dest.read(&success_key) {
699            Err(e) => {
700                out.failures.push(Failure::SuccessMarkerReadError {
701                    detail: format!("{e:#}"),
702                });
703            }
704            Ok(body) => {
705                let body_str = match std::str::from_utf8(&body) {
706                    Ok(s) => s,
707                    Err(_) => {
708                        out.failures.push(Failure::SuccessMarkerMalformed {
709                            body_preview: format!("(non-utf8, {} bytes)", body.len()),
710                        });
711                        out.recompute_passed();
712                        return Ok(out);
713                    }
714                };
715                match parse_success_marker(body_str) {
716                    None => {
717                        out.failures.push(Failure::SuccessMarkerMalformed {
718                            body_preview: preview(body_str),
719                        });
720                    }
721                    Some(marker_fp) => {
722                        let manifest_fp = success_marker_body(&manifest_bytes);
723                        // success_marker_body returns the trailing `\n`
724                        // form; trim before comparing to the parsed marker
725                        // (which already trims).
726                        let manifest_fp_trimmed = manifest_fp.trim_end_matches('\n');
727                        if marker_fp == manifest_fp_trimmed {
728                            out.success_marker_consistent = true;
729                        } else {
730                            out.failures.push(Failure::SuccessMarkerStale {
731                                marker_fingerprint: marker_fp.to_string(),
732                                manifest_fingerprint: manifest_fp_trimmed.to_string(),
733                            });
734                        }
735                    }
736                }
737            }
738        },
739    }
740
741    // ── 5. Untracked surplus ───────────────────────────────────────────
742    //
743    // Already computed by the step-3 reconciliation (sidecars, quarantine,
744    // and the doctor probe are filtered there).  Emit it last so the failure
745    // ordering stays parts → marker → untracked.  A list failure left
746    // `reconciliation = None` and already flipped `passed` above.
747    if let Some(rec) = reconciliation {
748        for obj in rec.untracked {
749            out.failures.push(Failure::UntrackedObject {
750                key: obj.key,
751                size_bytes: obj.size_bytes,
752            });
753        }
754    }
755
756    out.recompute_passed();
757    Ok(out)
758}
759
/// Truncate `s` to a short preview for error messages.
///
/// Keeps at most the first 40 characters (Unicode scalar values, so a
/// multi-byte character is never split) and appends `…` when anything was
/// cut.  No escaping is applied: control characters such as a trailing `\n`
/// are kept as-is and left to the renderer / serializer.
fn preview(s: &str) -> String {
    const MAX_CHARS: usize = 40;
    // `nth(MAX_CHARS)` yields the byte offset of the first character past the
    // limit, or `None` when the whole string fits.  This is one early-exiting
    // pass, instead of collecting a prefix and then counting every character
    // of a potentially large body.
    match s.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_owned(),
    }
}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use crate::config::{DestinationConfig, DestinationType};
774    use crate::destination::local::LocalDestination;
775    use crate::manifest::{
776        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
777        PartStatus, RunManifest,
778    };
779    use std::path::Path;
780
781    fn local_dest(base: &Path) -> LocalDestination {
782        LocalDestination::new(&DestinationConfig {
783            destination_type: DestinationType::Local,
784            path: Some(base.to_string_lossy().into_owned()),
785            ..Default::default()
786        })
787        .unwrap()
788    }
789
790    fn part(part_id: u32, rows: i64, size: u64, fp: &str) -> ManifestPart {
791        ManifestPart {
792            part_id,
793            path: format!("part-{part_id:06}.parquet"),
794            rows,
795            size_bytes: size,
796            content_fingerprint: fp.into(),
797            content_md5: String::new(),
798            status: PartStatus::Committed,
799        }
800    }
801
802    fn build_manifest(parts: Vec<ManifestPart>, status: ManifestStatus) -> RunManifest {
803        let row_count: i64 = parts
804            .iter()
805            .filter(|p| p.status == PartStatus::Committed)
806            .map(|p| p.rows)
807            .sum();
808        let part_count = parts
809            .iter()
810            .filter(|p| p.status == PartStatus::Committed)
811            .count() as u32;
812        RunManifest {
813            manifest_version: MANIFEST_VERSION,
814            run_id: "r".into(),
815            export_name: "public.orders".into(),
816            started_at: "2026-05-21T12:00:00Z".into(),
817            finished_at: "2026-05-21T12:01:00Z".into(),
818            status,
819            source: ManifestSource {
820                engine: "postgres".into(),
821                schema: Some("public".into()),
822                table: Some("orders".into()),
823            },
824            destination: ManifestDestination {
825                kind: "local".into(),
826                uri: "file:///tmp/out".into(),
827            },
828            format: "parquet".into(),
829            compression: "zstd".into(),
830            schema_fingerprint: "xxh3:0123456789abcdef".into(),
831            row_count,
832            part_count,
833            parts,
834            column_checksums: None,
835            checksum_key_column: None,
836        }
837    }
838
839    /// Lay out a clean dataset with manifest + _SUCCESS at the root.
840    fn write_dataset(dir: &Path, m: &RunManifest, parts_with_bytes: &[(&str, &[u8])]) {
841        for (name, bytes) in parts_with_bytes {
842            std::fs::write(dir.join(name), bytes).unwrap();
843        }
844        let body = serde_json::to_vec_pretty(m).unwrap();
845        std::fs::write(dir.join(MANIFEST_FILENAME), &body).unwrap();
846        if matches!(m.status, ManifestStatus::Success) {
847            std::fs::write(dir.join(SUCCESS_FILENAME), success_marker_body(&body)).unwrap();
848        }
849    }
850
851    // ── happy path ───────────────────────────────────────────────────────
852
853    #[test]
854    fn happy_path_verifies_all_parts_and_success_marker() {
855        let dir = tempfile::tempdir().unwrap();
856        let m = build_manifest(
857            vec![
858                part(1, 10, 4, "xxh3:1111111111111111"),
859                part(2, 20, 5, "xxh3:2222222222222222"),
860            ],
861            ManifestStatus::Success,
862        );
863        write_dataset(
864            dir.path(),
865            &m,
866            &[
867                ("part-000001.parquet", b"AAAA"),
868                ("part-000002.parquet", b"BBBBB"),
869            ],
870        );
871        let dest = local_dest(dir.path());
872
873        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
874        assert!(v.manifest_found);
875        assert!(!v.legacy_run);
876        assert_eq!(v.parts_verified, 2);
877        assert_eq!(v.parts_failed, 0);
878        assert!(v.success_marker_consistent);
879        assert!(v.manifest_self_consistent);
880        assert!(v.passed);
881        assert!(v.failures.is_empty());
882    }
883
884    // ── M6 legacy run ───────────────────────────────────────────────────
885
886    #[test]
887    fn no_manifest_returns_legacy_run_label() {
888        // Empty prefix — no manifest, no parts.
889        let dir = tempfile::tempdir().unwrap();
890        let dest = local_dest(dir.path());
891        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
892        assert!(!v.manifest_found);
893        assert!(v.legacy_run);
894        assert_eq!(v.parts_verified, 0);
895        assert!(!v.passed);
896        assert!(v.failures.is_empty(), "no failures, just a legacy label");
897    }
898
899    // ── M5 part-presence failures ───────────────────────────────────────
900
901    #[test]
902    fn missing_part_is_flagged_with_part_id_and_path() {
903        let dir = tempfile::tempdir().unwrap();
904        let m = build_manifest(
905            vec![
906                part(1, 10, 4, "xxh3:1111111111111111"),
907                part(2, 20, 5, "xxh3:2222222222222222"),
908            ],
909            ManifestStatus::Success,
910        );
911        write_dataset(
912            dir.path(),
913            &m,
914            &[("part-000001.parquet", b"AAAA")], // part 2 missing
915        );
916        let dest = local_dest(dir.path());
917
918        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
919        assert_eq!(v.parts_verified, 1);
920        assert_eq!(v.parts_failed, 1);
921        assert!(!v.passed);
922        assert!(
923            v.failures
924                .iter()
925                .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. }))
926        );
927    }
928
929    #[test]
930    fn part_size_mismatch_is_flagged_with_expected_and_actual() {
931        let dir = tempfile::tempdir().unwrap();
932        let m = build_manifest(
933            vec![part(1, 10, 4, "xxh3:1111111111111111")],
934            ManifestStatus::Success,
935        );
936        // Manifest claims 4 bytes; we write 6.
937        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"OOPSIE")]);
938        let dest = local_dest(dir.path());
939
940        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
941        assert!(!v.passed);
942        let mismatch = v
943            .failures
944            .iter()
945            .find_map(|f| match f {
946                Failure::PartSizeMismatch {
947                    part_id,
948                    expected,
949                    actual,
950                    ..
951                } => Some((*part_id, *expected, *actual)),
952                _ => None,
953            })
954            .expect("must surface the size mismatch");
955        assert_eq!(mismatch, (1, 4, 6));
956    }
957
958    // ── _SUCCESS marker integrity ───────────────────────────────────────
959
960    #[test]
961    fn stale_success_marker_is_flagged_as_inconsistent() {
962        // Write a manifest, then overwrite _SUCCESS with the marker for a
963        // *different* manifest body — simulating an orchestrator that
964        // mishandled a re-run.
965        let dir = tempfile::tempdir().unwrap();
966        let m = build_manifest(
967            vec![part(1, 10, 4, "xxh3:1111111111111111")],
968            ManifestStatus::Success,
969        );
970        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
971        std::fs::write(
972            dir.path().join(SUCCESS_FILENAME),
973            success_marker_body(b"different manifest body"),
974        )
975        .unwrap();
976        let dest = local_dest(dir.path());
977
978        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
979        assert!(!v.success_marker_consistent);
980        assert!(!v.passed);
981        assert!(
982            v.failures
983                .iter()
984                .any(|f| matches!(f, Failure::SuccessMarkerStale { .. }))
985        );
986    }
987
988    #[test]
989    fn malformed_success_marker_body_is_flagged() {
990        let dir = tempfile::tempdir().unwrap();
991        let m = build_manifest(
992            vec![part(1, 10, 4, "xxh3:1111111111111111")],
993            ManifestStatus::Success,
994        );
995        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
996        std::fs::write(dir.path().join(SUCCESS_FILENAME), b"not even xxh3 shaped").unwrap();
997        let dest = local_dest(dir.path());
998
999        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1000        assert!(!v.passed);
1001        assert!(
1002            v.failures
1003                .iter()
1004                .any(|f| matches!(f, Failure::SuccessMarkerMalformed { .. }))
1005        );
1006    }
1007
1008    #[test]
1009    fn absent_success_marker_does_not_fail_validation_alone() {
1010        // ADR-0012 M2: only successful runs land _SUCCESS.  A failed-then-
1011        // rewritten manifest legitimately lacks one — verification must
1012        // not flip `passed` just for that.
1013        let dir = tempfile::tempdir().unwrap();
1014        let m = build_manifest(
1015            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1016            ManifestStatus::Failed,
1017        );
1018        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1019        // Note: write_dataset only writes _SUCCESS for status == Success,
1020        // so no marker exists here.
1021        assert!(!dir.path().join(SUCCESS_FILENAME).exists());
1022        let dest = local_dest(dir.path());
1023
1024        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1025        assert!(v.manifest_found);
1026        assert!(
1027            !v.success_marker_consistent,
1028            "no marker => false (no signal)"
1029        );
1030        // The parts still verified, so passed = true.
1031        assert!(v.passed);
1032        assert!(v.failures.is_empty());
1033    }
1034
1035    // ── self-consistency ────────────────────────────────────────────────
1036
1037    #[test]
1038    fn self_inconsistent_manifest_is_flagged_but_part_check_still_runs() {
1039        let dir = tempfile::tempdir().unwrap();
1040        let mut m = build_manifest(
1041            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1042            ManifestStatus::Success,
1043        );
1044        m.row_count = 9999; // lie
1045
1046        let body = serde_json::to_vec_pretty(&m).unwrap();
1047        std::fs::write(dir.path().join("part-000001.parquet"), b"AAAA").unwrap();
1048        std::fs::write(dir.path().join(MANIFEST_FILENAME), &body).unwrap();
1049        std::fs::write(
1050            dir.path().join(SUCCESS_FILENAME),
1051            success_marker_body(&body),
1052        )
1053        .unwrap();
1054        let dest = local_dest(dir.path());
1055
1056        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1057        assert!(v.manifest_found);
1058        assert!(!v.manifest_self_consistent);
1059        assert!(!v.passed);
1060        // Parts that are physically present still get their `parts_verified`
1061        // counter bumped — both signals are independently useful.
1062        assert_eq!(v.parts_verified, 1);
1063        assert!(
1064            v.failures
1065                .iter()
1066                .any(|f| matches!(f, Failure::ManifestSelfInconsistent { .. }))
1067        );
1068    }
1069
1070    // ── untracked objects ───────────────────────────────────────────────
1071
1072    #[test]
1073    fn untracked_object_under_prefix_is_flagged() {
1074        let dir = tempfile::tempdir().unwrap();
1075        let m = build_manifest(
1076            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1077            ManifestStatus::Success,
1078        );
1079        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1080        std::fs::write(dir.path().join("rogue.parquet"), b"XX").unwrap();
1081        let dest = local_dest(dir.path());
1082
1083        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1084        assert!(
1085            v.failures.iter().any(
1086                |f| matches!(f, Failure::UntrackedObject { key, .. } if key == "rogue.parquet")
1087            )
1088        );
1089        // Untracked objects are surfaced but do NOT flip `passed` — that
1090        // is the resume-side decision (M9).  Parts and marker are fine,
1091        // so passed remains true.
1092        assert!(v.passed);
1093    }
1094
1095    #[test]
1096    fn quarantine_prefix_objects_are_silently_ignored() {
1097        let dir = tempfile::tempdir().unwrap();
1098        let m = build_manifest(
1099            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1100            ManifestStatus::Success,
1101        );
1102        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1103        std::fs::create_dir_all(dir.path().join(crate::manifest::QUARANTINE_PREFIX)).unwrap();
1104        std::fs::write(
1105            dir.path()
1106                .join(crate::manifest::QUARANTINE_PREFIX)
1107                .join("old.parquet"),
1108            b"OO",
1109        )
1110        .unwrap();
1111        let dest = local_dest(dir.path());
1112
1113        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1114        assert!(v.passed);
1115        assert!(
1116            !v.failures
1117                .iter()
1118                .any(|f| matches!(f, Failure::UntrackedObject { .. })),
1119            "quarantine_prefix is the legitimate home for these — must not flag"
1120        );
1121    }
1122
1123    #[test]
1124    fn doctor_probe_is_not_flagged_as_untracked() {
1125        // Regression: `rivet doctor` writes `.rivet_doctor_probe` at the
1126        // destination prefix and never removes it.  A subsequent
1127        // `rivet run --validate` against the same prefix must treat it as a
1128        // Rivet sidecar, not foreign data — otherwise `has_failures()` trips
1129        // and the run's `validated` flag is downgraded to FAIL.
1130        let dir = tempfile::tempdir().unwrap();
1131        let m = build_manifest(
1132            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1133            ManifestStatus::Success,
1134        );
1135        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1136        std::fs::write(
1137            dir.path().join(crate::manifest::DOCTOR_PROBE_FILENAME),
1138            b"ok",
1139        )
1140        .unwrap();
1141        let dest = local_dest(dir.path());
1142
1143        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1144        assert!(
1145            !v.has_failures(),
1146            "doctor probe must not surface as a failure: {:?}",
1147            v.failures
1148        );
1149        assert!(v.passed);
1150    }
1151
1152    // ── manifest_dir join semantics ─────────────────────────────────────
1153
1154    #[test]
1155    fn verifies_in_subdirectory_when_manifest_dir_is_non_empty() {
1156        let outer = tempfile::tempdir().unwrap();
1157        std::fs::create_dir_all(outer.path().join("sub/run")).unwrap();
1158        let m = build_manifest(
1159            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1160            ManifestStatus::Success,
1161        );
1162        let body = serde_json::to_vec_pretty(&m).unwrap();
1163        std::fs::write(outer.path().join("sub/run/part-000001.parquet"), b"AAAA").unwrap();
1164        std::fs::write(outer.path().join("sub/run").join(MANIFEST_FILENAME), &body).unwrap();
1165        std::fs::write(
1166            outer.path().join("sub/run").join(SUCCESS_FILENAME),
1167            success_marker_body(&body),
1168        )
1169        .unwrap();
1170        let dest = local_dest(outer.path());
1171
1172        let v = verify_at_destination(&dest, "sub/run", ValidateDepth::Full).unwrap();
1173        assert!(v.passed);
1174        assert_eq!(v.parts_verified, 1);
1175
1176        // Trailing slash is normalised — same outcome.
1177        let v2 = verify_at_destination(&dest, "sub/run/", ValidateDepth::Full).unwrap();
1178        assert!(v2.passed);
1179    }
1180
1181    // ── list-failure semantics (presence now rides the listing) ──────────
1182
1183    /// Wraps a real `LocalDestination` but fails every `list_prefix`. Used to
1184    /// pin the post-refactor contract: presence is derived from the listing,
1185    /// so a listing we cannot read means the audit cannot certify the parts.
1186    struct ListFails(LocalDestination);
1187    impl crate::destination::Destination for ListFails {
1188        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1189            self.0.write(p, k)
1190        }
1191        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1192            self.0.capabilities()
1193        }
1194        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1195            self.0.head(k)
1196        }
1197        fn read(&self, k: &str) -> Result<Vec<u8>> {
1198            self.0.read(k)
1199        }
1200        fn list_prefix(&self, _: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1201            anyhow::bail!("listing unavailable")
1202        }
1203    }
1204
1205    #[test]
1206    fn list_failure_cannot_certify_parts_and_fails_the_audit() {
1207        let dir = tempfile::tempdir().unwrap();
1208        let m = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
1209        write_dataset(dir.path(), &m, &[("part-000000.parquet", b"abc")]);
1210        let dest = ListFails(local_dest(dir.path()));
1211
1212        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1213        // The manifest itself reads + parses fine (HEAD/read still work)…
1214        assert!(v.manifest_found);
1215        assert!(v.manifest_self_consistent);
1216        // …but with no listing we verify zero parts and refuse to pass.
1217        assert!(
1218            !v.passed,
1219            "an audit that cannot list the prefix must not pass"
1220        );
1221        assert_eq!(v.parts_verified, 0);
1222        assert!(
1223            v.failures
1224                .iter()
1225                .any(|f| matches!(f, Failure::ListPrefixError { .. })),
1226            "expected a ListPrefixError, got: {:?}",
1227            v.failures
1228        );
1229    }
1230
1231    // ── manifest read-error semantics (explicit failure, not legacy) ─────
1232
1233    /// Wraps a real `LocalDestination` but fails reading `manifest.json`
1234    /// (head still sees it) — EACCES or a transient store error on a
1235    /// manifest that exists.
1236    struct ManifestReadFails(LocalDestination);
1237    impl crate::destination::Destination for ManifestReadFails {
1238        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1239            self.0.write(p, k)
1240        }
1241        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1242            self.0.capabilities()
1243        }
1244        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1245            self.0.head(k)
1246        }
1247        fn read(&self, k: &str) -> Result<Vec<u8>> {
1248            if k.ends_with(MANIFEST_FILENAME) {
1249                anyhow::bail!("permission denied (simulated)")
1250            }
1251            self.0.read(k)
1252        }
1253        fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1254            self.0.list_prefix(p)
1255        }
1256    }
1257
1258    #[test]
1259    fn unreadable_manifest_is_explicit_failure_not_legacy() {
1260        // The exit gates (`rivet validate`, run finalize) key off this exact
1261        // shape: `manifest_found: false` but `has_failures()` — distinct
1262        // from M6 legacy (`legacy_run: true`, no failures, exit 0).
1263        let dir = tempfile::tempdir().unwrap();
1264        let m = build_manifest(
1265            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1266            ManifestStatus::Success,
1267        );
1268        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1269        let dest = ManifestReadFails(local_dest(dir.path()));
1270
1271        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1272        assert!(!v.manifest_found);
1273        assert!(!v.legacy_run, "a read error is not the M6 legacy label");
1274        assert!(!v.passed);
1275        assert!(v.has_failures(), "orchestrators need a reason to refuse");
1276        assert!(
1277            matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
1278            "expected exactly one ManifestReadError, got: {:?}",
1279            v.failures
1280        );
1281    }
1282
1283    /// Same contract when `head` itself errors (cannot even stat the
1284    /// manifest): symmetric `ManifestReadError`, never the legacy label.
1285    struct ManifestHeadFails(LocalDestination);
1286    impl crate::destination::Destination for ManifestHeadFails {
1287        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1288            self.0.write(p, k)
1289        }
1290        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1291            self.0.capabilities()
1292        }
1293        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1294            if k.ends_with(MANIFEST_FILENAME) {
1295                anyhow::bail!("io timeout (simulated)")
1296            }
1297            self.0.head(k)
1298        }
1299        fn read(&self, k: &str) -> Result<Vec<u8>> {
1300            self.0.read(k)
1301        }
1302        fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1303            self.0.list_prefix(p)
1304        }
1305    }
1306
1307    #[test]
1308    fn manifest_head_error_is_explicit_failure_not_legacy() {
1309        let dir = tempfile::tempdir().unwrap();
1310        let m = build_manifest(
1311            vec![part(1, 10, 4, "xxh3:1111111111111111")],
1312            ManifestStatus::Success,
1313        );
1314        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1315        let dest = ManifestHeadFails(local_dest(dir.path()));
1316
1317        let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1318        assert!(!v.manifest_found);
1319        assert!(!v.legacy_run);
1320        assert!(!v.passed);
1321        assert!(
1322            matches!(
1323                v.failures.as_slice(),
1324                [Failure::ManifestReadError { detail }] if detail.contains("manifest head failed")
1325            ),
1326            "expected one ManifestReadError naming the head step, got: {:?}",
1327            v.failures
1328        );
1329    }
1330
1331    #[test]
1332    fn passed_is_derived_advisory_failures_do_not_fail() {
1333        // An advisory failure (untracked surplus) keeps the verdict passing…
1334        let mut v = ManifestVerification {
1335            manifest_found: true,
1336            ..ManifestVerification::empty()
1337        };
1338        v.failures.push(Failure::UntrackedObject {
1339            key: "stray.parquet".into(),
1340            size_bytes: 9,
1341        });
1342        v.recompute_passed();
1343        assert!(v.passed, "untracked surplus is advisory, not fatal");
1344
1345        // …while any fatal failure flips it.
1346        v.failures.push(Failure::PartMissing {
1347            part_id: 1,
1348            path: "part-000001.parquet".into(),
1349        });
1350        v.recompute_passed();
1351        assert!(!v.passed, "a missing part is fatal");
1352
1353        // No manifest → never passes regardless of failures.
1354        let mut legacy = ManifestVerification::empty();
1355        legacy.recompute_passed();
1356        assert!(!legacy.passed, "no manifest found → cannot certify");
1357    }
1358
1359    #[test]
1360    fn verify_content_policy_fails_only_size_only_parts() {
1361        // 3 parts, 2 content-checked, 1 size-only.
1362        let base = ManifestVerification {
1363            manifest_found: true,
1364            parts_verified: 3,
1365            parts_md5_verified: 2,
1366            ..ManifestVerification::empty()
1367        };
1368        // verify: size → size-only is acceptable, passes.
1369        let mut sz = base.clone();
1370        sz.recompute_passed();
1371        sz.enforce_content_policy(false);
1372        assert!(sz.passed, "size-only OK under verify: size");
1373
1374        // verify: content → the 1 size-only part is a fatal failure.
1375        let mut ct = base.clone();
1376        ct.recompute_passed();
1377        ct.enforce_content_policy(true);
1378        assert!(!ct.passed, "a size-only part fails verify: content");
1379        assert!(
1380            ct.failures.iter().any(|f| matches!(
1381                f,
1382                Failure::ContentVerificationUnmet {
1383                    size_only: 1,
1384                    total: 3
1385                }
1386            )),
1387            "expected ContentVerificationUnmet, got: {:?}",
1388            ct.failures
1389        );
1390
1391        // verify: content with every part md5-checked → satisfied.
1392        let mut all = ManifestVerification {
1393            parts_md5_verified: 3,
1394            ..base
1395        };
1396        all.recompute_passed();
1397        all.enforce_content_policy(true);
1398        assert!(
1399            all.passed && all.failures.is_empty(),
1400            "all md5 meets verify: content"
1401        );
1402    }
1403
1404    // ── require_manifest_present (finding #20: operator-pinned --prefix) ──────
1405
1406    #[test]
1407    fn require_manifest_escalates_legacy_to_fatal_absent() {
1408        // The exact shape `verify_at_destination` returns for an absent manifest
1409        // (`legacy()`): no manifest, no other failure. With a pinned `--prefix`
1410        // this is escalated to a fatal `ManifestRequiredButAbsent` so the exit
1411        // gate refuses it instead of passing as a benign legacy run.
1412        let mut v = ManifestVerification::legacy();
1413        assert!(v.legacy_run && !v.has_failures());
1414
1415        v.require_manifest_present("exports/2026-06-09/orders/");
1416
1417        assert!(!v.legacy_run, "no longer the benign legacy-run label");
1418        assert!(!v.passed, "an absent-but-required manifest cannot pass");
1419        assert!(
1420            matches!(
1421                v.failures.as_slice(),
1422                [Failure::ManifestRequiredButAbsent { prefix }]
1423                    if prefix == "exports/2026-06-09/orders/"
1424            ),
1425            "expected one ManifestRequiredButAbsent naming the prefix, got: {:?}",
1426            v.failures
1427        );
1428    }
1429
1430    #[test]
1431    fn require_manifest_is_noop_on_a_real_passing_manifest() {
1432        // A found, passing verdict is untouched — `--prefix` plus real data is
1433        // the normal "validate this exact prefix" case and must still pass.
1434        let mut v = ManifestVerification {
1435            manifest_found: true,
1436            manifest_self_consistent: true,
1437            parts_verified: 1,
1438            passed: true,
1439            ..ManifestVerification::empty()
1440        };
1441        v.require_manifest_present("exports/orders/");
1442        assert!(
1443            v.passed && v.failures.is_empty(),
1444            "real dataset still passes"
1445        );
1446    }
1447
1448    #[test]
1449    fn require_manifest_does_not_double_flag_a_read_error() {
1450        // An absent manifest that already carries a `ManifestReadError` (head /
1451        // read failed) is already a fatal, classified failure — requiring a
1452        // manifest here must not add a second, redundant failure.
1453        let mut v = ManifestVerification::legacy();
1454        v.legacy_run = false;
1455        v.failures.push(Failure::ManifestReadError {
1456            detail: "permission denied".into(),
1457        });
1458        v.recompute_passed();
1459
1460        v.require_manifest_present("exports/orders/");
1461
1462        assert!(
1463            matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
1464            "must leave the existing read-error verdict alone, got: {:?}",
1465            v.failures
1466        );
1467    }
1468
1469    // ── graded verify layer (--depth) ───────────────────────────────────
1470
1471    #[test]
1472    fn light_depth_skips_part_reconcile_even_when_a_part_is_missing() {
1473        // A manifest declaring a part that is NOT on disk. At `Full`/`Sample`
1474        // this is a fatal `PartMissing`; at `Light` the `list_prefix` reconcile
1475        // is skipped entirely, so `parts_verified == 0`, no `ListPrefixError`,
1476        // and — with `_SUCCESS` consistent and the manifest self-consistent —
1477        // the verdict still passes. Light certifies the manifest + marker, not
1478        // the parts.
1479        let dir = tempfile::tempdir().unwrap();
1480        let m = build_manifest(
1481            vec![
1482                part(1, 10, 4, "xxh3:1111111111111111"),
1483                part(2, 20, 5, "xxh3:2222222222222222"),
1484            ],
1485            ManifestStatus::Success,
1486        );
1487        // Deliberately write NEITHER part — only manifest.json + _SUCCESS.
1488        write_dataset(dir.path(), &m, &[]);
1489        let dest = local_dest(dir.path());
1490
1491        let v = verify_at_destination(&dest, "", ValidateDepth::Light).unwrap();
1492        assert_eq!(v.depth_level, "light");
1493        assert_eq!(
1494            v.parts_verified, 0,
1495            "light skips the listing — no part is ever verified"
1496        );
1497        assert_eq!(
1498            v.parts_failed, 0,
1499            "no part reconcile means no part failures"
1500        );
1501        assert!(
1502            !v.failures.iter().any(|f| matches!(
1503                f,
1504                Failure::PartMissing { .. } | Failure::ListPrefixError { .. }
1505            )),
1506            "light must not surface part or list failures, got: {:?}",
1507            v.failures
1508        );
1509        assert!(
1510            v.success_marker_consistent,
1511            "_SUCCESS is still checked at light depth"
1512        );
1513        assert!(v.manifest_self_consistent);
1514        assert!(
1515            v.passed,
1516            "manifest + _SUCCESS are consistent, so a light pass certifies it"
1517        );
1518    }
1519
1520    #[test]
1521    fn light_depth_never_lists_so_a_list_failure_cannot_trip() {
1522        // Even with a destination whose `list_prefix` always errors, a light
1523        // pass succeeds: it never calls `list_prefix`, so no `ListPrefixError`.
1524        // This is the direct contrast to `list_failure_cannot_certify_parts…`
1525        // (which runs at Full and *does* fail on the list error).
1526        let dir = tempfile::tempdir().unwrap();
1527        let m = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
1528        write_dataset(dir.path(), &m, &[("part-000000.parquet", b"abc")]);
1529        let dest = ListFails(local_dest(dir.path()));
1530
1531        let v = verify_at_destination(&dest, "", ValidateDepth::Light).unwrap();
1532        assert_eq!(v.depth_level, "light");
1533        assert!(
1534            !v.failures
1535                .iter()
1536                .any(|f| matches!(f, Failure::ListPrefixError { .. })),
1537            "light never lists, so a failing list_prefix cannot surface, got: {:?}",
1538            v.failures
1539        );
1540        assert_eq!(v.parts_verified, 0);
1541        assert!(v.passed, "manifest + _SUCCESS consistent → light passes");
1542    }
1543
1544    #[test]
1545    fn sample_depth_runs_part_reconcile_like_full() {
1546        // `Sample` runs every section `verify_at_destination` owns (1-5) — the
1547        // Form B value re-read it skips lives in the *caller*, not here. So a
1548        // missing part is a fatal `PartMissing` at Sample, identical to Full.
1549        let dir = tempfile::tempdir().unwrap();
1550        let m = build_manifest(
1551            vec![
1552                part(1, 10, 4, "xxh3:1111111111111111"),
1553                part(2, 20, 5, "xxh3:2222222222222222"),
1554            ],
1555            ManifestStatus::Success,
1556        );
1557        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]); // part 2 missing
1558        let dest = local_dest(dir.path());
1559
1560        let v = verify_at_destination(&dest, "", ValidateDepth::Sample).unwrap();
1561        assert_eq!(v.depth_level, "sample");
1562        assert_eq!(v.parts_verified, 1);
1563        assert_eq!(v.parts_failed, 1);
1564        assert!(!v.passed);
1565        assert!(
1566            v.failures
1567                .iter()
1568                .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. })),
1569            "sample reconciles parts just like full, got: {:?}",
1570            v.failures
1571        );
1572    }
1573
1574    #[test]
1575    fn depth_level_is_stamped_on_every_verdict_shape() {
1576        // The depth label rides the verdict even on the early-return shapes
1577        // (legacy / no manifest), so a consumer always sees how deep the pass
1578        // went.
1579        let dir = tempfile::tempdir().unwrap();
1580        let dest = local_dest(dir.path()); // empty prefix → legacy
1581        for depth in [
1582            ValidateDepth::Light,
1583            ValidateDepth::Sample,
1584            ValidateDepth::Full,
1585        ] {
1586            let v = verify_at_destination(&dest, "", depth).unwrap();
1587            assert!(v.legacy_run, "empty prefix is the legacy shape");
1588            assert_eq!(
1589                v.depth_level,
1590                depth.label(),
1591                "legacy verdict must still carry its depth label"
1592            );
1593        }
1594    }
1595
1596    #[test]
1597    fn error_code_is_stable_and_distinct_per_variant() {
1598        // Each variant maps to its documented `RIVET_VERIFY_*` code. A
1599        // regression guard: renaming a code is a silent break for any CI gate
1600        // keying off it.
1601        let cases: &[(Failure, &str)] = &[
1602            (
1603                Failure::PartMissing {
1604                    part_id: 1,
1605                    path: "p".into(),
1606                },
1607                "RIVET_VERIFY_PART_MISSING",
1608            ),
1609            (
1610                Failure::PartSizeMismatch {
1611                    part_id: 1,
1612                    path: "p".into(),
1613                    expected: 1,
1614                    actual: 2,
1615                },
1616                "RIVET_VERIFY_PART_SIZE_MISMATCH",
1617            ),
1618            (
1619                Failure::PartChecksumMismatch {
1620                    part_id: 1,
1621                    path: "p".into(),
1622                    expected: "a".into(),
1623                    actual: "b".into(),
1624                },
1625                "RIVET_VERIFY_PART_CHECKSUM_MISMATCH",
1626            ),
1627            (
1628                Failure::SuccessMarkerMalformed {
1629                    body_preview: "x".into(),
1630                },
1631                "RIVET_VERIFY_SUCCESS_MALFORMED",
1632            ),
1633            (
1634                Failure::SuccessMarkerStale {
1635                    marker_fingerprint: "a".into(),
1636                    manifest_fingerprint: "b".into(),
1637                },
1638                "RIVET_VERIFY_SUCCESS_STALE",
1639            ),
1640            (
1641                Failure::ManifestSelfInconsistent { detail: "d".into() },
1642                "RIVET_VERIFY_MANIFEST_INCONSISTENT",
1643            ),
1644            (
1645                Failure::ManifestReadError { detail: "d".into() },
1646                "RIVET_VERIFY_MANIFEST_READ_ERROR",
1647            ),
1648            (
1649                Failure::SuccessMarkerReadError { detail: "d".into() },
1650                "RIVET_VERIFY_SUCCESS_READ_ERROR",
1651            ),
1652            (
1653                Failure::ListPrefixError { detail: "d".into() },
1654                "RIVET_VERIFY_LIST_ERROR",
1655            ),
1656            (
1657                Failure::UntrackedObject {
1658                    key: "k".into(),
1659                    size_bytes: 1,
1660                },
1661                "RIVET_VERIFY_UNTRACKED_OBJECT",
1662            ),
1663            (
1664                Failure::ContentVerificationUnmet {
1665                    size_only: 1,
1666                    total: 2,
1667                },
1668                "RIVET_VERIFY_CONTENT_UNMET",
1669            ),
1670            (
1671                Failure::ManifestRequiredButAbsent { prefix: "p".into() },
1672                "RIVET_VERIFY_MANIFEST_REQUIRED",
1673            ),
1674        ];
1675        for (failure, code) in cases {
1676            assert_eq!(&failure.error_code(), code, "code for {failure:?}");
1677            assert!(
1678                failure.error_code().starts_with("RIVET_VERIFY_"),
1679                "every code shares the RIVET_VERIFY_ prefix"
1680            );
1681        }
1682    }
1683}