Skip to main content

rivet/pipeline/
validate_cmd.rs

1//! **Layer: Coordinator** (config → destination → verification → render)
2//!
3//! `rivet validate` — re-run the manifest-aware `--validate` pass against
4//! an existing destination, without performing an extraction.
5//!
6//! ADR-0013 amendment: this is **not** a new trust noun.  It is a standalone
7//! driver for the same M5/M6 verification surface that `rivet run --validate`
8//! already performs at end-of-run (see [`crate::pipeline::validate_manifest`]).
9//! The verdict shape is identical; the only difference is no source query,
10//! no extraction, no state writes.
11//!
12//! Use cases:
13//! - "Is the output at this prefix still complete?" — Airflow / CI poller
14//!   between runs.
15//! - "Did someone delete a part by mistake?" — operator triage on a
16//!   suspected-broken dataset.
17//! - "Does this legacy prefix have a manifest yet?" — fast check for M6.
18//! - "Was yesterday's run complete?" — `--date YYYY-MM-DD` or `--run-id`
19//!   re-targets a prior day's prefix without re-running the export
20//!   (v0.7.2 historical-validation flags).
21//!
22//! Out of scope:
23//! - Source-side reconciliation (`COUNT(*)`).  That's `--reconcile` /
24//!   `rivet reconcile`, which already exists.
25//! - Per-byte re-fingerprint of every part (`--validate --deep`, future).
26//!
27//! Exit code: `0` if `passed` (or the legacy-run case where the verifier
28//! cannot certify but no failures were seen); non-zero on any explicit
29//! failure (`PartMissing`, `PartSizeMismatch`, `SuccessMarkerStale`, …).
30
31use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
41/// Output format mirroring the `rivet reconcile` / `rivet repair` pattern.
42pub enum ValidateOutputFormat {
43    /// Human-readable summary printed to stdout.
44    Pretty,
45    /// JSON to the given path or stdout if `None`.
46    Json(Option<String>),
47}
48
49/// Re-targeting overrides for `rivet validate`.
50///
51/// Default (`ValidateTarget::default()`) reproduces the v0.7.1 behaviour:
52/// resolve `{date}` against today's UTC date, with no `{run_id}`
53/// substitution, and use the config's destination prefix/path unchanged.
54#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56    /// `--date YYYY-MM-DD` — override the date used for `{date}`.
57    pub date: Option<NaiveDate>,
58    /// `--run-id RID` — substitute `{run_id}` in the destination template.
59    pub run_id: Option<String>,
60    /// `--prefix STRING` — bypass placeholder resolution entirely and
61    /// verify exactly this prefix.  Replaces both `prefix` and `path`.
62    pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67        let mut ctx = match self.date {
68            Some(d) => PlaceholderContext::for_date(d, export_name),
69            None => PlaceholderContext::for_today(export_name),
70        };
71        if let Some(rid) = &self.run_id {
72            ctx = ctx.with_run_id(rid.clone());
73        }
74        ctx
75    }
76}
77
78/// Driver for `rivet validate <export>` (or every export when
79/// `export_name` is `None`).
80///
81/// Returns `Err` on the first explicit verification failure across the
82/// requested exports so an Airflow / CI step can branch on the exit code.
83/// Per-export verdicts are still printed to stdout / written to JSON for
84/// every export, including subsequent ones — the bail at the end is the
85/// last action.
86pub fn run_validate_command(
87    config_path: &str,
88    export_name: Option<&str>,
89    format: ValidateOutputFormat,
90    target: ValidateTarget,
91) -> Result<()> {
92    let config = Config::load_with_params(config_path, None)?;
93
94    let exports: Vec<&crate::config::ExportConfig> = match export_name {
95        Some(name) => match config.exports.iter().find(|e| e.name == name) {
96            Some(e) => vec![e],
97            None => anyhow::bail!("export '{}' not found in config", name),
98        },
99        None => config.exports.iter().collect(),
100    };
101
102    if exports.is_empty() {
103        anyhow::bail!("no exports defined in config — nothing to validate");
104    }
105
106    // `--prefix` only makes sense for a single export; with multiple
107    // exports it would silently re-point all of them at the same physical
108    // bytes.  Catch this at the boundary so we never head-check the wrong
109    // dataset under operator triage pressure.
110    if target.prefix_override.is_some() && exports.len() > 1 {
111        anyhow::bail!(
112            "--prefix requires --export <name>: cannot apply one override to {} exports",
113            exports.len()
114        );
115    }
116
117    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118    let mut hard_failures: Vec<String> = Vec::new();
119
120    for export in &exports {
121        // Apply the operator-supplied re-targeting if any, else fall back
122        // to today's UTC date (same shape `rivet run` resolves at write
123        // time).
124        let ctx = target.placeholder_context(&export.name);
125        let mut expanded_dest =
126            crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127        if let Some(p) = &target.prefix_override {
128            // Bypass placeholder resolution: trust the operator's literal
129            // prefix.  Replace both `path` (local) and `prefix` (cloud)
130            // so whichever the backend reads picks up the override.
131            expanded_dest.path = Some(p.clone());
132            expanded_dest.prefix = Some(p.clone());
133        }
134        let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135        let dest = match crate::destination::create_destination(&expanded_dest) {
136            Ok(d) => d,
137            Err(e) => {
138                let msg = format!(
139                    "export '{}' (prefix: {}): could not open destination: {:#}",
140                    export.name, resolved_prefix, e
141                );
142                hard_failures.push(msg);
143                continue;
144            }
145        };
146        // Streaming destinations have no prefix to verify — note and skip.
147        if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148        {
149            log::info!(
150                "export '{}': streaming destination, skipping (nothing to verify)",
151                export.name
152            );
153            continue;
154        }
155        match verify_at_destination(&*dest, "") {
156            Ok(mut v) => {
157                // Apply this export's `verify` policy: `content` fails the
158                // verdict when any part is only size-verified (review D).
159                v.enforce_content_policy(export.verify.requires_content());
160                // Finding #20: when the operator pinned a literal `--prefix`,
161                // they asserted a real dataset lives here. An absent manifest is
162                // then NOT the benign M6 legacy-run case (exit 0) — it almost
163                // always means the prefix was never written (a misconfigured CI
164                // gate `rivet validate && deploy` sailing past nothing). Escalate
165                // that exact shape (no manifest, no other failure) to a fatal
166                // `ManifestRequiredButAbsent` so the exit gate refuses it loudly
167                // instead of silently passing. No-op for every other shape (a
168                // real manifest, or an absent one already carrying a read error).
169                if target.prefix_override.is_some() {
170                    v.require_manifest_present(&resolved_prefix);
171                }
172                all_results.push(ExportVerdict {
173                    name: export.name.clone(),
174                    resolved_prefix,
175                    verification: v,
176                });
177            }
178            Err(e) => {
179                hard_failures.push(format!(
180                    "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
181                    export.name, resolved_prefix, e
182                ));
183            }
184        }
185    }
186
187    match format {
188        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
189        ValidateOutputFormat::Json(out_path) => {
190            render_json(&all_results, &hard_failures, out_path)?
191        }
192    }
193
194    // Exit-code policy: the standalone driver fails when an export's
195    // verdict surfaced an explicit failure it could not pass over
196    // (`verdict_fails_exit`) — an M5 verification failure on a found
197    // manifest (missing part, size mismatch, stale _SUCCESS,
198    // self-inconsistent manifest) or a manifest that could not even be
199    // read (`ManifestReadError`: `manifest_found` is false, but the
200    // verifier has a concrete reason to refuse, not a legacy prefix).
201    // Surplus untracked objects (`UntrackedObject`) are surfaced in
202    // `failures` for operator audit but do NOT flip `passed`, because
203    // their cleanup is M9's job (resume), not validate's.  An operator
204    // who wants strict "no surplus allowed" can grep the JSON report for
205    // `kind: untracked_object` themselves; a future
206    // `rivet validate --strict` flag may surface that exit-code mode if
207    // demand appears (out of scope for this PR).
208    //
209    // Legacy runs (M6) keep exit 0: `passed: false` with no failures
210    // means "verifier cannot certify", not "verifier found a problem".
211    let failed_verdicts = all_results
212        .iter()
213        .filter(|r| verdict_fails_exit(&r.verification))
214        .count();
215    if failed_verdicts > 0 {
216        // A verified-and-wrong verdict (missing part, size mismatch, stale
217        // _SUCCESS, self-inconsistent manifest) is the data-integrity class
218        // (exit 3) — typed so a scheduler stops rather than blindly retries.
219        // `hard_failures` (couldn't open / read the destination) are operational
220        // "could not verify", not "verified wrong", so they fold into the count
221        // but the class is driven by the real verdict failure.
222        return Err(crate::error::DataIntegrityError::new(format!(
223            "rivet validate: {} export(s) failed verification",
224            hard_failures.len() + failed_verdicts
225        ))
226        .into());
227    }
228    if !hard_failures.is_empty() {
229        // Could-not-verify only (no verified-wrong verdict): operational, generic.
230        anyhow::bail!(
231            "rivet validate: {} export(s) failed verification",
232            hard_failures.len()
233        );
234    }
235    Ok(())
236}
237
238/// Exit-code predicate for one export's verdict: non-zero iff the verifier
239/// surfaced an explicit failure (`has_failures` — "a reason an orchestrator
240/// should refuse the run") on a verdict that did not pass.  Both documented
241/// exit-0 cases survive: legacy runs (M6 — `passed: false` with no failures
242/// is "cannot certify", not "found a problem") and advisory-only verdicts
243/// (`UntrackedObject` never flips `passed`).
244fn verdict_fails_exit(v: &ManifestVerification) -> bool {
245    !v.passed && v.has_failures()
246}
247
248/// Per-export verdict plus the resolved physical prefix the verifier
249/// looked at — surfaced in both pretty and JSON output so an operator can
250/// confirm at a glance which bytes were checked.
251struct ExportVerdict {
252    name: String,
253    resolved_prefix: String,
254    verification: ManifestVerification,
255}
256
257/// Render the destination's resolved prefix for human/JSON output.
258///
259/// Cloud backends carry the data location in `prefix`; the local backend
260/// uses `path`.  Falling back to `<unresolved>` should never fire under
261/// normal config (clap + Config::load enforce one of the two) but keeps
262/// `validate` from panicking if a future config shape lands here.
263fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
264    dest.prefix
265        .clone()
266        .or_else(|| dest.path.clone())
267        .unwrap_or_else(|| "<unresolved>".into())
268}
269
270fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
271    use std::io::Write;
272    let stdout = std::io::stdout();
273    let mut h = stdout.lock();
274
275    for r in results {
276        let _ = writeln!(h, "── {} ──", r.name);
277        let _ = writeln!(h, "  prefix:    {}", r.resolved_prefix);
278        let v = &r.verification;
279        if v.legacy_run {
280            let _ = writeln!(
281                h,
282                "  status:    legacy_run (no manifest at destination — pre-0.7.0 prefix)"
283            );
284            continue;
285        }
286        if !v.manifest_found {
287            let _ = writeln!(h, "  status:    NO MANIFEST");
288            // A read-error verdict lands here (manifest present but
289            // unreadable, or head failed): its `failures` are the
290            // operator's only signal, so print them before bailing out
291            // of this export's section.
292            for failure in &v.failures {
293                let _ = writeln!(h, "  failure:   {}", failure);
294            }
295            continue;
296        }
297        let _ = writeln!(
298            h,
299            "  status:    {}",
300            if v.passed { "PASSED" } else { "FAILED" }
301        );
302        let _ = writeln!(
303            h,
304            "  parts:     {} verified ({} md5, {} size-only), {} failed",
305            v.parts_verified,
306            v.parts_md5_verified,
307            v.parts_verified.saturating_sub(v.parts_md5_verified),
308            v.parts_failed
309        );
310        let _ = writeln!(
311            h,
312            "  _SUCCESS:  {}",
313            if v.success_marker_consistent {
314                "consistent"
315            } else if v.failures.iter().any(|f| matches!(
316                f,
317                crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
318                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
319                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
320            )) {
321                "INCONSISTENT (see failures)"
322            } else {
323                "absent (no signal)"
324            }
325        );
326        let _ = writeln!(
327            h,
328            "  manifest:  {}",
329            if v.manifest_self_consistent {
330                "self-consistent"
331            } else {
332                "INCONSISTENT (see failures)"
333            }
334        );
335        for failure in &v.failures {
336            // `Failure: Display` is the single source of truth for the message;
337            // same string the run report uses.  L14: advisory (non-fatal)
338            // entries — `UntrackedObject` surplus — are labelled "warning:" not
339            // "failure:".  They never flip `passed` and never change the exit
340            // code (cleanup is `--resume`'s job, M9), so rendering them as
341            // "failure:" beside exit 0 was contradictory.  Fatal failures keep
342            // the "failure:" label.
343            let label = if failure.is_fatal() {
344                "failure:"
345            } else {
346                "warning:"
347            };
348            let _ = writeln!(h, "  {}   {}", label, failure);
349        }
350    }
351
352    if !hard_failures.is_empty() {
353        let _ = writeln!(h);
354        let _ = writeln!(h, "── errors ──");
355        for e in hard_failures {
356            let _ = writeln!(h, "  {}", e);
357        }
358    }
359    let _ = h.flush();
360}
361
362fn render_json(
363    results: &[ExportVerdict],
364    hard_failures: &[String],
365    out_path: Option<String>,
366) -> Result<()> {
367    // L14: surface advisory (non-fatal) entries — `UntrackedObject` surplus —
368    // in a dedicated top-level `warnings` array so a consumer can tell at a
369    // glance that "failures means failures".  The per-export
370    // `verification.failures` array is the stable wire contract (consumers
371    // branch on `failures[].kind`), so advisory entries stay there too — this
372    // is an additive lens over the same data, not a relocation.
373    let warnings: Vec<serde_json::Value> = results
374        .iter()
375        .flat_map(|r| {
376            r.verification
377                .failures
378                .iter()
379                .filter(|f| !f.is_fatal())
380                .map(move |f| {
381                    serde_json::json!({
382                        "export_name": r.name,
383                        "warning": f,
384                    })
385                })
386        })
387        .collect();
388
389    let payload = serde_json::json!({
390        "exports": results
391            .iter()
392            .map(|r| {
393                serde_json::json!({
394                    "export_name": r.name,
395                    "resolved_prefix": r.resolved_prefix,
396                    "verification": r.verification,
397                })
398            })
399            .collect::<Vec<_>>(),
400        "warnings": warnings,
401        "errors": hard_failures,
402    });
403    let serialized = serde_json::to_string_pretty(&payload)?;
404    match out_path {
405        Some(p) => {
406            std::fs::write(Path::new(&p), &serialized)?;
407            log::info!("rivet validate: wrote JSON report to {}", p);
408        }
409        None => {
410            println!("{}", serialized);
411        }
412    }
413    Ok(())
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419
420    // ── ValidateTarget::placeholder_context ────────────────────────────────
421
422    #[test]
423    fn target_default_uses_today() {
424        let target = ValidateTarget::default();
425        let ctx = target.placeholder_context("orders");
426        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
427        assert_eq!(ctx.export_name, "orders");
428        assert!(ctx.run_id.is_none());
429    }
430
431    #[test]
432    fn target_with_date_overrides_today() {
433        let target = ValidateTarget {
434            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
435            ..Default::default()
436        };
437        let ctx = target.placeholder_context("orders");
438        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
439        assert!(ctx.run_id.is_none());
440    }
441
442    #[test]
443    fn target_composes_date_and_run_id() {
444        // Regression for the "run yesterday, validate today" scenario:
445        // operator passes both --date and --run-id; the resolver must see
446        // both.
447        let target = ValidateTarget {
448            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
449            run_id: Some("r-abc123".into()),
450            prefix_override: None,
451        };
452        let ctx = target.placeholder_context("orders");
453        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
454        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
455    }
456
457    // ── resolved_prefix_for_display ────────────────────────────────────────
458
459    #[test]
460    fn resolved_prefix_prefers_cloud_prefix_over_path() {
461        let dest = crate::config::DestinationConfig {
462            destination_type: crate::config::DestinationType::S3,
463            prefix: Some("exports/2026-05-21/orders/".into()),
464            path: Some("/scratch".into()),
465            ..Default::default()
466        };
467        assert_eq!(
468            resolved_prefix_for_display(&dest),
469            "exports/2026-05-21/orders/",
470        );
471    }
472
473    #[test]
474    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
475        let dest = crate::config::DestinationConfig {
476            destination_type: crate::config::DestinationType::Local,
477            prefix: None,
478            path: Some("/data/out".into()),
479            ..Default::default()
480        };
481        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
482    }
483
484    // ── verdict_fails_exit (exit-code policy) ──────────────────────────────
485
486    use crate::pipeline::ManifestVerificationFailure as VFailure;
487
488    /// Verdict shape `verify_at_destination` returns when `manifest.json`
489    /// exists but cannot be read: not legacy, not passed, one explicit
490    /// `ManifestReadError`.
491    fn read_error_verdict() -> ManifestVerification {
492        ManifestVerification {
493            legacy_run: false,
494            failures: vec![VFailure::ManifestReadError {
495                detail: "permission denied".into(),
496            }],
497            ..ManifestVerification::legacy()
498        }
499    }
500
501    #[test]
502    fn exit_gate_counts_manifest_read_error_as_failure() {
503        assert!(verdict_fails_exit(&read_error_verdict()));
504    }
505
506    #[test]
507    fn exit_gate_keeps_legacy_run_at_zero() {
508        // M6: no manifest, no failures — "cannot certify" is not "found a
509        // problem".
510        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
511    }
512
513    #[test]
514    fn exit_gate_keeps_advisory_untracked_at_zero() {
515        let v = ManifestVerification {
516            manifest_found: true,
517            legacy_run: false,
518            passed: true,
519            parts_verified: 1,
520            failures: vec![VFailure::UntrackedObject {
521                key: "stray.parquet".into(),
522                size_bytes: 9,
523            }],
524            ..ManifestVerification::legacy()
525        };
526        assert!(!verdict_fails_exit(&v));
527    }
528
529    #[test]
530    fn exit_gate_counts_fatal_failure_on_found_manifest() {
531        let v = ManifestVerification {
532            manifest_found: true,
533            legacy_run: false,
534            failures: vec![VFailure::PartMissing {
535                part_id: 1,
536                path: "part-000001.parquet".into(),
537            }],
538            ..ManifestVerification::legacy()
539        };
540        assert!(verdict_fails_exit(&v));
541    }
542
543    // ── run_validate_command end-to-end (local destination; the source URL
544    //     is never dialed — see tests/validate_historical.rs) ──────────────
545
546    use crate::manifest::{
547        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
548        PartStatus, RunManifest,
549    };
550
551    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
552        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
553        let part_count = parts.len() as u32;
554        RunManifest {
555            manifest_version: MANIFEST_VERSION,
556            run_id: "r-validate-cmd".into(),
557            export_name: "orders".into(),
558            started_at: "2026-06-09T12:00:00Z".into(),
559            finished_at: "2026-06-09T12:01:00Z".into(),
560            status: ManifestStatus::Success,
561            source: ManifestSource {
562                engine: "postgres".into(),
563                schema: Some("public".into()),
564                table: Some("orders".into()),
565            },
566            destination: ManifestDestination {
567                kind: "local".into(),
568                uri: "file:///tmp/out".into(),
569            },
570            format: "parquet".into(),
571            compression: "zstd".into(),
572            schema_fingerprint: "xxh3:0123456789abcdef".into(),
573            row_count,
574            part_count,
575            parts,
576        }
577    }
578
579    /// Land `manifest.json` + `_SUCCESS` at `prefix` via the public writer
580    /// surface — same path the `rivet run` end-of-run writer takes.
581    fn stage_dataset(prefix: &Path, m: &RunManifest) {
582        std::fs::create_dir_all(prefix).unwrap();
583        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
584            destination_type: crate::config::DestinationType::Local,
585            path: Some(prefix.to_string_lossy().into_owned()),
586            ..Default::default()
587        })
588        .unwrap();
589        crate::pipeline::write_manifest(&*dest, m).unwrap();
590    }
591
592    /// Config with a single export pointing at `prefix`.  Written next to —
593    /// never inside — the prefix, so it can't surface as untracked surplus.
594    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
595        let cfg = dir.join("rivet.yaml");
596        let yaml = format!(
597            "source:\n  type: postgres\n  url: postgresql://nobody@localhost/nope\nexports:\n  - name: orders\n    query: \"SELECT 1\"\n    mode: full\n    format: parquet\n    destination:\n      type: local\n      path: \"{}\"\n",
598            prefix.to_string_lossy()
599        );
600        std::fs::write(&cfg, yaml).unwrap();
601        cfg
602    }
603
604    /// In-process twin of the live roast test (tests/roast_validate_exit.rs):
605    /// `manifest.json` present but unreadable must exit non-zero.  head()
606    /// (fs::metadata) succeeds, read() (fs::read) hits EACCES — exactly the
607    /// `ManifestReadError` verdict.
608    #[cfg(unix)]
609    #[test]
610    fn unreadable_manifest_fails_the_command() {
611        use std::os::unix::fs::PermissionsExt;
612
613        let dir = tempfile::tempdir().unwrap();
614        let prefix = dir.path().join("out");
615        stage_dataset(&prefix, &success_manifest(Vec::new()));
616        let cfg = write_cfg(dir.path(), &prefix);
617
618        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
619        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
620        if std::fs::read(&manifest_path).is_ok() {
621            // euid 0 ignores file modes — the degraded state can't be staged.
622            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
623            return;
624        }
625
626        let report = dir.path().join("report.json");
627        let err = run_validate_command(
628            cfg.to_str().unwrap(),
629            Some("orders"),
630            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
631            ValidateTarget::default(),
632        )
633        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
634        assert!(
635            format!("{err:#}").contains("1 export(s) failed verification"),
636            "got: {err:#}"
637        );
638
639        // The JSON report (written before the bail) still carries the
640        // verdict so the operator sees why.
641        let json: serde_json::Value =
642            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
643        let verification = &json["exports"][0]["verification"];
644        assert_eq!(verification["manifest_found"], false);
645        assert_eq!(verification["legacy_run"], false);
646        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
647    }
648
649    #[test]
650    fn untracked_surplus_alone_keeps_exit_zero() {
651        // The advisory neighbor of the read-error fix: gating on
652        // `has_failures()` alone would flip this verdict to non-zero, but
653        // surplus cleanup is `--resume`'s job (M9), not validate's.
654        let dir = tempfile::tempdir().unwrap();
655        let prefix = dir.path().join("out");
656        stage_dataset(&prefix, &success_manifest(Vec::new()));
657        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
658        let cfg = write_cfg(dir.path(), &prefix);
659
660        let report = dir.path().join("report.json");
661        run_validate_command(
662            cfg.to_str().unwrap(),
663            Some("orders"),
664            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
665            ValidateTarget::default(),
666        )
667        .expect("advisory untracked surplus must not flip the exit code");
668
669        let json: serde_json::Value =
670            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
671        let verification = &json["exports"][0]["verification"];
672        assert_eq!(verification["passed"], true);
673        // The stable wire contract is preserved: untracked entries still ride
674        // `verification.failures` (consumers branch on `failures[].kind`).
675        assert_eq!(verification["failures"][0]["kind"], "untracked_object");
676
677        // L14: …and the same advisory entry is also surfaced in the top-level
678        // `warnings` array so "failures means failures" — an exit-0 verdict no
679        // longer hides a surplus object under a "failure" label.
680        let warnings = json["warnings"].as_array().expect("warnings array present");
681        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
682        assert_eq!(warnings[0]["export_name"], "orders");
683        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
684        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
685    }
686
687    #[test]
688    fn json_warnings_array_is_empty_when_no_advisory_failures() {
689        // A clean dataset with no surplus → no warnings.  Guards against the
690        // `warnings` lens accidentally picking up fatal failures.
691        let dir = tempfile::tempdir().unwrap();
692        let prefix = dir.path().join("out");
693        stage_dataset(&prefix, &success_manifest(Vec::new()));
694        let cfg = write_cfg(dir.path(), &prefix);
695
696        let report = dir.path().join("report.json");
697        run_validate_command(
698            cfg.to_str().unwrap(),
699            Some("orders"),
700            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
701            ValidateTarget::default(),
702        )
703        .expect("a clean dataset must pass");
704
705        let json: serde_json::Value =
706            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
707        assert_eq!(
708            json["warnings"]
709                .as_array()
710                .expect("warnings array present")
711                .len(),
712            0,
713            "no surplus → no warnings"
714        );
715    }
716
717    #[test]
718    fn missing_part_fails_the_command() {
719        let dir = tempfile::tempdir().unwrap();
720        let prefix = dir.path().join("out");
721        let m = success_manifest(vec![ManifestPart {
722            part_id: 1,
723            path: "part-000001.parquet".into(),
724            rows: 10,
725            size_bytes: 4,
726            content_fingerprint: "xxh3:1111111111111111".into(),
727            content_md5: String::new(),
728            status: PartStatus::Committed,
729        }]);
730        stage_dataset(&prefix, &m); // the part itself is never written
731        let cfg = write_cfg(dir.path(), &prefix);
732
733        let err = run_validate_command(
734            cfg.to_str().unwrap(),
735            Some("orders"),
736            ValidateOutputFormat::Json(None),
737            ValidateTarget::default(),
738        )
739        .expect_err("a missing committed part must fail verification");
740        assert!(
741            format!("{err:#}").contains("1 export(s) failed verification"),
742            "got: {err:#}"
743        );
744    }
745
746    // ── finding #20: operator-pinned --prefix requires a manifest ────────────
747
748    /// `--prefix` at a real, complete dataset still passes — the normal
749    /// "validate exactly this prefix" case must not regress.
750    #[test]
751    fn prefix_override_with_real_manifest_passes() {
752        let dir = tempfile::tempdir().unwrap();
753        let prefix = dir.path().join("out");
754        stage_dataset(&prefix, &success_manifest(Vec::new()));
755        let cfg = write_cfg(dir.path(), &prefix);
756
757        run_validate_command(
758            cfg.to_str().unwrap(),
759            Some("orders"),
760            ValidateOutputFormat::Json(None),
761            ValidateTarget {
762                prefix_override: Some(prefix.to_string_lossy().into_owned()),
763                ..Default::default()
764            },
765        )
766        .expect("a real dataset under a pinned --prefix must pass");
767    }
768
769    /// `--prefix` at a never-written directory FAILS (exit non-zero): the
770    /// operator asserted a dataset lives here, so an absent manifest is a
771    /// refusal reason, not the benign legacy-run pass. This is the in-process
772    /// twin of the live `audit_validate_absent_prefix_can_fail` roast.
773    #[test]
774    fn prefix_override_at_absent_manifest_fails() {
775        let dir = tempfile::tempdir().unwrap();
776        // The export's config destination is irrelevant — `--prefix` overrides
777        // it. Point the override at a dir that exists but was never written.
778        let cfg_prefix = dir.path().join("cfg_dest");
779        std::fs::create_dir_all(&cfg_prefix).unwrap();
780        let cfg = write_cfg(dir.path(), &cfg_prefix);
781        let empty_prefix = dir.path().join("never_written");
782        std::fs::create_dir_all(&empty_prefix).unwrap();
783
784        let report = dir.path().join("report.json");
785        let err = run_validate_command(
786            cfg.to_str().unwrap(),
787            Some("orders"),
788            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
789            ValidateTarget {
790                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
791                ..Default::default()
792            },
793        )
794        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
795        assert!(
796            format!("{err:#}").contains("1 export(s) failed verification"),
797            "got: {err:#}"
798        );
799
800        // The verdict (written before the bail) carries the explicit reason so
801        // the operator sees why the gate refused, not a bare exit code.
802        let json: serde_json::Value =
803            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
804        let verification = &json["exports"][0]["verification"];
805        assert_eq!(verification["manifest_found"], false);
806        assert_eq!(verification["legacy_run"], false);
807        assert_eq!(
808            verification["failures"][0]["kind"],
809            "manifest_required_but_absent"
810        );
811    }
812
813    /// Without `--prefix`, an absent manifest stays the benign M6 legacy-run
814    /// pass (exit 0) — today's behaviour is preserved for config-resolved
815    /// destinations that may legitimately be pre-0.7.0 prefixes.
816    #[test]
817    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
818        let dir = tempfile::tempdir().unwrap();
819        let prefix = dir.path().join("out");
820        std::fs::create_dir_all(&prefix).unwrap(); // exists, but no manifest
821        let cfg = write_cfg(dir.path(), &prefix);
822
823        run_validate_command(
824            cfg.to_str().unwrap(),
825            Some("orders"),
826            ValidateOutputFormat::Json(None),
827            ValidateTarget::default(), // no --prefix
828        )
829        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
830    }
831}