rivet/pipeline/validate_manifest.rs
1//! **Layer: Observability**
2//!
3//! Manifest-aware verification for `--validate` (ADR-0012 §M5 / §M6,
4//! constrained by the ADR-0013 trust-flag contract that says: no new flags;
5//! manifest-aware checks live under the existing `--validate`).
6//!
7//! Although this module reads from a `Destination` (technically L2 surface),
8//! it makes **no execution decisions**: it does not write data, advance
9//! cursors, mutate state, or change the pipeline path. Its only output is
10//! a structured `ManifestVerification` verdict the run report renders.
11//! Per ADR-0003, that places it firmly in L4 Observability — the
12//! destination read surface is just the carrier.
13//!
14//! Inputs (read-only):
15//! - the destination's `manifest.json` body
16//! - the destination's `_SUCCESS` body, if present
17//! - the listing of every object under the destination prefix
18//!
19//! Outputs:
20//! - [`ManifestVerification`] — a structured verdict the run report renders
21//! into the operator-facing "Verdicts" section.
22//!
23//! Out of scope here:
24//! - per-file row-count check (that runs *during* the export, against the
25//! local temp file before upload — see `pipeline::validate::validate_output`).
26//! - source-side reconciliation (lives in [`pipeline::reconcile_cmd`] and
27//! is what `--reconcile` adds on top of this).
28//! - re-fingerprinting parts (`--validate --deep`, future).
29//!
30//! Failure modes are explicit: each check produces a `Failure` enum variant
31//! that is rendered verbatim in `summary.json` so an Airflow / CI consumer
32//! can branch on the kind, not parse strings.
33
34use serde::{Deserialize, Serialize};
35
36use crate::destination::Destination;
37use crate::error::Result;
38use crate::manifest::{
39 MANIFEST_FILENAME, RunManifest, SUCCESS_FILENAME, join_key, parse_success_marker,
40 success_marker_body,
41};
42use crate::pipeline::manifest_reconcile::{PartPresence, reconcile_manifest_against_listing};
43
/// Upper bound, in bytes, on a destination control artifact (`manifest.json`)
/// the read path will materialise into memory. A `manifest.json` is metadata —
/// a few KB to low single-digit MB even for very large datasets — so 64 MiB
/// (`64 * 1024 * 1024` bytes) is far above any legitimate body while still
/// bounding the blast radius.
///
/// Security (V21, CWE-400): the manifest readers `head()` an object then read
/// its full body into a `Vec<u8>`. An attacker who can write the destination
/// prefix (a shared bucket prefix, a world-writable export dir) can plant a
/// multi-GB `manifest.json`; an unbounded read would OOM the next `--resume`,
/// `--validate`, or `rivet repair`. [`read_capped`] compares the size that
/// `head()` reports against the cap it is given and bails before the read when
/// the object is larger; this constant is the cap passed for `manifest.json`.
pub(crate) const MANIFEST_MAX_BYTES: u64 = 64 * 1024 * 1024;
56
/// How deep a `rivet validate` pass goes — a graded verify layer over the
/// same checks, letting an operator trade thoroughness for latency / cost.
///
/// The variants are a strict superset chain: `Light ⊂ Sample ⊂ Full`. Each
/// level runs every check the level below it does, plus more. Defined here
/// (the pipeline layer) and re-exported for the CLI grammar so the **same**
/// enum gates the checks in [`verify_at_destination`] and parses on the
/// `--depth` flag — no CLI→pipeline back-dependency.
///
/// - **Light**: manifest read + self-consistency + `_SUCCESS` only. Skips the
///   `list_prefix` reconcile (no per-part presence/size/checksum) and the
///   untracked-surplus scan, leaving `parts_verified = 0`. Only
///   `manifest.json` and `_SUCCESS` are touched (`head` + `read`) — a fast
///   "is this prefix a complete, marked run?" poll with no prefix listing.
/// - **Sample**: everything Light does **plus** the part reconcile and
///   untracked surplus (one `list_prefix`). This is the pre-graded behaviour
///   minus the Form B value re-read — full structural verification with no
///   part downloads.
/// - **Full** (default): everything Sample does **plus** the Form B value-
///   checksum re-read (re-reads parts, re-derives per-column checksums). The
///   most thorough and the only level that downloads part bodies. Equivalent
///   to the pre-graded behaviour, so existing callers are unchanged.
///
/// `Full` carries `#[default]`, so `ValidateDepth::default()` is the most
/// thorough level.
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ValidateDepth {
    /// Manifest read + self-consistency + `_SUCCESS` only (no prefix listing).
    Light,
    /// Light + part reconcile + untracked surplus (one `list_prefix`).
    Sample,
    /// Sample + the Form B value-checksum re-read (downloads parts).
    #[default]
    Full,
}
89
90impl ValidateDepth {
91 /// True iff this level runs the `list_prefix` reconcile (part presence,
92 /// size, checksum) and the untracked-surplus scan — i.e. anything above
93 /// `Light`. The single predicate the section-3/5 depth gating keys off.
94 fn runs_part_reconcile(self) -> bool {
95 !matches!(self, ValidateDepth::Light)
96 }
97
98 /// True iff this level downloads part *bodies* — the CDC `__pos` continuity
99 /// check and the Form-B value-checksum re-read, both `Full`-only (the
100 /// `part_reconcile` level above only reads listing metadata). The single
101 /// predicate the section-3/5 part-download gating keys off, parallel to
102 /// [`Self::runs_part_reconcile`] — so adding a depth level edits the enum,
103 /// not the call sites in `validate_cmd`.
104 pub(crate) fn runs_part_download(self) -> bool {
105 matches!(self, ValidateDepth::Full)
106 }
107
108 /// Stable operator-/wire-facing label for the depth a verdict was produced
109 /// at, surfaced in `summary.json` (`depth_level`) and `rivet validate`
110 /// output.
111 pub fn label(self) -> &'static str {
112 match self {
113 ValidateDepth::Light => "light",
114 ValidateDepth::Sample => "sample",
115 ValidateDepth::Full => "full",
116 }
117 }
118}
119
120/// Read `key` into memory only if its `head()`-reported size is within
121/// `max_bytes`; otherwise bail without reading a single byte.
122///
123/// The single enforcement point for the V21 (CWE-400) manifest-read cap shared
124/// by the three control-artifact readers (`--resume` M8 preamble, `--validate`,
125/// `rivet repair`). Each previously did `head()` then an uncapped `read()`,
126/// discarding the size `head()` already returned; routing through here closes
127/// that gap in one place.
128///
129/// Behaviour:
130/// - object absent (`head` → `None`): `Err` — callers invoke this only after
131/// establishing the object exists, so an absent object here is a hard error,
132/// not the benign "no manifest / legacy prefix" case (which the callers
133/// detect with their own `head()` first).
134/// - oversized (`size_bytes > max_bytes`): `Err` naming the cap, **before** any
135/// body is materialised.
136/// - otherwise: the full body via [`Destination::read`].
137pub(crate) fn read_capped(dest: &dyn Destination, key: &str, max_bytes: u64) -> Result<Vec<u8>> {
138 match dest.head(key)? {
139 None => anyhow::bail!("'{key}' not found at the destination"),
140 Some(meta) => {
141 if meta.size_bytes > max_bytes {
142 anyhow::bail!(
143 "'{key}' is {} bytes, exceeding the {max_bytes}-byte control-artifact \
144 read cap — refusing to load it into memory (possible tampering)",
145 meta.size_bytes
146 );
147 }
148 dest.read(key)
149 }
150 }
151}
152
/// Outcome of a single `--validate` pass over a destination prefix.
///
/// Stable enough to be embedded in `summary.json` directly (see
/// `pipeline::report::ValidationOutcome`). Forward-compat: consumers MUST
/// ignore unknown fields (no `deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestVerification {
    /// True iff a `manifest.json` was found at the destination and read.
    /// It stays `true` when the body then fails to parse (reported as
    /// [`Failure::ManifestSelfInconsistent`]). `false` triggers ADR-0012 M6
    /// fallback (legacy run); higher-level check results below are then
    /// "skipped" rather than "passed".
    pub manifest_found: bool,
    /// Mirrors ADR-0012 M6's required `legacy_run` operator-facing label.
    pub legacy_run: bool,
    /// Manifest parts whose presence and recorded `size_bytes` were
    /// confirmed at the destination. 0 when no manifest was found, and also
    /// 0 at [`ValidateDepth::Light`], which skips the part reconcile.
    pub parts_verified: usize,
    /// Subset of `parts_verified` whose **content** was confirmed via an MD5
    /// the store surfaced in its listing (no download) — the rest are size-only.
    /// Lets `passed: true` say how much of the dataset was content-checked
    /// rather than implying all of it was. `#[serde(default)]` for back-compat.
    #[serde(default)]
    pub parts_md5_verified: usize,
    /// Manifest parts that were declared `committed` but not actually
    /// present, present at a different size, or present with a mismatching
    /// content checksum.
    pub parts_failed: usize,
    /// True iff `_SUCCESS` exists at the destination AND its body matches
    /// the fingerprint of the bytes we read for `manifest.json`. An
    /// existing `_SUCCESS` whose body diverges from the manifest is itself
    /// an integrity failure — surfaced via `failures`. An absent `_SUCCESS`
    /// also leaves this `false` without recording a failure, so `false` alone
    /// means "no signal", not "broken".
    pub success_marker_consistent: bool,
    /// Self-consistency of the manifest (`row_count`, `part_count`,
    /// duplicate `part_id`s). Skipped when `manifest_found = false`.
    pub manifest_self_consistent: bool,
    /// Final verdict, **derived** (not hand-maintained) — `manifest_found` and
    /// no *fatal* failure ([`Failure::is_fatal`]). Stored so it stays in the
    /// `summary.json` contract, but computed in one place
    /// ([`ManifestVerification::recompute_passed`]) so a new failure variant is
    /// fatal by default rather than relying on every site to flip a bool.
    pub passed: bool,
    /// Per-failure detail. May be non-empty with `passed = true` for advisory
    /// (non-fatal) failures like [`Failure::UntrackedObject`]. Stable variant
    /// set; new variants land under a new manifest version per ADR-0012.
    pub failures: Vec<Failure>,
    /// The graded depth this verdict was produced at: `"light"`, `"sample"`,
    /// or `"full"` (see [`ValidateDepth`]). Lets a consumer of `summary.json`
    /// tell **how much** was actually checked — a `passed: true` at `"light"`
    /// asserts far less than at `"full"` (no part presence was verified).
    /// `#[serde(default = "default_depth_level")]` (→ `"full"`) for
    /// back-compat: pre-graded verdicts always ran the full pass.
    #[serde(default = "default_depth_level")]
    pub depth_level: String,
}
205
206/// `serde(default)` for [`ManifestVerification::depth_level`]: a verdict that
207/// predates the graded layer always ran the full pass, so an absent field
208/// deserializes to `"full"`.
209fn default_depth_level() -> String {
210 ValidateDepth::Full.label().to_string()
211}
212
/// One concrete reason a verification pass did not come out clean.
///
/// Serialized internally tagged: the variant name appears under `kind` in
/// `snake_case` (e.g. `part_missing`) next to the variant's own fields, so a
/// consumer can branch on `kind` instead of parsing the human-readable text.
/// Whether a variant fails the verdict is decided by [`Failure::is_fatal`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Failure {
    /// Manifest declared a part that does not exist at the destination.
    PartMissing { part_id: u32, path: String },
    /// Manifest declared a part whose actual size differs from `size_bytes`.
    PartSizeMismatch {
        part_id: u32,
        path: String,
        expected: u64,
        actual: u64,
    },
    /// Part present at the recorded size but its content MD5 (from the store's
    /// listing metadata) differs from the manifest's — transit / at-rest
    /// corruption, caught with no download.
    PartChecksumMismatch {
        part_id: u32,
        path: String,
        expected: String,
        actual: String,
    },
    /// `_SUCCESS` exists but its body is malformed (not `xxh3:<16-hex>` after
    /// trim, or not valid UTF-8). ADR-0012 M2 — orchestrators rely on this
    /// format being strict.
    SuccessMarkerMalformed { body_preview: String },
    /// `_SUCCESS` body parsed but does not match `xxh3(manifest.json bytes)`.
    /// Two legitimate sources: (a) someone overwrote `_SUCCESS` after the
    /// manifest was rewritten — orchestrator bug; (b) the manifest was
    /// edited in place after the run — operator bug. Either way the
    /// manifest is no longer trustworthy.
    SuccessMarkerStale {
        marker_fingerprint: String,
        manifest_fingerprint: String,
    },
    /// `RunManifest::validate_self_consistency` rejected the manifest, or
    /// `manifest.json` could not be parsed at all. A rejection is usually a
    /// writer bug (declared row_count != sum of committed parts' rows) and
    /// does **not** stop the remaining checks, so part and marker failures are
    /// still reported in the same pass. A parse failure does end the pass
    /// early, because there is no manifest to check anything against.
    ManifestSelfInconsistent { detail: String },
    /// `head` or `read` of `manifest.json` returned an I/O error other than
    /// "absent".
    ManifestReadError { detail: String },
    /// `head` or `read` of `_SUCCESS` returned an I/O error other than
    /// "absent".
    SuccessMarkerReadError { detail: String },
    /// Listing the destination prefix returned an I/O error. Without a
    /// listing neither the per-part reconcile nor the untracked-parts check
    /// (M5 surplus) can run for this pass; the failure is fatal because the
    /// parts cannot be certified.
    ListPrefixError { detail: String },
    /// A file is present at the destination prefix but no manifest entry
    /// references it. M9-adjacent: `--validate` only flags it; quarantine
    /// belongs to `--resume`.
    UntrackedObject { key: String, size_bytes: u64 },
    /// The export declared `verify: content` but some parts could only be
    /// size-verified (no comparable content checksum from the store) — the
    /// declared integrity contract was not met.
    ContentVerificationUnmet { size_only: usize, total: usize },
    /// A manifest was *required* at this prefix (the operator pinned a literal
    /// `--prefix`, asserting a real dataset lives here) but none was found.
    /// Without this, an absent manifest at an operator-pinned prefix maps to
    /// the M6 legacy-run label and exits 0 — indistinguishable from a verified
    /// run, so a CI gate `rivet validate && deploy` sails past a destination
    /// that was never written. Fatal: a required-but-missing manifest is a
    /// refusal reason, not a "cannot certify" advisory.
    ManifestRequiredButAbsent { prefix: String },
}
275
276impl Failure {
277 /// Whether this failure invalidates the dataset (flips `passed` to false).
278 ///
279 /// Every variant is fatal **except** [`Failure::UntrackedObject`]: surplus
280 /// objects are an audit signal whose cleanup is `--resume`'s job (ADR-0012
281 /// M9), not a corruption of the manifest-listed parts. New variants are
282 /// fatal by default — opt out here explicitly, so a forgotten case fails
283 /// closed (safe) rather than silently passing.
284 pub fn is_fatal(&self) -> bool {
285 !matches!(self, Failure::UntrackedObject { .. })
286 }
287
288 /// Stable `RIVET_VERIFY_*` error code for this failure variant.
289 ///
290 /// One code per variant, intended for orchestrators / CI to branch on
291 /// without parsing the human `Display` string or the per-variant JSON
292 /// fields. The code is part of the wire contract: it is emitted next to
293 /// `kind` in the JSON report and prefixed in brackets on each pretty line.
294 /// Codes are append-only — never renamed once shipped (a renamed code is a
295 /// silent break for any consumer keying off it).
296 pub fn error_code(&self) -> &'static str {
297 match self {
298 Failure::PartMissing { .. } => "RIVET_VERIFY_PART_MISSING",
299 Failure::PartSizeMismatch { .. } => "RIVET_VERIFY_PART_SIZE_MISMATCH",
300 Failure::PartChecksumMismatch { .. } => "RIVET_VERIFY_PART_CHECKSUM_MISMATCH",
301 Failure::SuccessMarkerMalformed { .. } => "RIVET_VERIFY_SUCCESS_MALFORMED",
302 Failure::SuccessMarkerStale { .. } => "RIVET_VERIFY_SUCCESS_STALE",
303 Failure::ManifestSelfInconsistent { .. } => "RIVET_VERIFY_MANIFEST_INCONSISTENT",
304 Failure::ManifestReadError { .. } => "RIVET_VERIFY_MANIFEST_READ_ERROR",
305 Failure::SuccessMarkerReadError { .. } => "RIVET_VERIFY_SUCCESS_READ_ERROR",
306 Failure::ListPrefixError { .. } => "RIVET_VERIFY_LIST_ERROR",
307 Failure::UntrackedObject { .. } => "RIVET_VERIFY_UNTRACKED_OBJECT",
308 Failure::ContentVerificationUnmet { .. } => "RIVET_VERIFY_CONTENT_UNMET",
309 Failure::ManifestRequiredButAbsent { .. } => "RIVET_VERIFY_MANIFEST_REQUIRED",
310 }
311 }
312}
313
314impl std::fmt::Display for Failure {
315 /// One operator-facing line per failure variant. Used by:
316 /// - `pipeline::report::render_markdown` (summary.md "failure:" lines)
317 /// - `pipeline::validate_cmd::render_pretty` (`rivet validate` stdout)
318 /// - any future consumer that wants a human-readable failure label
319 ///
320 /// The wire format (`failures[].kind` + per-variant fields) lives in
321 /// the `Serialize` derive above and is the contract Airflow / CI
322 /// consumers branch on. This `Display` impl is for humans only.
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 match self {
325 Failure::PartMissing { part_id, path } => {
326 write!(f, "part {} missing at {}", part_id, path)
327 }
328 Failure::PartSizeMismatch {
329 part_id,
330 path,
331 expected,
332 actual,
333 } => write!(
334 f,
335 "part {} size mismatch at {}: manifest {}, dest {}",
336 part_id, path, expected, actual
337 ),
338 Failure::PartChecksumMismatch {
339 part_id,
340 path,
341 expected,
342 actual,
343 } => write!(
344 f,
345 "part {} content mismatch at {}: manifest md5 {}, dest {}",
346 part_id, path, expected, actual
347 ),
348 Failure::SuccessMarkerMalformed { body_preview } => {
349 write!(f, "_SUCCESS body malformed: {body_preview:?}")
350 }
351 Failure::SuccessMarkerStale {
352 marker_fingerprint,
353 manifest_fingerprint,
354 } => write!(
355 f,
356 "_SUCCESS body {} != manifest fingerprint {} (stale marker)",
357 marker_fingerprint, manifest_fingerprint
358 ),
359 Failure::ManifestSelfInconsistent { detail } => {
360 write!(f, "manifest self-consistency: {detail}")
361 }
362 Failure::ManifestReadError { detail } => {
363 write!(f, "manifest read error: {detail}")
364 }
365 Failure::SuccessMarkerReadError { detail } => {
366 write!(f, "_SUCCESS read error: {detail}")
367 }
368 Failure::ListPrefixError { detail } => {
369 write!(f, "destination listing error: {detail}")
370 }
371 Failure::UntrackedObject { key, size_bytes } => {
372 write!(f, "untracked object: {} ({} bytes)", key, size_bytes)
373 }
374 Failure::ContentVerificationUnmet { size_only, total } => write!(
375 f,
376 "verify: content not met — {size_only} of {total} part(s) only \
377 size-verified (no store checksum); lower max_file_size so parts \
378 upload as a single PUT, or the backend exposes no checksum"
379 ),
380 Failure::ManifestRequiredButAbsent { prefix } => write!(
381 f,
382 "no manifest at {prefix}: a manifest was required here (operator \
383 pinned --prefix) but none was found — this prefix was never \
384 written, or the data was relocated. Run the export first, or \
385 drop --prefix to validate the config-resolved destination."
386 ),
387 }
388 }
389}
390
391impl ManifestVerification {
392 /// Base verdict: nothing checked yet (no manifest, all counts zero, all
393 /// sub-checks false, `passed = false`). Every constructor builds on this
394 /// and overrides only what differs, so a new field lands in **one** place
395 /// rather than several near-identical literals.
396 fn empty() -> Self {
397 Self {
398 manifest_found: false,
399 legacy_run: false,
400 parts_verified: 0,
401 parts_md5_verified: 0,
402 parts_failed: 0,
403 success_marker_consistent: false,
404 manifest_self_consistent: false,
405 passed: false,
406 failures: Vec::new(),
407 // Base level; `verify_at_destination` overwrites this with the
408 // depth it was actually called at before returning any verdict.
409 depth_level: default_depth_level(),
410 }
411 }
412
413 /// Recompute `passed` from the verdict's facts: a manifest was found and no
414 /// **fatal** failure was recorded (advisory failures like `UntrackedObject`
415 /// don't count). The single source of truth — callers set failures and
416 /// call this once, rather than flipping `passed` by hand at every site.
417 fn recompute_passed(&mut self) {
418 self.passed = self.manifest_found && !self.failures.iter().any(Failure::is_fatal);
419 }
420
421 /// Apply the export's `verify` policy (ADR-0013 / review D). When content
422 /// verification is required but some parts were only size-verified, record
423 /// a fatal [`Failure::ContentVerificationUnmet`] and re-derive `passed`.
424 /// Policy lives here (one place); the composers — run finalize and the
425 /// `rivet validate` command — just call it with their export's intent.
426 pub fn enforce_content_policy(&mut self, require_content: bool) {
427 if require_content && self.manifest_found {
428 let size_only = self.parts_verified.saturating_sub(self.parts_md5_verified);
429 if size_only > 0 {
430 self.failures.push(Failure::ContentVerificationUnmet {
431 size_only,
432 total: self.parts_verified,
433 });
434 self.recompute_passed();
435 }
436 }
437 }
438
439 /// Apply the "a manifest must exist here" policy (finding #20). When the
440 /// operator pinned a literal `--prefix`, an absent manifest is no longer the
441 /// benign M6 legacy-run case — it almost always means the prefix was never
442 /// written (a misconfigured CI gate). Convert that exact verdict — no
443 /// manifest, no other failure (i.e. the [`ManifestVerification::legacy`]
444 /// shape) — into a fatal [`Failure::ManifestRequiredButAbsent`] so the exit
445 /// gate refuses it loudly instead of silently passing.
446 ///
447 /// Deliberately a no-op for every other shape: a real manifest (passed or
448 /// failed), or an absent manifest that already carries a `ManifestReadError`
449 /// / head failure, is left untouched — those are already classified. Only
450 /// the "legacy / cannot certify" case is escalated, and only when required.
451 pub fn require_manifest_present(&mut self, prefix: &str) {
452 if !self.manifest_found && !self.has_failures() {
453 self.legacy_run = false;
454 self.failures.push(Failure::ManifestRequiredButAbsent {
455 prefix: prefix.to_string(),
456 });
457 self.recompute_passed();
458 }
459 }
460
461 /// Construct the M6 (legacy run) verdict for a destination that has no
462 /// manifest at all. Caller composes this with the existing per-file
463 /// row-count check; together they form the legacy `--validate` result.
464 pub fn legacy() -> Self {
465 // `passed = false` is intentional — not "validation failed" but "this
466 // verifier cannot certify"; the caller layers per-file row counts on
467 // top and composes the final verdict.
468 Self {
469 legacy_run: true,
470 ..Self::empty()
471 }
472 }
473
474 /// True iff this verification surfaced any explicit failure (i.e. a
475 /// reason an orchestrator should refuse the run). Distinct from
476 /// `!passed`, which can also mean "legacy / not applicable".
477 pub fn has_failures(&self) -> bool {
478 !self.failures.is_empty()
479 }
480}
481
482/// Run the manifest-aware verification at `manifest_dir` (the destination-
483/// relative directory containing `manifest.json` and `_SUCCESS`).
484///
485/// `manifest_dir` is the same key shape `Destination::write` was called with
486/// for the manifest itself — typically empty (`""`) for prefix-rooted runs,
487/// or the per-export sub-directory. Trailing `/` is optional.
488///
489/// This function does not panic on any expected I/O outcome — every read
490/// failure becomes a `Failure::*ReadError` so the caller can render a
491/// useful message instead of bailing.
492///
493/// `depth` selects the graded verify layer (see [`ValidateDepth`]):
494/// - [`ValidateDepth::Light`] skips section 3 (the `list_prefix` part
495/// reconcile) and section 5 (untracked surplus), leaving `parts_verified`
496/// at 0 — a fast manifest + `_SUCCESS` poll with no prefix listing.
497/// - [`ValidateDepth::Sample`] and [`ValidateDepth::Full`] run all five
498/// sections here. The Form B value re-read is **not** in this function;
499/// it is the caller's concern (`run_validate_command`), gated on `Full`.
500///
501/// Regardless of depth, `depth_level` on the returned verdict records the
502/// level this pass ran at.
503pub fn verify_at_destination(
504 dest: &dyn Destination,
505 manifest_dir: &str,
506 depth: ValidateDepth,
507) -> Result<ManifestVerification> {
508 let manifest_key = join_key(manifest_dir, MANIFEST_FILENAME);
509 let success_key = join_key(manifest_dir, SUCCESS_FILENAME);
510
511 // Stamp the depth this pass ran at onto every verdict before it leaves the
512 // function — including the early-return error/legacy shapes — so a consumer
513 // always sees *how much* was checked. Each `return Ok(v)` below routes
514 // through `with_depth` (or sets `out.depth_level` for the main path).
515 let with_depth = |mut v: ManifestVerification| -> ManifestVerification {
516 v.depth_level = depth.label().to_string();
517 v
518 };
519
520 // ── 1. Manifest read ───────────────────────────────────────────────
521 //
522 // Error-consistency contract: every I/O outcome here surfaces as a
523 // structured `Failure` variant rather than as `Err`. An operator gets
524 // one verdict shape regardless of whether the destination is missing,
525 // permission-denied, or temporarily unreachable. The bubbled `Err`
526 // path is reserved for *programmer* errors (caller passes a malformed
527 // `manifest_dir`, a future destination breaks an internal invariant).
528 let manifest_bytes = match dest.head(&manifest_key) {
529 Ok(None) => return Ok(with_depth(ManifestVerification::legacy())),
530 Ok(Some(_)) => match read_capped(dest, &manifest_key, MANIFEST_MAX_BYTES) {
531 Ok(b) => b,
532 Err(e) => {
533 let mut v = ManifestVerification::legacy();
534 v.legacy_run = false;
535 v.failures.push(Failure::ManifestReadError {
536 detail: format!("{e:#}"),
537 });
538 v.passed = false;
539 return Ok(with_depth(v));
540 }
541 },
542 Err(e) => {
543 // `head` failure is symmetric to a `read` failure — same kind
544 // (`ManifestReadError`) so consumers don't have to branch on
545 // which method tripped. Distinct from "manifest absent"
546 // (Ok(None) above) which legitimately means "legacy prefix".
547 let mut v = ManifestVerification::legacy();
548 v.legacy_run = false;
549 v.failures.push(Failure::ManifestReadError {
550 detail: format!("manifest head failed: {e:#}"),
551 });
552 v.passed = false;
553 return Ok(with_depth(v));
554 }
555 };
556
557 let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
558 Ok(m) => m,
559 Err(e) => {
560 // A malformed manifest is treated as a self-inconsistency —
561 // semantically equivalent for the operator (the manifest can't
562 // be trusted) but kept distinct in `failures` so the kind is
563 // explicit on the wire.
564 return Ok(with_depth(ManifestVerification {
565 manifest_found: true,
566 failures: vec![Failure::ManifestSelfInconsistent {
567 detail: format!("manifest.json parse failed: {e}"),
568 }],
569 ..ManifestVerification::empty()
570 }));
571 }
572 };
573
574 // Optimistic base: a found, self-consistent manifest that passes until a
575 // check below flips it. Overrides only what differs from `empty()`.
576 // Stamp the depth here so the two early `return Ok(out)` paths in section
577 // 4 (success-marker head error, non-utf8 body) carry the right level too.
578 let mut out = ManifestVerification {
579 manifest_found: true,
580 manifest_self_consistent: true,
581 passed: true,
582 depth_level: depth.label().to_string(),
583 ..ManifestVerification::empty()
584 };
585
586 // ── 2. Self-consistency ─────────────────────────────────────────────
587 if let Err(e) = manifest.validate_self_consistency() {
588 out.manifest_self_consistent = false;
589 out.failures.push(Failure::ManifestSelfInconsistent {
590 detail: format!("{e}"),
591 });
592 // Don't short-circuit — we still want to surface part-presence
593 // failures because the operator may want to know both classes at
594 // once rather than fix-then-rerun.
595 }
596
597 // ── 3. Reconcile parts + surplus against ONE prefix listing ────────
598 //
599 // Presence and untracked-surplus both fall out of a single
600 // `reconcile_manifest_against_listing` over one `list_prefix` — the same
601 // pure walk chunked resume uses (`build_resume_plan`). This replaces the
602 // old per-part `HEAD` loop (N round-trips) and its separate untracked
603 // listing. Per-part failures are emitted here (step 3); untracked is
604 // emitted at step 5 so the failure ordering an operator reads is stable.
605 //
606 // Trade-off: presence now rides the listing, not per-part `HEAD`. If the
607 // listing cannot be read, an audit cannot certify the parts — so a list
608 // failure flips `passed = false` (a `ListPrefixError`), rather than the
609 // old behaviour where per-part HEAD still "verified" parts a failed
610 // listing couldn't enumerate. Every Rivet destination backend offers
611 // strong read-after-write list consistency, so the happy path is one call.
612 //
613 // Graded depth: `Light` skips this `list_prefix` entirely — no part
614 // reconcile, no `ListPrefixError`, `parts_verified` stays 0, and section 5
615 // (untracked) is a no-op since `reconciliation` is `None`. `Sample` and
616 // `Full` run it. A `Light` pass therefore certifies only that the
617 // manifest reads, is self-consistent, and `_SUCCESS` matches — never that
618 // the parts are physically present.
619 let reconciliation = if depth.runs_part_reconcile() {
620 match dest.list_prefix(manifest_dir) {
621 Ok(listing) => Some(reconcile_manifest_against_listing(
622 &manifest,
623 &listing,
624 manifest_dir,
625 )),
626 Err(e) => {
627 out.failures.push(Failure::ListPrefixError {
628 detail: format!("{e:#}"),
629 });
630 None
631 }
632 }
633 } else {
634 None
635 };
636 if let Some(rec) = &reconciliation {
637 for check in &rec.per_part {
638 match &check.presence {
639 PartPresence::Present { md5_verified } => {
640 out.parts_verified += 1;
641 if *md5_verified {
642 out.parts_md5_verified += 1;
643 }
644 }
645 PartPresence::SizeMismatch { expected, actual } => {
646 out.parts_failed += 1;
647 out.failures.push(Failure::PartSizeMismatch {
648 part_id: check.part_id,
649 path: check.path.clone(),
650 expected: *expected,
651 actual: *actual,
652 });
653 }
654 PartPresence::Missing => {
655 out.parts_failed += 1;
656 out.failures.push(Failure::PartMissing {
657 part_id: check.part_id,
658 path: check.path.clone(),
659 });
660 }
661 PartPresence::ChecksumMismatch { expected, actual } => {
662 out.parts_failed += 1;
663 out.failures.push(Failure::PartChecksumMismatch {
664 part_id: check.part_id,
665 path: check.path.clone(),
666 expected: expected.clone(),
667 actual: actual.clone(),
668 });
669 }
670 }
671 }
672 }
673
674 // ── 4. _SUCCESS marker consistency ─────────────────────────────────
675 //
676 // Same error-consistency contract as step 1: head/read failures become
677 // `Failure::SuccessMarkerReadError`, not bubbled `Err`. Absent marker
678 // (Ok(None)) stays informational, not a failure (M2: only successful
679 // runs land _SUCCESS, so its absence on a failed manifest is correct).
680 let success_head = match dest.head(&success_key) {
681 Ok(h) => h,
682 Err(e) => {
683 out.failures.push(Failure::SuccessMarkerReadError {
684 detail: format!("_SUCCESS head failed: {e:#}"),
685 });
686 out.recompute_passed();
687 return Ok(out);
688 }
689 };
690 match success_head {
691 None => {
692 // Absent _SUCCESS is informational, not a failure: per ADR-0012
693 // M2, only successful runs land it. A failed-then-rewritten
694 // manifest legitimately lacks _SUCCESS. Leave
695 // `success_marker_consistent = false` (this is a "no signal"
696 // bool, not a "broken" bool) and let the caller decide.
697 }
698 Some(_) => match dest.read(&success_key) {
699 Err(e) => {
700 out.failures.push(Failure::SuccessMarkerReadError {
701 detail: format!("{e:#}"),
702 });
703 }
704 Ok(body) => {
705 let body_str = match std::str::from_utf8(&body) {
706 Ok(s) => s,
707 Err(_) => {
708 out.failures.push(Failure::SuccessMarkerMalformed {
709 body_preview: format!("(non-utf8, {} bytes)", body.len()),
710 });
711 out.recompute_passed();
712 return Ok(out);
713 }
714 };
715 match parse_success_marker(body_str) {
716 None => {
717 out.failures.push(Failure::SuccessMarkerMalformed {
718 body_preview: preview(body_str),
719 });
720 }
721 Some(marker_fp) => {
722 let manifest_fp = success_marker_body(&manifest_bytes);
723 // success_marker_body returns the trailing `\n`
724 // form; trim before comparing to the parsed marker
725 // (which already trims).
726 let manifest_fp_trimmed = manifest_fp.trim_end_matches('\n');
727 if marker_fp == manifest_fp_trimmed {
728 out.success_marker_consistent = true;
729 } else {
730 out.failures.push(Failure::SuccessMarkerStale {
731 marker_fingerprint: marker_fp.to_string(),
732 manifest_fingerprint: manifest_fp_trimmed.to_string(),
733 });
734 }
735 }
736 }
737 }
738 },
739 }
740
741 // ── 5. Untracked surplus ───────────────────────────────────────────
742 //
743 // Already computed by the step-3 reconciliation (sidecars, quarantine,
744 // and the doctor probe are filtered there). Emit it last so the failure
745 // ordering stays parts → marker → untracked. A list failure left
746 // `reconciliation = None` and already flipped `passed` above.
747 if let Some(rec) = reconciliation {
748 for obj in rec.untracked {
749 out.failures.push(Failure::UntrackedObject {
750 key: obj.key,
751 size_bytes: obj.size_bytes,
752 });
753 }
754 }
755
756 out.recompute_passed();
757 Ok(out)
758}
759
/// Truncate `s` to a short preview for error messages.
///
/// Returns `s` unchanged when it holds at most 40 characters (Unicode scalar
/// values); otherwise returns its first 40 characters followed by `…`.
/// No characters are filtered or escaped.
fn preview(s: &str) -> String {
    const MAX_CHARS: usize = 40;
    // The byte offset of the 41st character, if there is one, is exactly where
    // the preview must be cut. Looking it up stops after at most 41
    // characters, so an arbitrarily large body is never scanned in full, and
    // an offset produced by `char_indices` is always a valid char boundary.
    match s.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_owned(),
    }
}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773 use crate::config::{DestinationConfig, DestinationType};
774 use crate::destination::local::LocalDestination;
775 use crate::manifest::{
776 MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
777 PartStatus, RunManifest,
778 };
779 use std::path::Path;
780
781 fn local_dest(base: &Path) -> LocalDestination {
782 LocalDestination::new(&DestinationConfig {
783 destination_type: DestinationType::Local,
784 path: Some(base.to_string_lossy().into_owned()),
785 ..Default::default()
786 })
787 .unwrap()
788 }
789
790 fn part(part_id: u32, rows: i64, size: u64, fp: &str) -> ManifestPart {
791 ManifestPart {
792 part_id,
793 path: format!("part-{part_id:06}.parquet"),
794 rows,
795 size_bytes: size,
796 content_fingerprint: fp.into(),
797 content_md5: String::new(),
798 status: PartStatus::Committed,
799 }
800 }
801
802 fn build_manifest(parts: Vec<ManifestPart>, status: ManifestStatus) -> RunManifest {
803 let row_count: i64 = parts
804 .iter()
805 .filter(|p| p.status == PartStatus::Committed)
806 .map(|p| p.rows)
807 .sum();
808 let part_count = parts
809 .iter()
810 .filter(|p| p.status == PartStatus::Committed)
811 .count() as u32;
812 RunManifest {
813 manifest_version: MANIFEST_VERSION,
814 run_id: "r".into(),
815 export_name: "public.orders".into(),
816 started_at: "2026-05-21T12:00:00Z".into(),
817 finished_at: "2026-05-21T12:01:00Z".into(),
818 status,
819 source: ManifestSource {
820 engine: "postgres".into(),
821 schema: Some("public".into()),
822 table: Some("orders".into()),
823 },
824 destination: ManifestDestination {
825 kind: "local".into(),
826 uri: "file:///tmp/out".into(),
827 },
828 format: "parquet".into(),
829 compression: "zstd".into(),
830 schema_fingerprint: "xxh3:0123456789abcdef".into(),
831 row_count,
832 part_count,
833 parts,
834 column_checksums: None,
835 checksum_key_column: None,
836 }
837 }
838
839 /// Lay out a clean dataset with manifest + _SUCCESS at the root.
840 fn write_dataset(dir: &Path, m: &RunManifest, parts_with_bytes: &[(&str, &[u8])]) {
841 for (name, bytes) in parts_with_bytes {
842 std::fs::write(dir.join(name), bytes).unwrap();
843 }
844 let body = serde_json::to_vec_pretty(m).unwrap();
845 std::fs::write(dir.join(MANIFEST_FILENAME), &body).unwrap();
846 if matches!(m.status, ManifestStatus::Success) {
847 std::fs::write(dir.join(SUCCESS_FILENAME), success_marker_body(&body)).unwrap();
848 }
849 }
850
851 // ── happy path ───────────────────────────────────────────────────────
852
853 #[test]
854 fn happy_path_verifies_all_parts_and_success_marker() {
855 let dir = tempfile::tempdir().unwrap();
856 let m = build_manifest(
857 vec![
858 part(1, 10, 4, "xxh3:1111111111111111"),
859 part(2, 20, 5, "xxh3:2222222222222222"),
860 ],
861 ManifestStatus::Success,
862 );
863 write_dataset(
864 dir.path(),
865 &m,
866 &[
867 ("part-000001.parquet", b"AAAA"),
868 ("part-000002.parquet", b"BBBBB"),
869 ],
870 );
871 let dest = local_dest(dir.path());
872
873 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
874 assert!(v.manifest_found);
875 assert!(!v.legacy_run);
876 assert_eq!(v.parts_verified, 2);
877 assert_eq!(v.parts_failed, 0);
878 assert!(v.success_marker_consistent);
879 assert!(v.manifest_self_consistent);
880 assert!(v.passed);
881 assert!(v.failures.is_empty());
882 }
883
884 // ── M6 legacy run ───────────────────────────────────────────────────
885
886 #[test]
887 fn no_manifest_returns_legacy_run_label() {
888 // Empty prefix — no manifest, no parts.
889 let dir = tempfile::tempdir().unwrap();
890 let dest = local_dest(dir.path());
891 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
892 assert!(!v.manifest_found);
893 assert!(v.legacy_run);
894 assert_eq!(v.parts_verified, 0);
895 assert!(!v.passed);
896 assert!(v.failures.is_empty(), "no failures, just a legacy label");
897 }
898
899 // ── M5 part-presence failures ───────────────────────────────────────
900
901 #[test]
902 fn missing_part_is_flagged_with_part_id_and_path() {
903 let dir = tempfile::tempdir().unwrap();
904 let m = build_manifest(
905 vec![
906 part(1, 10, 4, "xxh3:1111111111111111"),
907 part(2, 20, 5, "xxh3:2222222222222222"),
908 ],
909 ManifestStatus::Success,
910 );
911 write_dataset(
912 dir.path(),
913 &m,
914 &[("part-000001.parquet", b"AAAA")], // part 2 missing
915 );
916 let dest = local_dest(dir.path());
917
918 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
919 assert_eq!(v.parts_verified, 1);
920 assert_eq!(v.parts_failed, 1);
921 assert!(!v.passed);
922 assert!(
923 v.failures
924 .iter()
925 .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. }))
926 );
927 }
928
929 #[test]
930 fn part_size_mismatch_is_flagged_with_expected_and_actual() {
931 let dir = tempfile::tempdir().unwrap();
932 let m = build_manifest(
933 vec![part(1, 10, 4, "xxh3:1111111111111111")],
934 ManifestStatus::Success,
935 );
936 // Manifest claims 4 bytes; we write 6.
937 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"OOPSIE")]);
938 let dest = local_dest(dir.path());
939
940 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
941 assert!(!v.passed);
942 let mismatch = v
943 .failures
944 .iter()
945 .find_map(|f| match f {
946 Failure::PartSizeMismatch {
947 part_id,
948 expected,
949 actual,
950 ..
951 } => Some((*part_id, *expected, *actual)),
952 _ => None,
953 })
954 .expect("must surface the size mismatch");
955 assert_eq!(mismatch, (1, 4, 6));
956 }
957
958 // ── _SUCCESS marker integrity ───────────────────────────────────────
959
960 #[test]
961 fn stale_success_marker_is_flagged_as_inconsistent() {
962 // Write a manifest, then overwrite _SUCCESS with the marker for a
963 // *different* manifest body — simulating an orchestrator that
964 // mishandled a re-run.
965 let dir = tempfile::tempdir().unwrap();
966 let m = build_manifest(
967 vec![part(1, 10, 4, "xxh3:1111111111111111")],
968 ManifestStatus::Success,
969 );
970 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
971 std::fs::write(
972 dir.path().join(SUCCESS_FILENAME),
973 success_marker_body(b"different manifest body"),
974 )
975 .unwrap();
976 let dest = local_dest(dir.path());
977
978 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
979 assert!(!v.success_marker_consistent);
980 assert!(!v.passed);
981 assert!(
982 v.failures
983 .iter()
984 .any(|f| matches!(f, Failure::SuccessMarkerStale { .. }))
985 );
986 }
987
988 #[test]
989 fn malformed_success_marker_body_is_flagged() {
990 let dir = tempfile::tempdir().unwrap();
991 let m = build_manifest(
992 vec![part(1, 10, 4, "xxh3:1111111111111111")],
993 ManifestStatus::Success,
994 );
995 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
996 std::fs::write(dir.path().join(SUCCESS_FILENAME), b"not even xxh3 shaped").unwrap();
997 let dest = local_dest(dir.path());
998
999 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1000 assert!(!v.passed);
1001 assert!(
1002 v.failures
1003 .iter()
1004 .any(|f| matches!(f, Failure::SuccessMarkerMalformed { .. }))
1005 );
1006 }
1007
1008 #[test]
1009 fn absent_success_marker_does_not_fail_validation_alone() {
1010 // ADR-0012 M2: only successful runs land _SUCCESS. A failed-then-
1011 // rewritten manifest legitimately lacks one — verification must
1012 // not flip `passed` just for that.
1013 let dir = tempfile::tempdir().unwrap();
1014 let m = build_manifest(
1015 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1016 ManifestStatus::Failed,
1017 );
1018 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1019 // Note: write_dataset only writes _SUCCESS for status == Success,
1020 // so no marker exists here.
1021 assert!(!dir.path().join(SUCCESS_FILENAME).exists());
1022 let dest = local_dest(dir.path());
1023
1024 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1025 assert!(v.manifest_found);
1026 assert!(
1027 !v.success_marker_consistent,
1028 "no marker => false (no signal)"
1029 );
1030 // The parts still verified, so passed = true.
1031 assert!(v.passed);
1032 assert!(v.failures.is_empty());
1033 }
1034
1035 // ── self-consistency ────────────────────────────────────────────────
1036
1037 #[test]
1038 fn self_inconsistent_manifest_is_flagged_but_part_check_still_runs() {
1039 let dir = tempfile::tempdir().unwrap();
1040 let mut m = build_manifest(
1041 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1042 ManifestStatus::Success,
1043 );
1044 m.row_count = 9999; // lie
1045
1046 let body = serde_json::to_vec_pretty(&m).unwrap();
1047 std::fs::write(dir.path().join("part-000001.parquet"), b"AAAA").unwrap();
1048 std::fs::write(dir.path().join(MANIFEST_FILENAME), &body).unwrap();
1049 std::fs::write(
1050 dir.path().join(SUCCESS_FILENAME),
1051 success_marker_body(&body),
1052 )
1053 .unwrap();
1054 let dest = local_dest(dir.path());
1055
1056 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1057 assert!(v.manifest_found);
1058 assert!(!v.manifest_self_consistent);
1059 assert!(!v.passed);
1060 // Parts that are physically present still get their `parts_verified`
1061 // counter bumped — both signals are independently useful.
1062 assert_eq!(v.parts_verified, 1);
1063 assert!(
1064 v.failures
1065 .iter()
1066 .any(|f| matches!(f, Failure::ManifestSelfInconsistent { .. }))
1067 );
1068 }
1069
1070 // ── untracked objects ───────────────────────────────────────────────
1071
1072 #[test]
1073 fn untracked_object_under_prefix_is_flagged() {
1074 let dir = tempfile::tempdir().unwrap();
1075 let m = build_manifest(
1076 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1077 ManifestStatus::Success,
1078 );
1079 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1080 std::fs::write(dir.path().join("rogue.parquet"), b"XX").unwrap();
1081 let dest = local_dest(dir.path());
1082
1083 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1084 assert!(
1085 v.failures.iter().any(
1086 |f| matches!(f, Failure::UntrackedObject { key, .. } if key == "rogue.parquet")
1087 )
1088 );
1089 // Untracked objects are surfaced but do NOT flip `passed` — that
1090 // is the resume-side decision (M9). Parts and marker are fine,
1091 // so passed remains true.
1092 assert!(v.passed);
1093 }
1094
1095 #[test]
1096 fn quarantine_prefix_objects_are_silently_ignored() {
1097 let dir = tempfile::tempdir().unwrap();
1098 let m = build_manifest(
1099 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1100 ManifestStatus::Success,
1101 );
1102 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1103 std::fs::create_dir_all(dir.path().join(crate::manifest::QUARANTINE_PREFIX)).unwrap();
1104 std::fs::write(
1105 dir.path()
1106 .join(crate::manifest::QUARANTINE_PREFIX)
1107 .join("old.parquet"),
1108 b"OO",
1109 )
1110 .unwrap();
1111 let dest = local_dest(dir.path());
1112
1113 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1114 assert!(v.passed);
1115 assert!(
1116 !v.failures
1117 .iter()
1118 .any(|f| matches!(f, Failure::UntrackedObject { .. })),
1119 "quarantine_prefix is the legitimate home for these — must not flag"
1120 );
1121 }
1122
1123 #[test]
1124 fn doctor_probe_is_not_flagged_as_untracked() {
1125 // Regression: `rivet doctor` writes `.rivet_doctor_probe` at the
1126 // destination prefix and never removes it. A subsequent
1127 // `rivet run --validate` against the same prefix must treat it as a
1128 // Rivet sidecar, not foreign data — otherwise `has_failures()` trips
1129 // and the run's `validated` flag is downgraded to FAIL.
1130 let dir = tempfile::tempdir().unwrap();
1131 let m = build_manifest(
1132 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1133 ManifestStatus::Success,
1134 );
1135 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1136 std::fs::write(
1137 dir.path().join(crate::manifest::DOCTOR_PROBE_FILENAME),
1138 b"ok",
1139 )
1140 .unwrap();
1141 let dest = local_dest(dir.path());
1142
1143 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1144 assert!(
1145 !v.has_failures(),
1146 "doctor probe must not surface as a failure: {:?}",
1147 v.failures
1148 );
1149 assert!(v.passed);
1150 }
1151
1152 // ── manifest_dir join semantics ─────────────────────────────────────
1153
1154 #[test]
1155 fn verifies_in_subdirectory_when_manifest_dir_is_non_empty() {
1156 let outer = tempfile::tempdir().unwrap();
1157 std::fs::create_dir_all(outer.path().join("sub/run")).unwrap();
1158 let m = build_manifest(
1159 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1160 ManifestStatus::Success,
1161 );
1162 let body = serde_json::to_vec_pretty(&m).unwrap();
1163 std::fs::write(outer.path().join("sub/run/part-000001.parquet"), b"AAAA").unwrap();
1164 std::fs::write(outer.path().join("sub/run").join(MANIFEST_FILENAME), &body).unwrap();
1165 std::fs::write(
1166 outer.path().join("sub/run").join(SUCCESS_FILENAME),
1167 success_marker_body(&body),
1168 )
1169 .unwrap();
1170 let dest = local_dest(outer.path());
1171
1172 let v = verify_at_destination(&dest, "sub/run", ValidateDepth::Full).unwrap();
1173 assert!(v.passed);
1174 assert_eq!(v.parts_verified, 1);
1175
1176 // Trailing slash is normalised — same outcome.
1177 let v2 = verify_at_destination(&dest, "sub/run/", ValidateDepth::Full).unwrap();
1178 assert!(v2.passed);
1179 }
1180
1181 // ── list-failure semantics (presence now rides the listing) ──────────
1182
1183 /// Wraps a real `LocalDestination` but fails every `list_prefix`. Used to
1184 /// pin the post-refactor contract: presence is derived from the listing,
1185 /// so a listing we cannot read means the audit cannot certify the parts.
1186 struct ListFails(LocalDestination);
1187 impl crate::destination::Destination for ListFails {
1188 fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1189 self.0.write(p, k)
1190 }
1191 fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1192 self.0.capabilities()
1193 }
1194 fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1195 self.0.head(k)
1196 }
1197 fn read(&self, k: &str) -> Result<Vec<u8>> {
1198 self.0.read(k)
1199 }
1200 fn list_prefix(&self, _: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1201 anyhow::bail!("listing unavailable")
1202 }
1203 }
1204
1205 #[test]
1206 fn list_failure_cannot_certify_parts_and_fails_the_audit() {
1207 let dir = tempfile::tempdir().unwrap();
1208 let m = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
1209 write_dataset(dir.path(), &m, &[("part-000000.parquet", b"abc")]);
1210 let dest = ListFails(local_dest(dir.path()));
1211
1212 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1213 // The manifest itself reads + parses fine (HEAD/read still work)…
1214 assert!(v.manifest_found);
1215 assert!(v.manifest_self_consistent);
1216 // …but with no listing we verify zero parts and refuse to pass.
1217 assert!(
1218 !v.passed,
1219 "an audit that cannot list the prefix must not pass"
1220 );
1221 assert_eq!(v.parts_verified, 0);
1222 assert!(
1223 v.failures
1224 .iter()
1225 .any(|f| matches!(f, Failure::ListPrefixError { .. })),
1226 "expected a ListPrefixError, got: {:?}",
1227 v.failures
1228 );
1229 }
1230
1231 // ── manifest read-error semantics (explicit failure, not legacy) ─────
1232
1233 /// Wraps a real `LocalDestination` but fails reading `manifest.json`
1234 /// (head still sees it) — EACCES or a transient store error on a
1235 /// manifest that exists.
1236 struct ManifestReadFails(LocalDestination);
1237 impl crate::destination::Destination for ManifestReadFails {
1238 fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1239 self.0.write(p, k)
1240 }
1241 fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1242 self.0.capabilities()
1243 }
1244 fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1245 self.0.head(k)
1246 }
1247 fn read(&self, k: &str) -> Result<Vec<u8>> {
1248 if k.ends_with(MANIFEST_FILENAME) {
1249 anyhow::bail!("permission denied (simulated)")
1250 }
1251 self.0.read(k)
1252 }
1253 fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1254 self.0.list_prefix(p)
1255 }
1256 }
1257
1258 #[test]
1259 fn unreadable_manifest_is_explicit_failure_not_legacy() {
1260 // The exit gates (`rivet validate`, run finalize) key off this exact
1261 // shape: `manifest_found: false` but `has_failures()` — distinct
1262 // from M6 legacy (`legacy_run: true`, no failures, exit 0).
1263 let dir = tempfile::tempdir().unwrap();
1264 let m = build_manifest(
1265 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1266 ManifestStatus::Success,
1267 );
1268 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1269 let dest = ManifestReadFails(local_dest(dir.path()));
1270
1271 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1272 assert!(!v.manifest_found);
1273 assert!(!v.legacy_run, "a read error is not the M6 legacy label");
1274 assert!(!v.passed);
1275 assert!(v.has_failures(), "orchestrators need a reason to refuse");
1276 assert!(
1277 matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
1278 "expected exactly one ManifestReadError, got: {:?}",
1279 v.failures
1280 );
1281 }
1282
1283 /// Same contract when `head` itself errors (cannot even stat the
1284 /// manifest): symmetric `ManifestReadError`, never the legacy label.
1285 struct ManifestHeadFails(LocalDestination);
1286 impl crate::destination::Destination for ManifestHeadFails {
1287 fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
1288 self.0.write(p, k)
1289 }
1290 fn capabilities(&self) -> crate::destination::DestinationCapabilities {
1291 self.0.capabilities()
1292 }
1293 fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
1294 if k.ends_with(MANIFEST_FILENAME) {
1295 anyhow::bail!("io timeout (simulated)")
1296 }
1297 self.0.head(k)
1298 }
1299 fn read(&self, k: &str) -> Result<Vec<u8>> {
1300 self.0.read(k)
1301 }
1302 fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
1303 self.0.list_prefix(p)
1304 }
1305 }
1306
1307 #[test]
1308 fn manifest_head_error_is_explicit_failure_not_legacy() {
1309 let dir = tempfile::tempdir().unwrap();
1310 let m = build_manifest(
1311 vec![part(1, 10, 4, "xxh3:1111111111111111")],
1312 ManifestStatus::Success,
1313 );
1314 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
1315 let dest = ManifestHeadFails(local_dest(dir.path()));
1316
1317 let v = verify_at_destination(&dest, "", ValidateDepth::Full).unwrap();
1318 assert!(!v.manifest_found);
1319 assert!(!v.legacy_run);
1320 assert!(!v.passed);
1321 assert!(
1322 matches!(
1323 v.failures.as_slice(),
1324 [Failure::ManifestReadError { detail }] if detail.contains("manifest head failed")
1325 ),
1326 "expected one ManifestReadError naming the head step, got: {:?}",
1327 v.failures
1328 );
1329 }
1330
1331 #[test]
1332 fn passed_is_derived_advisory_failures_do_not_fail() {
1333 // An advisory failure (untracked surplus) keeps the verdict passing…
1334 let mut v = ManifestVerification {
1335 manifest_found: true,
1336 ..ManifestVerification::empty()
1337 };
1338 v.failures.push(Failure::UntrackedObject {
1339 key: "stray.parquet".into(),
1340 size_bytes: 9,
1341 });
1342 v.recompute_passed();
1343 assert!(v.passed, "untracked surplus is advisory, not fatal");
1344
1345 // …while any fatal failure flips it.
1346 v.failures.push(Failure::PartMissing {
1347 part_id: 1,
1348 path: "part-000001.parquet".into(),
1349 });
1350 v.recompute_passed();
1351 assert!(!v.passed, "a missing part is fatal");
1352
1353 // No manifest → never passes regardless of failures.
1354 let mut legacy = ManifestVerification::empty();
1355 legacy.recompute_passed();
1356 assert!(!legacy.passed, "no manifest found → cannot certify");
1357 }
1358
1359 #[test]
1360 fn verify_content_policy_fails_only_size_only_parts() {
1361 // 3 parts, 2 content-checked, 1 size-only.
1362 let base = ManifestVerification {
1363 manifest_found: true,
1364 parts_verified: 3,
1365 parts_md5_verified: 2,
1366 ..ManifestVerification::empty()
1367 };
1368 // verify: size → size-only is acceptable, passes.
1369 let mut sz = base.clone();
1370 sz.recompute_passed();
1371 sz.enforce_content_policy(false);
1372 assert!(sz.passed, "size-only OK under verify: size");
1373
1374 // verify: content → the 1 size-only part is a fatal failure.
1375 let mut ct = base.clone();
1376 ct.recompute_passed();
1377 ct.enforce_content_policy(true);
1378 assert!(!ct.passed, "a size-only part fails verify: content");
1379 assert!(
1380 ct.failures.iter().any(|f| matches!(
1381 f,
1382 Failure::ContentVerificationUnmet {
1383 size_only: 1,
1384 total: 3
1385 }
1386 )),
1387 "expected ContentVerificationUnmet, got: {:?}",
1388 ct.failures
1389 );
1390
1391 // verify: content with every part md5-checked → satisfied.
1392 let mut all = ManifestVerification {
1393 parts_md5_verified: 3,
1394 ..base
1395 };
1396 all.recompute_passed();
1397 all.enforce_content_policy(true);
1398 assert!(
1399 all.passed && all.failures.is_empty(),
1400 "all md5 meets verify: content"
1401 );
1402 }
1403
1404 // ── require_manifest_present (finding #20: operator-pinned --prefix) ──────
1405
1406 #[test]
1407 fn require_manifest_escalates_legacy_to_fatal_absent() {
1408 // The exact shape `verify_at_destination` returns for an absent manifest
1409 // (`legacy()`): no manifest, no other failure. With a pinned `--prefix`
1410 // this is escalated to a fatal `ManifestRequiredButAbsent` so the exit
1411 // gate refuses it instead of passing as a benign legacy run.
1412 let mut v = ManifestVerification::legacy();
1413 assert!(v.legacy_run && !v.has_failures());
1414
1415 v.require_manifest_present("exports/2026-06-09/orders/");
1416
1417 assert!(!v.legacy_run, "no longer the benign legacy-run label");
1418 assert!(!v.passed, "an absent-but-required manifest cannot pass");
1419 assert!(
1420 matches!(
1421 v.failures.as_slice(),
1422 [Failure::ManifestRequiredButAbsent { prefix }]
1423 if prefix == "exports/2026-06-09/orders/"
1424 ),
1425 "expected one ManifestRequiredButAbsent naming the prefix, got: {:?}",
1426 v.failures
1427 );
1428 }
1429
1430 #[test]
1431 fn require_manifest_is_noop_on_a_real_passing_manifest() {
1432 // A found, passing verdict is untouched — `--prefix` plus real data is
1433 // the normal "validate this exact prefix" case and must still pass.
1434 let mut v = ManifestVerification {
1435 manifest_found: true,
1436 manifest_self_consistent: true,
1437 parts_verified: 1,
1438 passed: true,
1439 ..ManifestVerification::empty()
1440 };
1441 v.require_manifest_present("exports/orders/");
1442 assert!(
1443 v.passed && v.failures.is_empty(),
1444 "real dataset still passes"
1445 );
1446 }
1447
1448 #[test]
1449 fn require_manifest_does_not_double_flag_a_read_error() {
1450 // An absent manifest that already carries a `ManifestReadError` (head /
1451 // read failed) is already a fatal, classified failure — requiring a
1452 // manifest here must not add a second, redundant failure.
1453 let mut v = ManifestVerification::legacy();
1454 v.legacy_run = false;
1455 v.failures.push(Failure::ManifestReadError {
1456 detail: "permission denied".into(),
1457 });
1458 v.recompute_passed();
1459
1460 v.require_manifest_present("exports/orders/");
1461
1462 assert!(
1463 matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
1464 "must leave the existing read-error verdict alone, got: {:?}",
1465 v.failures
1466 );
1467 }
1468
1469 // ── graded verify layer (--depth) ───────────────────────────────────
1470
1471 #[test]
1472 fn light_depth_skips_part_reconcile_even_when_a_part_is_missing() {
1473 // A manifest declaring a part that is NOT on disk. At `Full`/`Sample`
1474 // this is a fatal `PartMissing`; at `Light` the `list_prefix` reconcile
1475 // is skipped entirely, so `parts_verified == 0`, no `ListPrefixError`,
1476 // and — with `_SUCCESS` consistent and the manifest self-consistent —
1477 // the verdict still passes. Light certifies the manifest + marker, not
1478 // the parts.
1479 let dir = tempfile::tempdir().unwrap();
1480 let m = build_manifest(
1481 vec![
1482 part(1, 10, 4, "xxh3:1111111111111111"),
1483 part(2, 20, 5, "xxh3:2222222222222222"),
1484 ],
1485 ManifestStatus::Success,
1486 );
1487 // Deliberately write NEITHER part — only manifest.json + _SUCCESS.
1488 write_dataset(dir.path(), &m, &[]);
1489 let dest = local_dest(dir.path());
1490
1491 let v = verify_at_destination(&dest, "", ValidateDepth::Light).unwrap();
1492 assert_eq!(v.depth_level, "light");
1493 assert_eq!(
1494 v.parts_verified, 0,
1495 "light skips the listing — no part is ever verified"
1496 );
1497 assert_eq!(
1498 v.parts_failed, 0,
1499 "no part reconcile means no part failures"
1500 );
1501 assert!(
1502 !v.failures.iter().any(|f| matches!(
1503 f,
1504 Failure::PartMissing { .. } | Failure::ListPrefixError { .. }
1505 )),
1506 "light must not surface part or list failures, got: {:?}",
1507 v.failures
1508 );
1509 assert!(
1510 v.success_marker_consistent,
1511 "_SUCCESS is still checked at light depth"
1512 );
1513 assert!(v.manifest_self_consistent);
1514 assert!(
1515 v.passed,
1516 "manifest + _SUCCESS are consistent, so a light pass certifies it"
1517 );
1518 }
1519
1520 #[test]
1521 fn light_depth_never_lists_so_a_list_failure_cannot_trip() {
1522 // Even with a destination whose `list_prefix` always errors, a light
1523 // pass succeeds: it never calls `list_prefix`, so no `ListPrefixError`.
1524 // This is the direct contrast to `list_failure_cannot_certify_parts…`
1525 // (which runs at Full and *does* fail on the list error).
1526 let dir = tempfile::tempdir().unwrap();
1527 let m = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
1528 write_dataset(dir.path(), &m, &[("part-000000.parquet", b"abc")]);
1529 let dest = ListFails(local_dest(dir.path()));
1530
1531 let v = verify_at_destination(&dest, "", ValidateDepth::Light).unwrap();
1532 assert_eq!(v.depth_level, "light");
1533 assert!(
1534 !v.failures
1535 .iter()
1536 .any(|f| matches!(f, Failure::ListPrefixError { .. })),
1537 "light never lists, so a failing list_prefix cannot surface, got: {:?}",
1538 v.failures
1539 );
1540 assert_eq!(v.parts_verified, 0);
1541 assert!(v.passed, "manifest + _SUCCESS consistent → light passes");
1542 }
1543
1544 #[test]
1545 fn sample_depth_runs_part_reconcile_like_full() {
1546 // `Sample` runs every section `verify_at_destination` owns (1-5) — the
1547 // Form B value re-read it skips lives in the *caller*, not here. So a
1548 // missing part is a fatal `PartMissing` at Sample, identical to Full.
1549 let dir = tempfile::tempdir().unwrap();
1550 let m = build_manifest(
1551 vec![
1552 part(1, 10, 4, "xxh3:1111111111111111"),
1553 part(2, 20, 5, "xxh3:2222222222222222"),
1554 ],
1555 ManifestStatus::Success,
1556 );
1557 write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]); // part 2 missing
1558 let dest = local_dest(dir.path());
1559
1560 let v = verify_at_destination(&dest, "", ValidateDepth::Sample).unwrap();
1561 assert_eq!(v.depth_level, "sample");
1562 assert_eq!(v.parts_verified, 1);
1563 assert_eq!(v.parts_failed, 1);
1564 assert!(!v.passed);
1565 assert!(
1566 v.failures
1567 .iter()
1568 .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. })),
1569 "sample reconciles parts just like full, got: {:?}",
1570 v.failures
1571 );
1572 }
1573
1574 #[test]
1575 fn depth_level_is_stamped_on_every_verdict_shape() {
1576 // The depth label rides the verdict even on the early-return shapes
1577 // (legacy / no manifest), so a consumer always sees how deep the pass
1578 // went.
1579 let dir = tempfile::tempdir().unwrap();
1580 let dest = local_dest(dir.path()); // empty prefix → legacy
1581 for depth in [
1582 ValidateDepth::Light,
1583 ValidateDepth::Sample,
1584 ValidateDepth::Full,
1585 ] {
1586 let v = verify_at_destination(&dest, "", depth).unwrap();
1587 assert!(v.legacy_run, "empty prefix is the legacy shape");
1588 assert_eq!(
1589 v.depth_level,
1590 depth.label(),
1591 "legacy verdict must still carry its depth label"
1592 );
1593 }
1594 }
1595
1596 #[test]
1597 fn error_code_is_stable_and_distinct_per_variant() {
1598 // Each variant maps to its documented `RIVET_VERIFY_*` code. A
1599 // regression guard: renaming a code is a silent break for any CI gate
1600 // keying off it.
1601 let cases: &[(Failure, &str)] = &[
1602 (
1603 Failure::PartMissing {
1604 part_id: 1,
1605 path: "p".into(),
1606 },
1607 "RIVET_VERIFY_PART_MISSING",
1608 ),
1609 (
1610 Failure::PartSizeMismatch {
1611 part_id: 1,
1612 path: "p".into(),
1613 expected: 1,
1614 actual: 2,
1615 },
1616 "RIVET_VERIFY_PART_SIZE_MISMATCH",
1617 ),
1618 (
1619 Failure::PartChecksumMismatch {
1620 part_id: 1,
1621 path: "p".into(),
1622 expected: "a".into(),
1623 actual: "b".into(),
1624 },
1625 "RIVET_VERIFY_PART_CHECKSUM_MISMATCH",
1626 ),
1627 (
1628 Failure::SuccessMarkerMalformed {
1629 body_preview: "x".into(),
1630 },
1631 "RIVET_VERIFY_SUCCESS_MALFORMED",
1632 ),
1633 (
1634 Failure::SuccessMarkerStale {
1635 marker_fingerprint: "a".into(),
1636 manifest_fingerprint: "b".into(),
1637 },
1638 "RIVET_VERIFY_SUCCESS_STALE",
1639 ),
1640 (
1641 Failure::ManifestSelfInconsistent { detail: "d".into() },
1642 "RIVET_VERIFY_MANIFEST_INCONSISTENT",
1643 ),
1644 (
1645 Failure::ManifestReadError { detail: "d".into() },
1646 "RIVET_VERIFY_MANIFEST_READ_ERROR",
1647 ),
1648 (
1649 Failure::SuccessMarkerReadError { detail: "d".into() },
1650 "RIVET_VERIFY_SUCCESS_READ_ERROR",
1651 ),
1652 (
1653 Failure::ListPrefixError { detail: "d".into() },
1654 "RIVET_VERIFY_LIST_ERROR",
1655 ),
1656 (
1657 Failure::UntrackedObject {
1658 key: "k".into(),
1659 size_bytes: 1,
1660 },
1661 "RIVET_VERIFY_UNTRACKED_OBJECT",
1662 ),
1663 (
1664 Failure::ContentVerificationUnmet {
1665 size_only: 1,
1666 total: 2,
1667 },
1668 "RIVET_VERIFY_CONTENT_UNMET",
1669 ),
1670 (
1671 Failure::ManifestRequiredButAbsent { prefix: "p".into() },
1672 "RIVET_VERIFY_MANIFEST_REQUIRED",
1673 ),
1674 ];
1675 for (failure, code) in cases {
1676 assert_eq!(&failure.error_code(), code, "code for {failure:?}");
1677 assert!(
1678 failure.error_code().starts_with("RIVET_VERIFY_"),
1679 "every code shares the RIVET_VERIFY_ prefix"
1680 );
1681 }
1682 }
1683}