Skip to main content

rivet/pipeline/
validate_cmd.rs

1//! **Layer: Coordinator** (config → destination → verification → render)
2//!
3//! `rivet validate` — re-run the manifest-aware `--validate` pass against
4//! an existing destination, without performing an extraction.
5//!
6//! ADR-0013 amendment: this is **not** a new trust noun.  It is a standalone
7//! driver for the same M5/M6 verification surface that `rivet run --validate`
8//! already performs at end-of-run (see [`crate::pipeline::validate_manifest`]).
9//! The verdict shape is identical; the only difference is no source query,
10//! no extraction, no state writes.
11//!
12//! Use cases:
13//! - "Is the output at this prefix still complete?" — Airflow / CI poller
14//!   between runs.
15//! - "Did someone delete a part by mistake?" — operator triage on a
16//!   suspected-broken dataset.
17//! - "Does this legacy prefix have a manifest yet?" — fast check for M6.
18//! - "Was yesterday's run complete?" — `--date YYYY-MM-DD` or `--run-id`
19//!   re-targets a prior day's prefix without re-running the export
20//!   (v0.7.2 historical-validation flags).
21//!
22//! Out of scope:
23//! - Source-side reconciliation (`COUNT(*)`).  That's `--reconcile` /
24//!   `rivet reconcile`, which already exists.
25//! - Per-byte re-fingerprint of every part (`--validate --deep`, future).
26//!
27//! Exit code: `0` if `passed` (or the legacy-run case where the verifier
28//! cannot certify but no failures were seen); non-zero on any explicit
29//! failure (`PartMissing`, `PartSizeMismatch`, `SuccessMarkerStale`, …).
30
31use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
41/// Output format mirroring the `rivet reconcile` / `rivet repair` pattern.
42pub enum ValidateOutputFormat {
43    /// Human-readable summary printed to stdout.
44    Pretty,
45    /// JSON to the given path or stdout if `None`.
46    Json(Option<String>),
47}
48
49/// Re-targeting overrides for `rivet validate`.
50///
51/// Default (`ValidateTarget::default()`) reproduces the v0.7.1 behaviour:
52/// resolve `{date}` against today's UTC date, with no `{run_id}`
53/// substitution, and use the config's destination prefix/path unchanged.
54#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56    /// `--date YYYY-MM-DD` — override the date used for `{date}`.
57    pub date: Option<NaiveDate>,
58    /// `--run-id RID` — substitute `{run_id}` in the destination template.
59    pub run_id: Option<String>,
60    /// `--prefix STRING` — bypass placeholder resolution entirely and
61    /// verify exactly this prefix.  Replaces both `prefix` and `path`.
62    pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67        let mut ctx = match self.date {
68            Some(d) => PlaceholderContext::for_date(d, export_name),
69            None => PlaceholderContext::for_today(export_name),
70        };
71        if let Some(rid) = &self.run_id {
72            ctx = ctx.with_run_id(rid.clone());
73        }
74        ctx
75    }
76}
77
78/// Driver for `rivet validate <export>` (or every export when
79/// `export_name` is `None`).
80///
81/// Returns `Err` on the first explicit verification failure across the
82/// requested exports so an Airflow / CI step can branch on the exit code.
83/// Per-export verdicts are still printed to stdout / written to JSON for
84/// every export, including subsequent ones — the bail at the end is the
85/// last action.
86pub fn run_validate_command(
87    config_path: &str,
88    export_name: Option<&str>,
89    format: ValidateOutputFormat,
90    target: ValidateTarget,
91) -> Result<()> {
92    let config = Config::load_with_params(config_path, None)?;
93
94    let exports: Vec<&crate::config::ExportConfig> = match export_name {
95        Some(name) => match config.exports.iter().find(|e| e.name == name) {
96            Some(e) => vec![e],
97            None => anyhow::bail!("export '{}' not found in config", name),
98        },
99        None => config.exports.iter().collect(),
100    };
101
102    if exports.is_empty() {
103        anyhow::bail!("no exports defined in config — nothing to validate");
104    }
105
106    // `--prefix` only makes sense for a single export; with multiple
107    // exports it would silently re-point all of them at the same physical
108    // bytes.  Catch this at the boundary so we never head-check the wrong
109    // dataset under operator triage pressure.
110    if target.prefix_override.is_some() && exports.len() > 1 {
111        anyhow::bail!(
112            "--prefix requires --export <name>: cannot apply one override to {} exports",
113            exports.len()
114        );
115    }
116
117    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118    let mut hard_failures: Vec<String> = Vec::new();
119
120    for export in &exports {
121        // Apply the operator-supplied re-targeting if any, else fall back
122        // to today's UTC date (same shape `rivet run` resolves at write
123        // time).
124        let ctx = target.placeholder_context(&export.name);
125        let mut expanded_dest =
126            crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127        if let Some(p) = &target.prefix_override {
128            // Bypass placeholder resolution: trust the operator's literal
129            // prefix.  Replace both `path` (local) and `prefix` (cloud)
130            // so whichever the backend reads picks up the override.
131            expanded_dest.path = Some(p.clone());
132            expanded_dest.prefix = Some(p.clone());
133        }
134        let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135        let dest = match crate::destination::create_destination(&expanded_dest) {
136            Ok(d) => d,
137            Err(e) => {
138                let msg = format!(
139                    "export '{}' (prefix: {}): could not open destination: {:#}",
140                    export.name, resolved_prefix, e
141                );
142                hard_failures.push(msg);
143                continue;
144            }
145        };
146        // Streaming destinations have no prefix to verify — note and skip.
147        if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148        {
149            log::info!(
150                "export '{}': streaming destination, skipping (nothing to verify)",
151                export.name
152            );
153            continue;
154        }
155        match verify_at_destination(&*dest, "") {
156            Ok(mut v) => {
157                // Apply this export's `verify` policy: `content` fails the
158                // verdict when any part is only size-verified (review D).
159                v.enforce_content_policy(export.verify.requires_content());
160                // Finding #20: when the operator pinned a literal `--prefix`,
161                // they asserted a real dataset lives here. An absent manifest is
162                // then NOT the benign M6 legacy-run case (exit 0) — it almost
163                // always means the prefix was never written (a misconfigured CI
164                // gate `rivet validate && deploy` sailing past nothing). Escalate
165                // that exact shape (no manifest, no other failure) to a fatal
166                // `ManifestRequiredButAbsent` so the exit gate refuses it loudly
167                // instead of silently passing. No-op for every other shape (a
168                // real manifest, or an absent one already carrying a read error).
169                if target.prefix_override.is_some() {
170                    v.require_manifest_present(&resolved_prefix);
171                }
172                all_results.push(ExportVerdict {
173                    name: export.name.clone(),
174                    resolved_prefix,
175                    verification: v,
176                });
177                // CDC-specific: re-read the parts and confirm `__pos` stayed in
178                // source-log order (no reorder / no part-boundary overlap). The
179                // manifest check above already covered per-part MD5 / size / _SUCCESS.
180                if export.mode == crate::config::ExportMode::Cdc
181                    && export.format == crate::config::FormatType::Parquet
182                {
183                    match crate::source::cdc::validate::check_positions(&*dest, "") {
184                        Ok(pc) if pc.is_ok() => log::info!(
185                            "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
186                            export.name,
187                            pc.rows,
188                            pc.parts,
189                            pc.first,
190                            pc.last
191                        ),
192                        Ok(pc) => {
193                            for v in &pc.violations {
194                                hard_failures
195                                    .push(format!("export '{}': cdc __pos: {}", export.name, v));
196                            }
197                        }
198                        Err(e) => hard_failures.push(format!(
199                            "export '{}': cdc __pos check failed: {:#}",
200                            export.name, e
201                        )),
202                    }
203                }
204            }
205            Err(e) => {
206                hard_failures.push(format!(
207                    "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
208                    export.name, resolved_prefix, e
209                ));
210            }
211        }
212    }
213
214    match format {
215        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
216        ValidateOutputFormat::Json(out_path) => {
217            render_json(&all_results, &hard_failures, out_path)?
218        }
219    }
220
221    // Exit-code policy: the standalone driver fails when an export's
222    // verdict surfaced an explicit failure it could not pass over
223    // (`verdict_fails_exit`) — an M5 verification failure on a found
224    // manifest (missing part, size mismatch, stale _SUCCESS,
225    // self-inconsistent manifest) or a manifest that could not even be
226    // read (`ManifestReadError`: `manifest_found` is false, but the
227    // verifier has a concrete reason to refuse, not a legacy prefix).
228    // Surplus untracked objects (`UntrackedObject`) are surfaced in
229    // `failures` for operator audit but do NOT flip `passed`, because
230    // their cleanup is M9's job (resume), not validate's.  An operator
231    // who wants strict "no surplus allowed" can grep the JSON report for
232    // `kind: untracked_object` themselves; a future
233    // `rivet validate --strict` flag may surface that exit-code mode if
234    // demand appears (out of scope for this PR).
235    //
236    // Legacy runs (M6) keep exit 0: `passed: false` with no failures
237    // means "verifier cannot certify", not "verifier found a problem".
238    let failed_verdicts = all_results
239        .iter()
240        .filter(|r| verdict_fails_exit(&r.verification))
241        .count();
242    if failed_verdicts > 0 {
243        // A verified-and-wrong verdict (missing part, size mismatch, stale
244        // _SUCCESS, self-inconsistent manifest) is the data-integrity class
245        // (exit 3) — typed so a scheduler stops rather than blindly retries.
246        // `hard_failures` (couldn't open / read the destination) are operational
247        // "could not verify", not "verified wrong", so they fold into the count
248        // but the class is driven by the real verdict failure.
249        return Err(crate::error::DataIntegrityError::new(format!(
250            "rivet validate: {} export(s) failed verification",
251            hard_failures.len() + failed_verdicts
252        ))
253        .into());
254    }
255    if !hard_failures.is_empty() {
256        // Could-not-verify only (no verified-wrong verdict): operational, generic.
257        anyhow::bail!(
258            "rivet validate: {} export(s) failed verification",
259            hard_failures.len()
260        );
261    }
262    Ok(())
263}
264
265/// Exit-code predicate for one export's verdict: non-zero iff the verifier
266/// surfaced an explicit failure (`has_failures` — "a reason an orchestrator
267/// should refuse the run") on a verdict that did not pass.  Both documented
268/// exit-0 cases survive: legacy runs (M6 — `passed: false` with no failures
269/// is "cannot certify", not "found a problem") and advisory-only verdicts
270/// (`UntrackedObject` never flips `passed`).
271fn verdict_fails_exit(v: &ManifestVerification) -> bool {
272    !v.passed && v.has_failures()
273}
274
275/// Per-export verdict plus the resolved physical prefix the verifier
276/// looked at — surfaced in both pretty and JSON output so an operator can
277/// confirm at a glance which bytes were checked.
278struct ExportVerdict {
279    name: String,
280    resolved_prefix: String,
281    verification: ManifestVerification,
282}
283
284/// Render the destination's resolved prefix for human/JSON output.
285///
286/// Cloud backends carry the data location in `prefix`; the local backend
287/// uses `path`.  Falling back to `<unresolved>` should never fire under
288/// normal config (clap + Config::load enforce one of the two) but keeps
289/// `validate` from panicking if a future config shape lands here.
290fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
291    dest.prefix
292        .clone()
293        .or_else(|| dest.path.clone())
294        .unwrap_or_else(|| "<unresolved>".into())
295}
296
297fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
298    use std::io::Write;
299    let stdout = std::io::stdout();
300    let mut h = stdout.lock();
301
302    for r in results {
303        let _ = writeln!(h, "── {} ──", r.name);
304        let _ = writeln!(h, "  prefix:    {}", r.resolved_prefix);
305        let v = &r.verification;
306        if v.legacy_run {
307            let _ = writeln!(
308                h,
309                "  status:    legacy_run (no manifest at destination — pre-0.7.0 prefix)"
310            );
311            continue;
312        }
313        if !v.manifest_found {
314            let _ = writeln!(h, "  status:    NO MANIFEST");
315            // A read-error verdict lands here (manifest present but
316            // unreadable, or head failed): its `failures` are the
317            // operator's only signal, so print them before bailing out
318            // of this export's section.
319            for failure in &v.failures {
320                let _ = writeln!(h, "  failure:   {}", failure);
321            }
322            continue;
323        }
324        let _ = writeln!(
325            h,
326            "  status:    {}",
327            if v.passed { "PASSED" } else { "FAILED" }
328        );
329        let _ = writeln!(
330            h,
331            "  parts:     {} verified ({} md5, {} size-only), {} failed",
332            v.parts_verified,
333            v.parts_md5_verified,
334            v.parts_verified.saturating_sub(v.parts_md5_verified),
335            v.parts_failed
336        );
337        let _ = writeln!(
338            h,
339            "  _SUCCESS:  {}",
340            if v.success_marker_consistent {
341                "consistent"
342            } else if v.failures.iter().any(|f| matches!(
343                f,
344                crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
345                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
346                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
347            )) {
348                "INCONSISTENT (see failures)"
349            } else {
350                "absent (no signal)"
351            }
352        );
353        let _ = writeln!(
354            h,
355            "  manifest:  {}",
356            if v.manifest_self_consistent {
357                "self-consistent"
358            } else {
359                "INCONSISTENT (see failures)"
360            }
361        );
362        for failure in &v.failures {
363            // `Failure: Display` is the single source of truth for the message;
364            // same string the run report uses.  L14: advisory (non-fatal)
365            // entries — `UntrackedObject` surplus — are labelled "warning:" not
366            // "failure:".  They never flip `passed` and never change the exit
367            // code (cleanup is `--resume`'s job, M9), so rendering them as
368            // "failure:" beside exit 0 was contradictory.  Fatal failures keep
369            // the "failure:" label.
370            let label = if failure.is_fatal() {
371                "failure:"
372            } else {
373                "warning:"
374            };
375            let _ = writeln!(h, "  {}   {}", label, failure);
376        }
377    }
378
379    if !hard_failures.is_empty() {
380        let _ = writeln!(h);
381        let _ = writeln!(h, "── errors ──");
382        for e in hard_failures {
383            let _ = writeln!(h, "  {}", e);
384        }
385    }
386    let _ = h.flush();
387}
388
389fn render_json(
390    results: &[ExportVerdict],
391    hard_failures: &[String],
392    out_path: Option<String>,
393) -> Result<()> {
394    // L14: surface advisory (non-fatal) entries — `UntrackedObject` surplus —
395    // in a dedicated top-level `warnings` array so a consumer can tell at a
396    // glance that "failures means failures".  The per-export
397    // `verification.failures` array is the stable wire contract (consumers
398    // branch on `failures[].kind`), so advisory entries stay there too — this
399    // is an additive lens over the same data, not a relocation.
400    let warnings: Vec<serde_json::Value> = results
401        .iter()
402        .flat_map(|r| {
403            r.verification
404                .failures
405                .iter()
406                .filter(|f| !f.is_fatal())
407                .map(move |f| {
408                    serde_json::json!({
409                        "export_name": r.name,
410                        "warning": f,
411                    })
412                })
413        })
414        .collect();
415
416    let payload = serde_json::json!({
417        "exports": results
418            .iter()
419            .map(|r| {
420                serde_json::json!({
421                    "export_name": r.name,
422                    "resolved_prefix": r.resolved_prefix,
423                    "verification": r.verification,
424                })
425            })
426            .collect::<Vec<_>>(),
427        "warnings": warnings,
428        "errors": hard_failures,
429    });
430    let serialized = serde_json::to_string_pretty(&payload)?;
431    match out_path {
432        Some(p) => {
433            std::fs::write(Path::new(&p), &serialized)?;
434            log::info!("rivet validate: wrote JSON report to {}", p);
435        }
436        None => {
437            println!("{}", serialized);
438        }
439    }
440    Ok(())
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446
447    // ── ValidateTarget::placeholder_context ────────────────────────────────
448
449    #[test]
450    fn target_default_uses_today() {
451        let target = ValidateTarget::default();
452        let ctx = target.placeholder_context("orders");
453        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
454        assert_eq!(ctx.export_name, "orders");
455        assert!(ctx.run_id.is_none());
456    }
457
458    #[test]
459    fn target_with_date_overrides_today() {
460        let target = ValidateTarget {
461            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
462            ..Default::default()
463        };
464        let ctx = target.placeholder_context("orders");
465        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
466        assert!(ctx.run_id.is_none());
467    }
468
469    #[test]
470    fn target_composes_date_and_run_id() {
471        // Regression for the "run yesterday, validate today" scenario:
472        // operator passes both --date and --run-id; the resolver must see
473        // both.
474        let target = ValidateTarget {
475            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
476            run_id: Some("r-abc123".into()),
477            prefix_override: None,
478        };
479        let ctx = target.placeholder_context("orders");
480        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
481        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
482    }
483
484    // ── resolved_prefix_for_display ────────────────────────────────────────
485
486    #[test]
487    fn resolved_prefix_prefers_cloud_prefix_over_path() {
488        let dest = crate::config::DestinationConfig {
489            destination_type: crate::config::DestinationType::S3,
490            prefix: Some("exports/2026-05-21/orders/".into()),
491            path: Some("/scratch".into()),
492            ..Default::default()
493        };
494        assert_eq!(
495            resolved_prefix_for_display(&dest),
496            "exports/2026-05-21/orders/",
497        );
498    }
499
500    #[test]
501    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
502        let dest = crate::config::DestinationConfig {
503            destination_type: crate::config::DestinationType::Local,
504            prefix: None,
505            path: Some("/data/out".into()),
506            ..Default::default()
507        };
508        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
509    }
510
511    // ── verdict_fails_exit (exit-code policy) ──────────────────────────────
512
513    use crate::pipeline::ManifestVerificationFailure as VFailure;
514
515    /// Verdict shape `verify_at_destination` returns when `manifest.json`
516    /// exists but cannot be read: not legacy, not passed, one explicit
517    /// `ManifestReadError`.
518    fn read_error_verdict() -> ManifestVerification {
519        ManifestVerification {
520            legacy_run: false,
521            failures: vec![VFailure::ManifestReadError {
522                detail: "permission denied".into(),
523            }],
524            ..ManifestVerification::legacy()
525        }
526    }
527
528    #[test]
529    fn exit_gate_counts_manifest_read_error_as_failure() {
530        assert!(verdict_fails_exit(&read_error_verdict()));
531    }
532
533    #[test]
534    fn exit_gate_keeps_legacy_run_at_zero() {
535        // M6: no manifest, no failures — "cannot certify" is not "found a
536        // problem".
537        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
538    }
539
540    #[test]
541    fn exit_gate_keeps_advisory_untracked_at_zero() {
542        let v = ManifestVerification {
543            manifest_found: true,
544            legacy_run: false,
545            passed: true,
546            parts_verified: 1,
547            failures: vec![VFailure::UntrackedObject {
548                key: "stray.parquet".into(),
549                size_bytes: 9,
550            }],
551            ..ManifestVerification::legacy()
552        };
553        assert!(!verdict_fails_exit(&v));
554    }
555
556    #[test]
557    fn exit_gate_counts_fatal_failure_on_found_manifest() {
558        let v = ManifestVerification {
559            manifest_found: true,
560            legacy_run: false,
561            failures: vec![VFailure::PartMissing {
562                part_id: 1,
563                path: "part-000001.parquet".into(),
564            }],
565            ..ManifestVerification::legacy()
566        };
567        assert!(verdict_fails_exit(&v));
568    }
569
570    // ── run_validate_command end-to-end (local destination; the source URL
571    //     is never dialed — see tests/validate_historical.rs) ──────────────
572
573    use crate::manifest::{
574        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
575        PartStatus, RunManifest,
576    };
577
578    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
579        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
580        let part_count = parts.len() as u32;
581        RunManifest {
582            manifest_version: MANIFEST_VERSION,
583            run_id: "r-validate-cmd".into(),
584            export_name: "orders".into(),
585            started_at: "2026-06-09T12:00:00Z".into(),
586            finished_at: "2026-06-09T12:01:00Z".into(),
587            status: ManifestStatus::Success,
588            source: ManifestSource {
589                engine: "postgres".into(),
590                schema: Some("public".into()),
591                table: Some("orders".into()),
592            },
593            destination: ManifestDestination {
594                kind: "local".into(),
595                uri: "file:///tmp/out".into(),
596            },
597            format: "parquet".into(),
598            compression: "zstd".into(),
599            schema_fingerprint: "xxh3:0123456789abcdef".into(),
600            row_count,
601            part_count,
602            parts,
603        }
604    }
605
606    /// Land `manifest.json` + `_SUCCESS` at `prefix` via the public writer
607    /// surface — same path the `rivet run` end-of-run writer takes.
608    fn stage_dataset(prefix: &Path, m: &RunManifest) {
609        std::fs::create_dir_all(prefix).unwrap();
610        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
611            destination_type: crate::config::DestinationType::Local,
612            path: Some(prefix.to_string_lossy().into_owned()),
613            ..Default::default()
614        })
615        .unwrap();
616        crate::pipeline::write_manifest(&*dest, m).unwrap();
617    }
618
619    /// Config with a single export pointing at `prefix`.  Written next to —
620    /// never inside — the prefix, so it can't surface as untracked surplus.
621    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
622        let cfg = dir.join("rivet.yaml");
623        let yaml = format!(
624            "source:\n  type: postgres\n  url: postgresql://nobody@localhost/nope\nexports:\n  - name: orders\n    query: \"SELECT 1\"\n    mode: full\n    format: parquet\n    destination:\n      type: local\n      path: \"{}\"\n",
625            prefix.to_string_lossy()
626        );
627        std::fs::write(&cfg, yaml).unwrap();
628        cfg
629    }
630
631    /// In-process twin of the live roast test (tests/roast_validate_exit.rs):
632    /// `manifest.json` present but unreadable must exit non-zero.  head()
633    /// (fs::metadata) succeeds, read() (fs::read) hits EACCES — exactly the
634    /// `ManifestReadError` verdict.
635    #[cfg(unix)]
636    #[test]
637    fn unreadable_manifest_fails_the_command() {
638        use std::os::unix::fs::PermissionsExt;
639
640        let dir = tempfile::tempdir().unwrap();
641        let prefix = dir.path().join("out");
642        stage_dataset(&prefix, &success_manifest(Vec::new()));
643        let cfg = write_cfg(dir.path(), &prefix);
644
645        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
646        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
647        if std::fs::read(&manifest_path).is_ok() {
648            // euid 0 ignores file modes — the degraded state can't be staged.
649            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
650            return;
651        }
652
653        let report = dir.path().join("report.json");
654        let err = run_validate_command(
655            cfg.to_str().unwrap(),
656            Some("orders"),
657            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
658            ValidateTarget::default(),
659        )
660        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
661        assert!(
662            format!("{err:#}").contains("1 export(s) failed verification"),
663            "got: {err:#}"
664        );
665
666        // The JSON report (written before the bail) still carries the
667        // verdict so the operator sees why.
668        let json: serde_json::Value =
669            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
670        let verification = &json["exports"][0]["verification"];
671        assert_eq!(verification["manifest_found"], false);
672        assert_eq!(verification["legacy_run"], false);
673        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
674    }
675
676    #[test]
677    fn untracked_surplus_alone_keeps_exit_zero() {
678        // The advisory neighbor of the read-error fix: gating on
679        // `has_failures()` alone would flip this verdict to non-zero, but
680        // surplus cleanup is `--resume`'s job (M9), not validate's.
681        let dir = tempfile::tempdir().unwrap();
682        let prefix = dir.path().join("out");
683        stage_dataset(&prefix, &success_manifest(Vec::new()));
684        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
685        let cfg = write_cfg(dir.path(), &prefix);
686
687        let report = dir.path().join("report.json");
688        run_validate_command(
689            cfg.to_str().unwrap(),
690            Some("orders"),
691            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
692            ValidateTarget::default(),
693        )
694        .expect("advisory untracked surplus must not flip the exit code");
695
696        let json: serde_json::Value =
697            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
698        let verification = &json["exports"][0]["verification"];
699        assert_eq!(verification["passed"], true);
700        // The stable wire contract is preserved: untracked entries still ride
701        // `verification.failures` (consumers branch on `failures[].kind`).
702        assert_eq!(verification["failures"][0]["kind"], "untracked_object");
703
704        // L14: …and the same advisory entry is also surfaced in the top-level
705        // `warnings` array so "failures means failures" — an exit-0 verdict no
706        // longer hides a surplus object under a "failure" label.
707        let warnings = json["warnings"].as_array().expect("warnings array present");
708        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
709        assert_eq!(warnings[0]["export_name"], "orders");
710        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
711        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
712    }
713
714    #[test]
715    fn json_warnings_array_is_empty_when_no_advisory_failures() {
716        // A clean dataset with no surplus → no warnings.  Guards against the
717        // `warnings` lens accidentally picking up fatal failures.
718        let dir = tempfile::tempdir().unwrap();
719        let prefix = dir.path().join("out");
720        stage_dataset(&prefix, &success_manifest(Vec::new()));
721        let cfg = write_cfg(dir.path(), &prefix);
722
723        let report = dir.path().join("report.json");
724        run_validate_command(
725            cfg.to_str().unwrap(),
726            Some("orders"),
727            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
728            ValidateTarget::default(),
729        )
730        .expect("a clean dataset must pass");
731
732        let json: serde_json::Value =
733            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
734        assert_eq!(
735            json["warnings"]
736                .as_array()
737                .expect("warnings array present")
738                .len(),
739            0,
740            "no surplus → no warnings"
741        );
742    }
743
744    #[test]
745    fn missing_part_fails_the_command() {
746        let dir = tempfile::tempdir().unwrap();
747        let prefix = dir.path().join("out");
748        let m = success_manifest(vec![ManifestPart {
749            part_id: 1,
750            path: "part-000001.parquet".into(),
751            rows: 10,
752            size_bytes: 4,
753            content_fingerprint: "xxh3:1111111111111111".into(),
754            content_md5: String::new(),
755            status: PartStatus::Committed,
756        }]);
757        stage_dataset(&prefix, &m); // the part itself is never written
758        let cfg = write_cfg(dir.path(), &prefix);
759
760        let err = run_validate_command(
761            cfg.to_str().unwrap(),
762            Some("orders"),
763            ValidateOutputFormat::Json(None),
764            ValidateTarget::default(),
765        )
766        .expect_err("a missing committed part must fail verification");
767        assert!(
768            format!("{err:#}").contains("1 export(s) failed verification"),
769            "got: {err:#}"
770        );
771    }
772
773    // ── finding #20: operator-pinned --prefix requires a manifest ────────────
774
775    /// `--prefix` at a real, complete dataset still passes — the normal
776    /// "validate exactly this prefix" case must not regress.
777    #[test]
778    fn prefix_override_with_real_manifest_passes() {
779        let dir = tempfile::tempdir().unwrap();
780        let prefix = dir.path().join("out");
781        stage_dataset(&prefix, &success_manifest(Vec::new()));
782        let cfg = write_cfg(dir.path(), &prefix);
783
784        run_validate_command(
785            cfg.to_str().unwrap(),
786            Some("orders"),
787            ValidateOutputFormat::Json(None),
788            ValidateTarget {
789                prefix_override: Some(prefix.to_string_lossy().into_owned()),
790                ..Default::default()
791            },
792        )
793        .expect("a real dataset under a pinned --prefix must pass");
794    }
795
796    /// `--prefix` at a never-written directory FAILS (exit non-zero): the
797    /// operator asserted a dataset lives here, so an absent manifest is a
798    /// refusal reason, not the benign legacy-run pass. This is the in-process
799    /// twin of the live `audit_validate_absent_prefix_can_fail` roast.
800    #[test]
801    fn prefix_override_at_absent_manifest_fails() {
802        let dir = tempfile::tempdir().unwrap();
803        // The export's config destination is irrelevant — `--prefix` overrides
804        // it. Point the override at a dir that exists but was never written.
805        let cfg_prefix = dir.path().join("cfg_dest");
806        std::fs::create_dir_all(&cfg_prefix).unwrap();
807        let cfg = write_cfg(dir.path(), &cfg_prefix);
808        let empty_prefix = dir.path().join("never_written");
809        std::fs::create_dir_all(&empty_prefix).unwrap();
810
811        let report = dir.path().join("report.json");
812        let err = run_validate_command(
813            cfg.to_str().unwrap(),
814            Some("orders"),
815            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
816            ValidateTarget {
817                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
818                ..Default::default()
819            },
820        )
821        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
822        assert!(
823            format!("{err:#}").contains("1 export(s) failed verification"),
824            "got: {err:#}"
825        );
826
827        // The verdict (written before the bail) carries the explicit reason so
828        // the operator sees why the gate refused, not a bare exit code.
829        let json: serde_json::Value =
830            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
831        let verification = &json["exports"][0]["verification"];
832        assert_eq!(verification["manifest_found"], false);
833        assert_eq!(verification["legacy_run"], false);
834        assert_eq!(
835            verification["failures"][0]["kind"],
836            "manifest_required_but_absent"
837        );
838    }
839
840    /// Without `--prefix`, an absent manifest stays the benign M6 legacy-run
841    /// pass (exit 0) — today's behaviour is preserved for config-resolved
842    /// destinations that may legitimately be pre-0.7.0 prefixes.
843    #[test]
844    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
845        let dir = tempfile::tempdir().unwrap();
846        let prefix = dir.path().join("out");
847        std::fs::create_dir_all(&prefix).unwrap(); // exists, but no manifest
848        let cfg = write_cfg(dir.path(), &prefix);
849
850        run_validate_command(
851            cfg.to_str().unwrap(),
852            Some("orders"),
853            ValidateOutputFormat::Json(None),
854            ValidateTarget::default(), // no --prefix
855        )
856        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
857    }
858}