Skip to main content

rivet/pipeline/
validate_cmd.rs

1//! **Layer: Coordinator** (config → destination → verification → render)
2//!
3//! `rivet validate` — re-run the manifest-aware `--validate` pass against
4//! an existing destination, without performing an extraction.
5//!
6//! ADR-0013 amendment: this is **not** a new trust noun.  It is a standalone
7//! driver for the same M5/M6 verification surface that `rivet run --validate`
8//! already performs at end-of-run (see [`crate::pipeline::validate_manifest`]).
9//! The verdict shape is identical; the only difference is no source query,
10//! no extraction, no state writes.
11//!
12//! Use cases:
13//! - "Is the output at this prefix still complete?" — Airflow / CI poller
14//!   between runs.
15//! - "Did someone delete a part by mistake?" — operator triage on a
16//!   suspected-broken dataset.
17//! - "Does this legacy prefix have a manifest yet?" — fast check for M6.
18//! - "Was yesterday's run complete?" — `--date YYYY-MM-DD` or `--run-id`
19//!   re-targets a prior day's prefix without re-running the export
20//!   (v0.7.2 historical-validation flags).
21//!
22//! Out of scope:
23//! - Source-side reconciliation (`COUNT(*)`).  That's `--reconcile` /
24//!   `rivet reconcile`, which already exists.
25//! - Per-byte re-fingerprint of every part (`--validate --deep`, future).
26//!
27//! Exit code: `0` if `passed` (or the legacy-run case where the verifier
28//! cannot certify but no failures were seen); non-zero on any explicit
29//! failure (`PartMissing`, `PartSizeMismatch`, `SuccessMarkerStale`, …).
30
31use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::{ValidateDepth, verify_at_destination};
40
/// Output format mirroring the `rivet reconcile` / `rivet repair` pattern.
///
/// Derives `Debug` / `Clone` / `PartialEq` / `Eq` (like the sibling
/// `ValidateTarget`, which derives `Debug` and `Clone`) so CLI wiring and
/// tests can log, copy and compare the selected format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// JSON written to the given file path, or to stdout when `None`.
    Json(Option<String>),
}
48
49/// Re-targeting overrides for `rivet validate`.
50///
51/// Default (`ValidateTarget::default()`) reproduces the v0.7.1 behaviour:
52/// resolve `{date}` against today's UTC date, with no `{run_id}`
53/// substitution, and use the config's destination prefix/path unchanged.
54#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56    /// `--date YYYY-MM-DD` — override the date used for `{date}`.
57    pub date: Option<NaiveDate>,
58    /// `--run-id RID` — substitute `{run_id}` in the destination template.
59    pub run_id: Option<String>,
60    /// `--prefix STRING` — bypass placeholder resolution entirely and
61    /// verify exactly this prefix.  Replaces both `prefix` and `path`.
62    pub prefix_override: Option<String>,
63    /// `--depth light|sample|full` — the graded verify layer (see
64    /// [`ValidateDepth`]).  Defaults (`ValidateDepth::default()` →
65    /// [`ValidateDepth::Full`]) to the pre-graded behaviour: all five
66    /// sections **plus** the Form B value re-read, so existing callers
67    /// constructing `ValidateTarget::default()` are unchanged.
68    pub depth: ValidateDepth,
69}
70
71impl ValidateTarget {
72    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
73        let mut ctx = match self.date {
74            Some(d) => PlaceholderContext::for_date(d, export_name),
75            None => PlaceholderContext::for_today(export_name),
76        };
77        if let Some(rid) = &self.run_id {
78            ctx = ctx.with_run_id(rid.clone());
79        }
80        ctx
81    }
82}
83
84/// Driver for `rivet validate <export>` (or every export when
85/// `export_name` is `None`).
86///
87/// Returns `Err` on the first explicit verification failure across the
88/// requested exports so an Airflow / CI step can branch on the exit code.
89/// Per-export verdicts are still printed to stdout / written to JSON for
90/// every export, including subsequent ones — the bail at the end is the
91/// last action.
92pub fn run_validate_command(
93    config_path: &str,
94    export_name: Option<&str>,
95    format: ValidateOutputFormat,
96    target: ValidateTarget,
97) -> Result<()> {
98    let config = Config::load_with_params(config_path, None)?;
99
100    let exports: Vec<&crate::config::ExportConfig> = match export_name {
101        Some(name) => match config.exports.iter().find(|e| e.name == name) {
102            Some(e) => vec![e],
103            None => anyhow::bail!("export '{}' not found in config", name),
104        },
105        None => config.exports.iter().collect(),
106    };
107
108    if exports.is_empty() {
109        anyhow::bail!("no exports defined in config — nothing to validate");
110    }
111
112    // `--prefix` only makes sense for a single export; with multiple
113    // exports it would silently re-point all of them at the same physical
114    // bytes.  Catch this at the boundary so we never head-check the wrong
115    // dataset under operator triage pressure.
116    if target.prefix_override.is_some() && exports.len() > 1 {
117        anyhow::bail!(
118            "--prefix requires --export <name>: cannot apply one override to {} exports",
119            exports.len()
120        );
121    }
122
123    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
124    let mut hard_failures: Vec<String> = Vec::new();
125
126    for export in &exports {
127        // Apply the operator-supplied re-targeting if any, else fall back
128        // to today's UTC date (same shape `rivet run` resolves at write
129        // time).
130        let ctx = target.placeholder_context(&export.name);
131        let mut expanded_dest =
132            crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
133        if let Some(p) = &target.prefix_override {
134            // Bypass placeholder resolution: trust the operator's literal
135            // prefix.  Replace both `path` (local) and `prefix` (cloud)
136            // so whichever the backend reads picks up the override.
137            expanded_dest.path = Some(p.clone());
138            expanded_dest.prefix = Some(p.clone());
139        }
140        let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
141        let dest = match crate::destination::create_destination(&expanded_dest) {
142            Ok(d) => d,
143            Err(e) => {
144                let msg = format!(
145                    "export '{}' (prefix: {}): could not open destination: {:#}",
146                    export.name, resolved_prefix, e
147                );
148                hard_failures.push(msg);
149                continue;
150            }
151        };
152        // Streaming destinations have no prefix to verify — note and skip.
153        if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
154        {
155            log::info!(
156                "export '{}': streaming destination, skipping (nothing to verify)",
157                export.name
158            );
159            continue;
160        }
161        match verify_at_destination(&*dest, "", target.depth) {
162            Ok(mut v) => {
163                // Apply this export's `verify` policy: `content` fails the
164                // verdict when any part is only size-verified (review D).
165                v.enforce_content_policy(export.verify.requires_content());
166                // Finding #20: when the operator pinned a literal `--prefix`,
167                // they asserted a real dataset lives here. An absent manifest is
168                // then NOT the benign M6 legacy-run case (exit 0) — it almost
169                // always means the prefix was never written (a misconfigured CI
170                // gate `rivet validate && deploy` sailing past nothing). Escalate
171                // that exact shape (no manifest, no other failure) to a fatal
172                // `ManifestRequiredButAbsent` so the exit gate refuses it loudly
173                // instead of silently passing. No-op for every other shape (a
174                // real manifest, or an absent one already carrying a read error).
175                if target.prefix_override.is_some() {
176                    v.require_manifest_present(&resolved_prefix);
177                }
178                // Capture the verdict before `v` is moved into the result: the
179                // deeper Form B checksum re-read below must run *only* on a
180                // manifest that was found and passed the standard checks.
181                let manifest_verified = v.manifest_found && v.passed;
182                all_results.push(ExportVerdict {
183                    name: export.name.clone(),
184                    resolved_prefix,
185                    verification: v,
186                });
187                // CDC-specific: re-read the parts and confirm `__pos` stayed in
188                // source-log order (no reorder / no part-boundary overlap). The
189                // manifest check above already covered per-part MD5 / size / _SUCCESS.
190                // Full-depth only — like Form B below it downloads every part, so a
191                // light/sample run skips it (keeps the depth contract consistent).
192                if target.depth.runs_part_download()
193                    && export.mode == crate::config::ExportMode::Cdc
194                    && export.format == crate::config::FormatType::Parquet
195                {
196                    match crate::source::cdc::validate::check_positions(&*dest, "") {
197                        Ok(pc) if pc.is_ok() => log::info!(
198                            "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
199                            export.name,
200                            pc.rows,
201                            pc.parts,
202                            pc.first,
203                            pc.last
204                        ),
205                        Ok(pc) => {
206                            for v in &pc.violations {
207                                hard_failures
208                                    .push(format!("export '{}': cdc __pos: {}", export.name, v));
209                            }
210                        }
211                        Err(e) => hard_failures.push(format!(
212                            "export '{}': cdc __pos check failed: {:#}",
213                            export.name, e
214                        )),
215                    }
216                }
217                // Form B: re-read the parts and verify the per-column value
218                // checksums recorded in the manifest (catches an Arrow→Parquet
219                // encode / post-write fault the in-process Form A cannot see).
220                // Gated on a found+passed manifest: an absent (legacy pass) or
221                // unreadable manifest is already accounted for above, so re-reading
222                // it here would either break the legacy pass or double-count.
223                //
224                // Graded depth: Form B is the **only** part-download step, so it
225                // runs at `--depth full` alone.  `light` and `sample` deliberately
226                // skip it — `sample` is "all structural checks, no part bodies".
227                if target.depth.runs_part_download()
228                    && manifest_verified
229                    && export.format == crate::config::FormatType::Parquet
230                    && let Err(e) =
231                        crate::source::value_checksum::validate_manifest_checksums(&*dest, "")
232                {
233                    hard_failures
234                        .push(format!("export '{}': value checksum: {:#}", export.name, e));
235                }
236            }
237            Err(e) => {
238                hard_failures.push(format!(
239                    "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
240                    export.name, resolved_prefix, e
241                ));
242            }
243        }
244    }
245
246    match format {
247        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
248        ValidateOutputFormat::Json(out_path) => {
249            render_json(&all_results, &hard_failures, out_path)?
250        }
251    }
252
253    // Exit-code policy: the standalone driver fails when an export's
254    // verdict surfaced an explicit failure it could not pass over
255    // (`verdict_fails_exit`) — an M5 verification failure on a found
256    // manifest (missing part, size mismatch, stale _SUCCESS,
257    // self-inconsistent manifest) or a manifest that could not even be
258    // read (`ManifestReadError`: `manifest_found` is false, but the
259    // verifier has a concrete reason to refuse, not a legacy prefix).
260    // Surplus untracked objects (`UntrackedObject`) are surfaced in
261    // `failures` for operator audit but do NOT flip `passed`, because
262    // their cleanup is M9's job (resume), not validate's.  An operator
263    // who wants strict "no surplus allowed" can grep the JSON report for
264    // `kind: untracked_object` themselves; a future
265    // `rivet validate --strict` flag may surface that exit-code mode if
266    // demand appears (out of scope for this PR).
267    //
268    // Legacy runs (M6) keep exit 0: `passed: false` with no failures
269    // means "verifier cannot certify", not "verifier found a problem".
270    let failed_verdicts = all_results
271        .iter()
272        .filter(|r| verdict_fails_exit(&r.verification))
273        .count();
274    if failed_verdicts > 0 {
275        // A verified-and-wrong verdict (missing part, size mismatch, stale
276        // _SUCCESS, self-inconsistent manifest) is the data-integrity class
277        // (exit 3) — typed so a scheduler stops rather than blindly retries.
278        // `hard_failures` (couldn't open / read the destination) are operational
279        // "could not verify", not "verified wrong", so they fold into the count
280        // but the class is driven by the real verdict failure.
281        return Err(crate::error::DataIntegrityError::new(format!(
282            "rivet validate: {} export(s) failed verification",
283            hard_failures.len() + failed_verdicts
284        ))
285        .into());
286    }
287    if !hard_failures.is_empty() {
288        // Could-not-verify only (no verified-wrong verdict): operational, generic.
289        anyhow::bail!(
290            "rivet validate: {} export(s) failed verification",
291            hard_failures.len()
292        );
293    }
294    Ok(())
295}
296
297/// Exit-code predicate for one export's verdict: non-zero iff the verifier
298/// surfaced an explicit failure (`has_failures` — "a reason an orchestrator
299/// should refuse the run") on a verdict that did not pass.  Both documented
300/// exit-0 cases survive: legacy runs (M6 — `passed: false` with no failures
301/// is "cannot certify", not "found a problem") and advisory-only verdicts
302/// (`UntrackedObject` never flips `passed`).
303fn verdict_fails_exit(v: &ManifestVerification) -> bool {
304    !v.passed && v.has_failures()
305}
306
307/// Per-export verdict plus the resolved physical prefix the verifier
308/// looked at — surfaced in both pretty and JSON output so an operator can
309/// confirm at a glance which bytes were checked.
310struct ExportVerdict {
311    name: String,
312    resolved_prefix: String,
313    verification: ManifestVerification,
314}
315
316/// Render the destination's resolved prefix for human/JSON output.
317///
318/// Cloud backends carry the data location in `prefix`; the local backend
319/// uses `path`.  Falling back to `<unresolved>` should never fire under
320/// normal config (clap + Config::load enforce one of the two) but keeps
321/// `validate` from panicking if a future config shape lands here.
322fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
323    dest.prefix
324        .clone()
325        .or_else(|| dest.path.clone())
326        .unwrap_or_else(|| "<unresolved>".into())
327}
328
329fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
330    use std::io::Write;
331    let stdout = std::io::stdout();
332    let mut h = stdout.lock();
333
334    for r in results {
335        let _ = writeln!(h, "── {} ──", r.name);
336        let _ = writeln!(h, "  prefix:    {}", r.resolved_prefix);
337        let v = &r.verification;
338        // Graded verify layer: surface how deep this pass went so a reader
339        // knows whether a PASSED verdict reconciled parts (sample/full) or
340        // only the manifest + _SUCCESS (light).
341        let _ = writeln!(h, "  depth:     {}", v.depth_level);
342        if v.legacy_run {
343            let _ = writeln!(
344                h,
345                "  status:    legacy_run (no manifest at destination — pre-0.7.0 prefix)"
346            );
347            continue;
348        }
349        if !v.manifest_found {
350            let _ = writeln!(h, "  status:    NO MANIFEST");
351            // A read-error verdict lands here (manifest present but
352            // unreadable, or head failed): its `failures` are the
353            // operator's only signal, so print them before bailing out
354            // of this export's section.  Each line carries its stable
355            // `RIVET_VERIFY_*` code in brackets so CI can grep it.
356            for failure in &v.failures {
357                let _ = writeln!(h, "  failure:   [{}] {}", failure.error_code(), failure);
358            }
359            continue;
360        }
361        let _ = writeln!(
362            h,
363            "  status:    {}",
364            if v.passed { "PASSED" } else { "FAILED" }
365        );
366        let _ = writeln!(
367            h,
368            "  parts:     {} verified ({} md5, {} size-only), {} failed",
369            v.parts_verified,
370            v.parts_md5_verified,
371            v.parts_verified.saturating_sub(v.parts_md5_verified),
372            v.parts_failed
373        );
374        let _ = writeln!(
375            h,
376            "  _SUCCESS:  {}",
377            if v.success_marker_consistent {
378                "consistent"
379            } else if v.failures.iter().any(|f| matches!(
380                f,
381                crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
382                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
383                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
384            )) {
385                "INCONSISTENT (see failures)"
386            } else {
387                "absent (no signal)"
388            }
389        );
390        let _ = writeln!(
391            h,
392            "  manifest:  {}",
393            if v.manifest_self_consistent {
394                "self-consistent"
395            } else {
396                "INCONSISTENT (see failures)"
397            }
398        );
399        for failure in &v.failures {
400            // `Failure: Display` is the single source of truth for the message;
401            // same string the run report uses.  L14: advisory (non-fatal)
402            // entries — `UntrackedObject` surplus — are labelled "warning:" not
403            // "failure:".  They never flip `passed` and never change the exit
404            // code (cleanup is `--resume`'s job, M9), so rendering them as
405            // "failure:" beside exit 0 was contradictory.  Fatal failures keep
406            // the "failure:" label.
407            let label = if failure.is_fatal() {
408                "failure:"
409            } else {
410                "warning:"
411            };
412            // Stable `RIVET_VERIFY_*` code in brackets ahead of the human
413            // message so an orchestrator can branch on the code without
414            // parsing the prose.
415            let _ = writeln!(h, "  {}   [{}] {}", label, failure.error_code(), failure);
416        }
417    }
418
419    if !hard_failures.is_empty() {
420        let _ = writeln!(h);
421        let _ = writeln!(h, "── errors ──");
422        for e in hard_failures {
423            let _ = writeln!(h, "  {}", e);
424        }
425    }
426    let _ = h.flush();
427}
428
429/// Serialize one [`ManifestVerificationFailure`] to JSON with its stable
430/// `RIVET_VERIFY_*` code injected next to `kind`.
431///
432/// The derive emits `{ "kind": "...", <variant fields> }`; this adds `"code"`
433/// so a consumer can branch on the code without re-deriving it from `kind`.
434/// Returns the enriched object (or the unmodified serde value if, impossibly,
435/// the failure didn't serialize as a JSON object).
436fn failure_json(f: &crate::pipeline::ManifestVerificationFailure) -> serde_json::Value {
437    let mut value = serde_json::json!(f);
438    if let Some(obj) = value.as_object_mut() {
439        obj.insert(
440            "code".to_string(),
441            serde_json::Value::String(f.error_code().to_string()),
442        );
443    }
444    value
445}
446
447/// Serialize a [`ManifestVerification`] to JSON, replacing the derive's plain
448/// `failures` array with one whose entries each carry their `RIVET_VERIFY_*`
449/// `code`.  All other fields (`depth_level`, `passed`, counts, …) ride the
450/// derive unchanged, so the stable wire contract is preserved and only widened.
451fn verification_json(v: &ManifestVerification) -> serde_json::Value {
452    let mut value = serde_json::json!(v);
453    if let Some(obj) = value.as_object_mut() {
454        let failures: Vec<serde_json::Value> = v.failures.iter().map(failure_json).collect();
455        obj.insert("failures".to_string(), serde_json::Value::Array(failures));
456    }
457    value
458}
459
460fn render_json(
461    results: &[ExportVerdict],
462    hard_failures: &[String],
463    out_path: Option<String>,
464) -> Result<()> {
465    // L14: surface advisory (non-fatal) entries — `UntrackedObject` surplus —
466    // in a dedicated top-level `warnings` array so a consumer can tell at a
467    // glance that "failures means failures".  The per-export
468    // `verification.failures` array is the stable wire contract (consumers
469    // branch on `failures[].kind`), so advisory entries stay there too — this
470    // is an additive lens over the same data, not a relocation.  Each entry
471    // also carries its stable `RIVET_VERIFY_*` `code` next to `kind`.
472    let warnings: Vec<serde_json::Value> = results
473        .iter()
474        .flat_map(|r| {
475            r.verification
476                .failures
477                .iter()
478                .filter(|f| !f.is_fatal())
479                .map(move |f| {
480                    serde_json::json!({
481                        "export_name": r.name,
482                        "warning": failure_json(f),
483                    })
484                })
485        })
486        .collect();
487
488    let payload = serde_json::json!({
489        "exports": results
490            .iter()
491            .map(|r| {
492                serde_json::json!({
493                    "export_name": r.name,
494                    "resolved_prefix": r.resolved_prefix,
495                    "verification": verification_json(&r.verification),
496                })
497            })
498            .collect::<Vec<_>>(),
499        "warnings": warnings,
500        "errors": hard_failures,
501    });
502    let serialized = serde_json::to_string_pretty(&payload)?;
503    match out_path {
504        Some(p) => {
505            std::fs::write(Path::new(&p), &serialized)?;
506            log::info!("rivet validate: wrote JSON report to {}", p);
507        }
508        None => {
509            println!("{}", serialized);
510        }
511    }
512    Ok(())
513}
514
515#[cfg(test)]
516mod tests {
517    use super::*;
518
519    // ── ValidateTarget::placeholder_context ────────────────────────────────
520
521    #[test]
522    fn target_default_uses_today() {
523        let target = ValidateTarget::default();
524        let ctx = target.placeholder_context("orders");
525        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
526        assert_eq!(ctx.export_name, "orders");
527        assert!(ctx.run_id.is_none());
528    }
529
530    #[test]
531    fn target_with_date_overrides_today() {
532        let target = ValidateTarget {
533            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
534            ..Default::default()
535        };
536        let ctx = target.placeholder_context("orders");
537        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
538        assert!(ctx.run_id.is_none());
539    }
540
541    #[test]
542    fn target_composes_date_and_run_id() {
543        // Regression for the "run yesterday, validate today" scenario:
544        // operator passes both --date and --run-id; the resolver must see
545        // both.
546        let target = ValidateTarget {
547            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
548            run_id: Some("r-abc123".into()),
549            prefix_override: None,
550            ..Default::default()
551        };
552        let ctx = target.placeholder_context("orders");
553        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
554        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
555    }
556
557    // ── resolved_prefix_for_display ────────────────────────────────────────
558
559    #[test]
560    fn resolved_prefix_prefers_cloud_prefix_over_path() {
561        let dest = crate::config::DestinationConfig {
562            destination_type: crate::config::DestinationType::S3,
563            prefix: Some("exports/2026-05-21/orders/".into()),
564            path: Some("/scratch".into()),
565            ..Default::default()
566        };
567        assert_eq!(
568            resolved_prefix_for_display(&dest),
569            "exports/2026-05-21/orders/",
570        );
571    }
572
573    #[test]
574    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
575        let dest = crate::config::DestinationConfig {
576            destination_type: crate::config::DestinationType::Local,
577            prefix: None,
578            path: Some("/data/out".into()),
579            ..Default::default()
580        };
581        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
582    }
583
584    // ── verdict_fails_exit (exit-code policy) ──────────────────────────────
585
586    use crate::pipeline::ManifestVerificationFailure as VFailure;
587
588    /// Verdict shape `verify_at_destination` returns when `manifest.json`
589    /// exists but cannot be read: not legacy, not passed, one explicit
590    /// `ManifestReadError`.
591    fn read_error_verdict() -> ManifestVerification {
592        ManifestVerification {
593            legacy_run: false,
594            failures: vec![VFailure::ManifestReadError {
595                detail: "permission denied".into(),
596            }],
597            ..ManifestVerification::legacy()
598        }
599    }
600
601    #[test]
602    fn exit_gate_counts_manifest_read_error_as_failure() {
603        assert!(verdict_fails_exit(&read_error_verdict()));
604    }
605
606    #[test]
607    fn exit_gate_keeps_legacy_run_at_zero() {
608        // M6: no manifest, no failures — "cannot certify" is not "found a
609        // problem".
610        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
611    }
612
613    #[test]
614    fn exit_gate_keeps_advisory_untracked_at_zero() {
615        let v = ManifestVerification {
616            manifest_found: true,
617            legacy_run: false,
618            passed: true,
619            parts_verified: 1,
620            failures: vec![VFailure::UntrackedObject {
621                key: "stray.parquet".into(),
622                size_bytes: 9,
623            }],
624            ..ManifestVerification::legacy()
625        };
626        assert!(!verdict_fails_exit(&v));
627    }
628
629    #[test]
630    fn exit_gate_counts_fatal_failure_on_found_manifest() {
631        let v = ManifestVerification {
632            manifest_found: true,
633            legacy_run: false,
634            failures: vec![VFailure::PartMissing {
635                part_id: 1,
636                path: "part-000001.parquet".into(),
637            }],
638            ..ManifestVerification::legacy()
639        };
640        assert!(verdict_fails_exit(&v));
641    }
642
643    // ── run_validate_command end-to-end (local destination; the source URL
644    //     is never dialed — see tests/validate_historical.rs) ──────────────
645
646    use crate::manifest::{
647        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
648        PartStatus, RunManifest,
649    };
650
651    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
652        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
653        let part_count = parts.len() as u32;
654        RunManifest {
655            manifest_version: MANIFEST_VERSION,
656            run_id: "r-validate-cmd".into(),
657            export_name: "orders".into(),
658            started_at: "2026-06-09T12:00:00Z".into(),
659            finished_at: "2026-06-09T12:01:00Z".into(),
660            status: ManifestStatus::Success,
661            source: ManifestSource {
662                engine: "postgres".into(),
663                schema: Some("public".into()),
664                table: Some("orders".into()),
665            },
666            destination: ManifestDestination {
667                kind: "local".into(),
668                uri: "file:///tmp/out".into(),
669            },
670            format: "parquet".into(),
671            compression: "zstd".into(),
672            schema_fingerprint: "xxh3:0123456789abcdef".into(),
673            row_count,
674            part_count,
675            parts,
676            column_checksums: None,
677            checksum_key_column: None,
678        }
679    }
680
681    /// Land `manifest.json` + `_SUCCESS` at `prefix` via the public writer
682    /// surface — same path the `rivet run` end-of-run writer takes.
683    fn stage_dataset(prefix: &Path, m: &RunManifest) {
684        std::fs::create_dir_all(prefix).unwrap();
685        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
686            destination_type: crate::config::DestinationType::Local,
687            path: Some(prefix.to_string_lossy().into_owned()),
688            ..Default::default()
689        })
690        .unwrap();
691        crate::pipeline::write_manifest(&*dest, m).unwrap();
692    }
693
694    /// Config with a single export pointing at `prefix`.  Written next to —
695    /// never inside — the prefix, so it can't surface as untracked surplus.
696    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
697        let cfg = dir.join("rivet.yaml");
698        let yaml = format!(
699            "source:\n  type: postgres\n  url: postgresql://nobody@localhost/nope\nexports:\n  - name: orders\n    query: \"SELECT 1\"\n    mode: full\n    format: parquet\n    destination:\n      type: local\n      path: \"{}\"\n",
700            prefix.to_string_lossy()
701        );
702        std::fs::write(&cfg, yaml).unwrap();
703        cfg
704    }
705
706    /// In-process twin of the live roast test (tests/roast_validate_exit.rs):
707    /// `manifest.json` present but unreadable must exit non-zero.  head()
708    /// (fs::metadata) succeeds, read() (fs::read) hits EACCES — exactly the
709    /// `ManifestReadError` verdict.
710    #[cfg(unix)]
711    #[test]
712    fn unreadable_manifest_fails_the_command() {
713        use std::os::unix::fs::PermissionsExt;
714
715        let dir = tempfile::tempdir().unwrap();
716        let prefix = dir.path().join("out");
717        stage_dataset(&prefix, &success_manifest(Vec::new()));
718        let cfg = write_cfg(dir.path(), &prefix);
719
720        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
721        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
722        if std::fs::read(&manifest_path).is_ok() {
723            // euid 0 ignores file modes — the degraded state can't be staged.
724            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
725            return;
726        }
727
728        let report = dir.path().join("report.json");
729        let err = run_validate_command(
730            cfg.to_str().unwrap(),
731            Some("orders"),
732            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
733            ValidateTarget::default(),
734        )
735        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
736        assert!(
737            format!("{err:#}").contains("1 export(s) failed verification"),
738            "got: {err:#}"
739        );
740
741        // The JSON report (written before the bail) still carries the
742        // verdict so the operator sees why.
743        let json: serde_json::Value =
744            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
745        let verification = &json["exports"][0]["verification"];
746        assert_eq!(verification["manifest_found"], false);
747        assert_eq!(verification["legacy_run"], false);
748        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
749    }
750
751    #[test]
752    fn untracked_surplus_alone_keeps_exit_zero() {
753        // The advisory neighbor of the read-error fix: gating on
754        // `has_failures()` alone would flip this verdict to non-zero, but
755        // surplus cleanup is `--resume`'s job (M9), not validate's.
756        let dir = tempfile::tempdir().unwrap();
757        let prefix = dir.path().join("out");
758        stage_dataset(&prefix, &success_manifest(Vec::new()));
759        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
760        let cfg = write_cfg(dir.path(), &prefix);
761
762        let report = dir.path().join("report.json");
763        run_validate_command(
764            cfg.to_str().unwrap(),
765            Some("orders"),
766            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
767            ValidateTarget::default(),
768        )
769        .expect("advisory untracked surplus must not flip the exit code");
770
771        let json: serde_json::Value =
772            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
773        let verification = &json["exports"][0]["verification"];
774        assert_eq!(verification["passed"], true);
775        // The stable wire contract is preserved: untracked entries still ride
776        // `verification.failures` (consumers branch on `failures[].kind`).
777        assert_eq!(verification["failures"][0]["kind"], "untracked_object");
778
779        // L14: …and the same advisory entry is also surfaced in the top-level
780        // `warnings` array so "failures means failures" — an exit-0 verdict no
781        // longer hides a surplus object under a "failure" label.
782        let warnings = json["warnings"].as_array().expect("warnings array present");
783        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
784        assert_eq!(warnings[0]["export_name"], "orders");
785        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
786        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
787    }
788
789    #[test]
790    fn json_warnings_array_is_empty_when_no_advisory_failures() {
791        // A clean dataset with no surplus → no warnings.  Guards against the
792        // `warnings` lens accidentally picking up fatal failures.
793        let dir = tempfile::tempdir().unwrap();
794        let prefix = dir.path().join("out");
795        stage_dataset(&prefix, &success_manifest(Vec::new()));
796        let cfg = write_cfg(dir.path(), &prefix);
797
798        let report = dir.path().join("report.json");
799        run_validate_command(
800            cfg.to_str().unwrap(),
801            Some("orders"),
802            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
803            ValidateTarget::default(),
804        )
805        .expect("a clean dataset must pass");
806
807        let json: serde_json::Value =
808            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
809        assert_eq!(
810            json["warnings"]
811                .as_array()
812                .expect("warnings array present")
813                .len(),
814            0,
815            "no surplus → no warnings"
816        );
817    }
818
819    #[test]
820    fn missing_part_fails_the_command() {
821        let dir = tempfile::tempdir().unwrap();
822        let prefix = dir.path().join("out");
823        let m = success_manifest(vec![ManifestPart {
824            part_id: 1,
825            path: "part-000001.parquet".into(),
826            rows: 10,
827            size_bytes: 4,
828            content_fingerprint: "xxh3:1111111111111111".into(),
829            content_md5: String::new(),
830            status: PartStatus::Committed,
831        }]);
832        stage_dataset(&prefix, &m); // the part itself is never written
833        let cfg = write_cfg(dir.path(), &prefix);
834
835        let err = run_validate_command(
836            cfg.to_str().unwrap(),
837            Some("orders"),
838            ValidateOutputFormat::Json(None),
839            ValidateTarget::default(),
840        )
841        .expect_err("a missing committed part must fail verification");
842        assert!(
843            format!("{err:#}").contains("1 export(s) failed verification"),
844            "got: {err:#}"
845        );
846    }
847
848    // ── finding #20: operator-pinned --prefix requires a manifest ────────────
849
850    /// `--prefix` at a real, complete dataset still passes — the normal
851    /// "validate exactly this prefix" case must not regress.
852    #[test]
853    fn prefix_override_with_real_manifest_passes() {
854        let dir = tempfile::tempdir().unwrap();
855        let prefix = dir.path().join("out");
856        stage_dataset(&prefix, &success_manifest(Vec::new()));
857        let cfg = write_cfg(dir.path(), &prefix);
858
859        run_validate_command(
860            cfg.to_str().unwrap(),
861            Some("orders"),
862            ValidateOutputFormat::Json(None),
863            ValidateTarget {
864                prefix_override: Some(prefix.to_string_lossy().into_owned()),
865                ..Default::default()
866            },
867        )
868        .expect("a real dataset under a pinned --prefix must pass");
869    }
870
871    /// `--prefix` at a never-written directory FAILS (exit non-zero): the
872    /// operator asserted a dataset lives here, so an absent manifest is a
873    /// refusal reason, not the benign legacy-run pass. This is the in-process
874    /// twin of the live `audit_validate_absent_prefix_can_fail` roast.
875    #[test]
876    fn prefix_override_at_absent_manifest_fails() {
877        let dir = tempfile::tempdir().unwrap();
878        // The export's config destination is irrelevant — `--prefix` overrides
879        // it. Point the override at a dir that exists but was never written.
880        let cfg_prefix = dir.path().join("cfg_dest");
881        std::fs::create_dir_all(&cfg_prefix).unwrap();
882        let cfg = write_cfg(dir.path(), &cfg_prefix);
883        let empty_prefix = dir.path().join("never_written");
884        std::fs::create_dir_all(&empty_prefix).unwrap();
885
886        let report = dir.path().join("report.json");
887        let err = run_validate_command(
888            cfg.to_str().unwrap(),
889            Some("orders"),
890            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
891            ValidateTarget {
892                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
893                ..Default::default()
894            },
895        )
896        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
897        assert!(
898            format!("{err:#}").contains("1 export(s) failed verification"),
899            "got: {err:#}"
900        );
901
902        // The verdict (written before the bail) carries the explicit reason so
903        // the operator sees why the gate refused, not a bare exit code.
904        let json: serde_json::Value =
905            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
906        let verification = &json["exports"][0]["verification"];
907        assert_eq!(verification["manifest_found"], false);
908        assert_eq!(verification["legacy_run"], false);
909        assert_eq!(
910            verification["failures"][0]["kind"],
911            "manifest_required_but_absent"
912        );
913    }
914
915    /// Without `--prefix`, an absent manifest stays the benign M6 legacy-run
916    /// pass (exit 0) — today's behaviour is preserved for config-resolved
917    /// destinations that may legitimately be pre-0.7.0 prefixes.
918    #[test]
919    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
920        let dir = tempfile::tempdir().unwrap();
921        let prefix = dir.path().join("out");
922        std::fs::create_dir_all(&prefix).unwrap(); // exists, but no manifest
923        let cfg = write_cfg(dir.path(), &prefix);
924
925        run_validate_command(
926            cfg.to_str().unwrap(),
927            Some("orders"),
928            ValidateOutputFormat::Json(None),
929            ValidateTarget::default(), // no --prefix
930        )
931        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
932    }
933
934    // ── graded verify layer (--depth) end-to-end ─────────────────────────
935
936    /// Stage a dataset that passes sections 1-5 (manifest reads + is
937    /// self-consistent, the single part is present at the recorded size,
938    /// `_SUCCESS` matches) **but** records a non-empty `column_checksums`, so
939    /// the Form B re-read is *reachable*. The part body is deliberately NOT
940    /// valid Parquet, so if Form B runs it errors on the Parquet open — making
941    /// "did Form B run?" observable as a pass/fail of the command.
942    fn stage_dataset_form_b_would_fail(prefix: &Path) {
943        std::fs::create_dir_all(prefix).unwrap();
944        // 4-byte non-Parquet body; the manifest records size 4 so the part
945        // reconcile (size-only, empty content_md5) passes.
946        let part_body: &[u8] = b"AAAA";
947        std::fs::write(prefix.join("part-000001.parquet"), part_body).unwrap();
948
949        let mut m = success_manifest(vec![ManifestPart {
950            part_id: 1,
951            path: "part-000001.parquet".into(),
952            rows: 1,
953            size_bytes: part_body.len() as u64,
954            content_fingerprint: "xxh3:1111111111111111".into(),
955            content_md5: String::new(),
956            status: PartStatus::Committed,
957        }]);
958        // Non-empty → Form B does NOT early-return; it proceeds to read the
959        // (garbage) part as Parquet and fail.
960        m.column_checksums = Some(vec![crate::manifest::ColumnChecksum {
961            name: "id".into(),
962            checksum: "0".into(),
963        }]);
964        stage_dataset(prefix, &m);
965    }
966
967    #[test]
968    fn sample_depth_does_not_run_form_b() {
969        // At `--depth sample` the structural checks (parts present, _SUCCESS,
970        // self-consistency) all pass and the Form B value re-read is skipped —
971        // so the command succeeds even though the part body is not real Parquet.
972        let dir = tempfile::tempdir().unwrap();
973        let prefix = dir.path().join("out");
974        stage_dataset_form_b_would_fail(&prefix);
975        let cfg = write_cfg(dir.path(), &prefix);
976
977        let report = dir.path().join("report.json");
978        run_validate_command(
979            cfg.to_str().unwrap(),
980            Some("orders"),
981            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
982            ValidateTarget {
983                depth: ValidateDepth::Sample,
984                ..Default::default()
985            },
986        )
987        .expect("sample depth skips Form B, so a non-Parquet part still passes");
988
989        let json: serde_json::Value =
990            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
991        let verification = &json["exports"][0]["verification"];
992        assert_eq!(verification["passed"], true);
993        assert_eq!(verification["parts_verified"], 1, "sample reconciles parts");
994        assert_eq!(verification["depth_level"], "sample");
995    }
996
997    #[test]
998    fn full_depth_runs_form_b() {
999        // The contrast: identical dataset, `--depth full`. Sections 1-5 still
1000        // pass (so `manifest_verified` is true and Form B is gated open), Form B
1001        // re-reads the part, fails to parse it as Parquet, and the command exits
1002        // non-zero. Proves Form B runs at full depth and only at full depth.
1003        let dir = tempfile::tempdir().unwrap();
1004        let prefix = dir.path().join("out");
1005        stage_dataset_form_b_would_fail(&prefix);
1006        let cfg = write_cfg(dir.path(), &prefix);
1007
1008        let err = run_validate_command(
1009            cfg.to_str().unwrap(),
1010            Some("orders"),
1011            ValidateOutputFormat::Json(None),
1012            ValidateTarget {
1013                depth: ValidateDepth::Full,
1014                ..Default::default()
1015            },
1016        )
1017        .expect_err("full depth runs Form B, which fails on a non-Parquet part");
1018        assert!(
1019            format!("{err:#}").contains("1 export(s) failed verification"),
1020            "got: {err:#}"
1021        );
1022    }
1023
1024    #[test]
1025    fn json_report_carries_failure_code_and_depth_level() {
1026        // render_json injects the stable `RIVET_VERIFY_*` code next to each
1027        // failure's `kind`, and the verdict carries the `depth_level` it ran at.
1028        // A missing committed part gives us a fatal failure to inspect.
1029        let dir = tempfile::tempdir().unwrap();
1030        let prefix = dir.path().join("out");
1031        let m = success_manifest(vec![ManifestPart {
1032            part_id: 1,
1033            path: "part-000001.parquet".into(),
1034            rows: 10,
1035            size_bytes: 4,
1036            content_fingerprint: "xxh3:1111111111111111".into(),
1037            content_md5: String::new(),
1038            status: PartStatus::Committed,
1039        }]);
1040        stage_dataset(&prefix, &m); // the part itself is never written → PartMissing
1041        let cfg = write_cfg(dir.path(), &prefix);
1042
1043        let report = dir.path().join("report.json");
1044        let _ = run_validate_command(
1045            cfg.to_str().unwrap(),
1046            Some("orders"),
1047            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
1048            ValidateTarget {
1049                depth: ValidateDepth::Sample,
1050                ..Default::default()
1051            },
1052        )
1053        .expect_err("a missing part fails the command");
1054
1055        let json: serde_json::Value =
1056            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
1057        let verification = &json["exports"][0]["verification"];
1058        // depth_level surfaces the level the pass ran at.
1059        assert_eq!(verification["depth_level"], "sample");
1060        // The PartMissing failure carries BOTH its stable kind and its code.
1061        let failure = &verification["failures"][0];
1062        assert_eq!(failure["kind"], "part_missing");
1063        assert_eq!(failure["code"], "RIVET_VERIFY_PART_MISSING");
1064        // The per-variant fields ride alongside (the derive output is widened,
1065        // not replaced).
1066        assert_eq!(failure["part_id"], 1);
1067    }
1068
1069    #[test]
1070    fn json_warning_entry_also_carries_its_code() {
1071        // The advisory `warnings` lens carries the code too — an untracked
1072        // surplus surfaces `RIVET_VERIFY_UNTRACKED_OBJECT` without flipping exit.
1073        let dir = tempfile::tempdir().unwrap();
1074        let prefix = dir.path().join("out");
1075        stage_dataset(&prefix, &success_manifest(Vec::new()));
1076        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
1077        let cfg = write_cfg(dir.path(), &prefix);
1078
1079        let report = dir.path().join("report.json");
1080        run_validate_command(
1081            cfg.to_str().unwrap(),
1082            Some("orders"),
1083            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
1084            ValidateTarget::default(),
1085        )
1086        .expect("advisory untracked surplus must not flip the exit code");
1087
1088        let json: serde_json::Value =
1089            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
1090        let warning = &json["warnings"][0]["warning"];
1091        assert_eq!(warning["kind"], "untracked_object");
1092        assert_eq!(warning["code"], "RIVET_VERIFY_UNTRACKED_OBJECT");
1093        // And the default depth (full) is recorded on the verdict.
1094        assert_eq!(json["exports"][0]["verification"]["depth_level"], "full");
1095    }
1096}