Skip to main content

rivet/pipeline/
validate_cmd.rs

1//! **Layer: Coordinator** (config → destination → verification → render)
2//!
3//! `rivet validate` — re-run the manifest-aware `--validate` pass against
4//! an existing destination, without performing an extraction.
5//!
6//! ADR-0013 amendment: this is **not** a new trust noun.  It is a standalone
7//! driver for the same M5/M6 verification surface that `rivet run --validate`
8//! already performs at end-of-run (see [`crate::pipeline::validate_manifest`]).
9//! The verdict shape is identical; the only difference is no source query,
10//! no extraction, no state writes.
11//!
12//! Use cases:
13//! - "Is the output at this prefix still complete?" — Airflow / CI poller
14//!   between runs.
15//! - "Did someone delete a part by mistake?" — operator triage on a
16//!   suspected-broken dataset.
17//! - "Does this legacy prefix have a manifest yet?" — fast check for M6.
18//! - "Was yesterday's run complete?" — `--date YYYY-MM-DD` or `--run-id`
19//!   re-targets a prior day's prefix without re-running the export
20//!   (v0.7.2 historical-validation flags).
21//!
22//! Out of scope:
23//! - Source-side reconciliation (`COUNT(*)`).  That's `--reconcile` /
24//!   `rivet reconcile`, which already exists.
25//! - Per-byte re-fingerprint of every part (`--validate --deep`, future).
26//!
27//! Exit code: `0` if `passed` (or the legacy-run case where the verifier
28//! cannot certify but no failures were seen); non-zero on any explicit
29//! failure (`PartMissing`, `PartSizeMismatch`, `SuccessMarkerStale`, …).
30
31use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// Output format mirroring the `rivet reconcile` / `rivet repair` pattern.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the selected format can
/// be logged, duplicated and compared (e.g. in CLI-parsing tests) like any
/// other public option type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// JSON written to the given file path, or to stdout if `None`.
    Json(Option<String>),
}
48
/// Re-targeting overrides for `rivet validate`.
///
/// Default (`ValidateTarget::default()`) reproduces the v0.7.1 behaviour:
/// resolve `{date}` against today's UTC date, with no `{run_id}`
/// substitution, and use the config's destination prefix/path unchanged.
///
/// All three fields are independent and optional; `date` and `run_id` feed
/// the placeholder context, while `prefix_override` is applied afterwards
/// on the already-expanded destination.
#[derive(Debug, Default, Clone)]
pub struct ValidateTarget {
    /// `--date YYYY-MM-DD` — override the date used for `{date}`.
    /// `None` falls back to today's date.
    pub date: Option<NaiveDate>,
    /// `--run-id RID` — substitute `{run_id}` in the destination template.
    /// `None` leaves the placeholder context without a run id.
    pub run_id: Option<String>,
    /// `--prefix STRING` — bypass placeholder resolution entirely and
    /// verify exactly this prefix.  Replaces both `prefix` and `path`.
    ///
    /// `run_validate_command` rejects this override when more than one
    /// export is selected, and treats an absent manifest at a pinned prefix
    /// as a failure rather than a legacy run.
    pub prefix_override: Option<String>,
}
64
65impl ValidateTarget {
66    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67        let mut ctx = match self.date {
68            Some(d) => PlaceholderContext::for_date(d, export_name),
69            None => PlaceholderContext::for_today(export_name),
70        };
71        if let Some(rid) = &self.run_id {
72            ctx = ctx.with_run_id(rid.clone());
73        }
74        ctx
75    }
76}
77
78/// Driver for `rivet validate <export>` (or every export when
79/// `export_name` is `None`).
80///
81/// Returns `Err` on the first explicit verification failure across the
82/// requested exports so an Airflow / CI step can branch on the exit code.
83/// Per-export verdicts are still printed to stdout / written to JSON for
84/// every export, including subsequent ones — the bail at the end is the
85/// last action.
86pub fn run_validate_command(
87    config_path: &str,
88    export_name: Option<&str>,
89    format: ValidateOutputFormat,
90    target: ValidateTarget,
91) -> Result<()> {
92    let config = Config::load_with_params(config_path, None)?;
93
94    let exports: Vec<&crate::config::ExportConfig> = match export_name {
95        Some(name) => match config.exports.iter().find(|e| e.name == name) {
96            Some(e) => vec![e],
97            None => anyhow::bail!("export '{}' not found in config", name),
98        },
99        None => config.exports.iter().collect(),
100    };
101
102    if exports.is_empty() {
103        anyhow::bail!("no exports defined in config — nothing to validate");
104    }
105
106    // `--prefix` only makes sense for a single export; with multiple
107    // exports it would silently re-point all of them at the same physical
108    // bytes.  Catch this at the boundary so we never head-check the wrong
109    // dataset under operator triage pressure.
110    if target.prefix_override.is_some() && exports.len() > 1 {
111        anyhow::bail!(
112            "--prefix requires --export <name>: cannot apply one override to {} exports",
113            exports.len()
114        );
115    }
116
117    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118    let mut hard_failures: Vec<String> = Vec::new();
119
120    for export in &exports {
121        // Apply the operator-supplied re-targeting if any, else fall back
122        // to today's UTC date (same shape `rivet run` resolves at write
123        // time).
124        let ctx = target.placeholder_context(&export.name);
125        let mut expanded_dest =
126            crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127        if let Some(p) = &target.prefix_override {
128            // Bypass placeholder resolution: trust the operator's literal
129            // prefix.  Replace both `path` (local) and `prefix` (cloud)
130            // so whichever the backend reads picks up the override.
131            expanded_dest.path = Some(p.clone());
132            expanded_dest.prefix = Some(p.clone());
133        }
134        let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135        let dest = match crate::destination::create_destination(&expanded_dest) {
136            Ok(d) => d,
137            Err(e) => {
138                let msg = format!(
139                    "export '{}' (prefix: {}): could not open destination: {:#}",
140                    export.name, resolved_prefix, e
141                );
142                hard_failures.push(msg);
143                continue;
144            }
145        };
146        // Streaming destinations have no prefix to verify — note and skip.
147        if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148        {
149            log::info!(
150                "export '{}': streaming destination, skipping (nothing to verify)",
151                export.name
152            );
153            continue;
154        }
155        match verify_at_destination(&*dest, "") {
156            Ok(mut v) => {
157                // Apply this export's `verify` policy: `content` fails the
158                // verdict when any part is only size-verified (review D).
159                v.enforce_content_policy(export.verify.requires_content());
160                // Finding #20: when the operator pinned a literal `--prefix`,
161                // they asserted a real dataset lives here. An absent manifest is
162                // then NOT the benign M6 legacy-run case (exit 0) — it almost
163                // always means the prefix was never written (a misconfigured CI
164                // gate `rivet validate && deploy` sailing past nothing). Escalate
165                // that exact shape (no manifest, no other failure) to a fatal
166                // `ManifestRequiredButAbsent` so the exit gate refuses it loudly
167                // instead of silently passing. No-op for every other shape (a
168                // real manifest, or an absent one already carrying a read error).
169                if target.prefix_override.is_some() {
170                    v.require_manifest_present(&resolved_prefix);
171                }
172                // Capture the verdict before `v` is moved into the result: the
173                // deeper Form B checksum re-read below must run *only* on a
174                // manifest that was found and passed the standard checks.
175                let manifest_verified = v.manifest_found && v.passed;
176                all_results.push(ExportVerdict {
177                    name: export.name.clone(),
178                    resolved_prefix,
179                    verification: v,
180                });
181                // CDC-specific: re-read the parts and confirm `__pos` stayed in
182                // source-log order (no reorder / no part-boundary overlap). The
183                // manifest check above already covered per-part MD5 / size / _SUCCESS.
184                if export.mode == crate::config::ExportMode::Cdc
185                    && export.format == crate::config::FormatType::Parquet
186                {
187                    match crate::source::cdc::validate::check_positions(&*dest, "") {
188                        Ok(pc) if pc.is_ok() => log::info!(
189                            "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
190                            export.name,
191                            pc.rows,
192                            pc.parts,
193                            pc.first,
194                            pc.last
195                        ),
196                        Ok(pc) => {
197                            for v in &pc.violations {
198                                hard_failures
199                                    .push(format!("export '{}': cdc __pos: {}", export.name, v));
200                            }
201                        }
202                        Err(e) => hard_failures.push(format!(
203                            "export '{}': cdc __pos check failed: {:#}",
204                            export.name, e
205                        )),
206                    }
207                }
208                // Form B: re-read the parts and verify the per-column value
209                // checksums recorded in the manifest (catches an Arrow→Parquet
210                // encode / post-write fault the in-process Form A cannot see).
211                // Gated on a found+passed manifest: an absent (legacy pass) or
212                // unreadable manifest is already accounted for above, so re-reading
213                // it here would either break the legacy pass or double-count.
214                if manifest_verified
215                    && export.format == crate::config::FormatType::Parquet
216                    && let Err(e) =
217                        crate::source::value_checksum::validate_manifest_checksums(&*dest, "")
218                {
219                    hard_failures
220                        .push(format!("export '{}': value checksum: {:#}", export.name, e));
221                }
222            }
223            Err(e) => {
224                hard_failures.push(format!(
225                    "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
226                    export.name, resolved_prefix, e
227                ));
228            }
229        }
230    }
231
232    match format {
233        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
234        ValidateOutputFormat::Json(out_path) => {
235            render_json(&all_results, &hard_failures, out_path)?
236        }
237    }
238
239    // Exit-code policy: the standalone driver fails when an export's
240    // verdict surfaced an explicit failure it could not pass over
241    // (`verdict_fails_exit`) — an M5 verification failure on a found
242    // manifest (missing part, size mismatch, stale _SUCCESS,
243    // self-inconsistent manifest) or a manifest that could not even be
244    // read (`ManifestReadError`: `manifest_found` is false, but the
245    // verifier has a concrete reason to refuse, not a legacy prefix).
246    // Surplus untracked objects (`UntrackedObject`) are surfaced in
247    // `failures` for operator audit but do NOT flip `passed`, because
248    // their cleanup is M9's job (resume), not validate's.  An operator
249    // who wants strict "no surplus allowed" can grep the JSON report for
250    // `kind: untracked_object` themselves; a future
251    // `rivet validate --strict` flag may surface that exit-code mode if
252    // demand appears (out of scope for this PR).
253    //
254    // Legacy runs (M6) keep exit 0: `passed: false` with no failures
255    // means "verifier cannot certify", not "verifier found a problem".
256    let failed_verdicts = all_results
257        .iter()
258        .filter(|r| verdict_fails_exit(&r.verification))
259        .count();
260    if failed_verdicts > 0 {
261        // A verified-and-wrong verdict (missing part, size mismatch, stale
262        // _SUCCESS, self-inconsistent manifest) is the data-integrity class
263        // (exit 3) — typed so a scheduler stops rather than blindly retries.
264        // `hard_failures` (couldn't open / read the destination) are operational
265        // "could not verify", not "verified wrong", so they fold into the count
266        // but the class is driven by the real verdict failure.
267        return Err(crate::error::DataIntegrityError::new(format!(
268            "rivet validate: {} export(s) failed verification",
269            hard_failures.len() + failed_verdicts
270        ))
271        .into());
272    }
273    if !hard_failures.is_empty() {
274        // Could-not-verify only (no verified-wrong verdict): operational, generic.
275        anyhow::bail!(
276            "rivet validate: {} export(s) failed verification",
277            hard_failures.len()
278        );
279    }
280    Ok(())
281}
282
283/// Exit-code predicate for one export's verdict: non-zero iff the verifier
284/// surfaced an explicit failure (`has_failures` — "a reason an orchestrator
285/// should refuse the run") on a verdict that did not pass.  Both documented
286/// exit-0 cases survive: legacy runs (M6 — `passed: false` with no failures
287/// is "cannot certify", not "found a problem") and advisory-only verdicts
288/// (`UntrackedObject` never flips `passed`).
289fn verdict_fails_exit(v: &ManifestVerification) -> bool {
290    !v.passed && v.has_failures()
291}
292
/// Per-export verdict plus the resolved physical prefix the verifier
/// looked at — surfaced in both pretty and JSON output so an operator can
/// confirm at a glance which bytes were checked.
struct ExportVerdict {
    /// Name of the export, copied from its config entry.
    name: String,
    /// Destination location as rendered by `resolved_prefix_for_display`
    /// (after placeholder expansion and any `--prefix` override).
    resolved_prefix: String,
    /// Verdict returned by `verify_at_destination`, with the export's
    /// content policy — and, under `--prefix`, the manifest-required
    /// escalation — already applied.
    verification: ManifestVerification,
}
301
302/// Render the destination's resolved prefix for human/JSON output.
303///
304/// Cloud backends carry the data location in `prefix`; the local backend
305/// uses `path`.  Falling back to `<unresolved>` should never fire under
306/// normal config (clap + Config::load enforce one of the two) but keeps
307/// `validate` from panicking if a future config shape lands here.
308fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
309    dest.prefix
310        .clone()
311        .or_else(|| dest.path.clone())
312        .unwrap_or_else(|| "<unresolved>".into())
313}
314
315fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
316    use std::io::Write;
317    let stdout = std::io::stdout();
318    let mut h = stdout.lock();
319
320    for r in results {
321        let _ = writeln!(h, "── {} ──", r.name);
322        let _ = writeln!(h, "  prefix:    {}", r.resolved_prefix);
323        let v = &r.verification;
324        if v.legacy_run {
325            let _ = writeln!(
326                h,
327                "  status:    legacy_run (no manifest at destination — pre-0.7.0 prefix)"
328            );
329            continue;
330        }
331        if !v.manifest_found {
332            let _ = writeln!(h, "  status:    NO MANIFEST");
333            // A read-error verdict lands here (manifest present but
334            // unreadable, or head failed): its `failures` are the
335            // operator's only signal, so print them before bailing out
336            // of this export's section.
337            for failure in &v.failures {
338                let _ = writeln!(h, "  failure:   {}", failure);
339            }
340            continue;
341        }
342        let _ = writeln!(
343            h,
344            "  status:    {}",
345            if v.passed { "PASSED" } else { "FAILED" }
346        );
347        let _ = writeln!(
348            h,
349            "  parts:     {} verified ({} md5, {} size-only), {} failed",
350            v.parts_verified,
351            v.parts_md5_verified,
352            v.parts_verified.saturating_sub(v.parts_md5_verified),
353            v.parts_failed
354        );
355        let _ = writeln!(
356            h,
357            "  _SUCCESS:  {}",
358            if v.success_marker_consistent {
359                "consistent"
360            } else if v.failures.iter().any(|f| matches!(
361                f,
362                crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
363                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
364                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
365            )) {
366                "INCONSISTENT (see failures)"
367            } else {
368                "absent (no signal)"
369            }
370        );
371        let _ = writeln!(
372            h,
373            "  manifest:  {}",
374            if v.manifest_self_consistent {
375                "self-consistent"
376            } else {
377                "INCONSISTENT (see failures)"
378            }
379        );
380        for failure in &v.failures {
381            // `Failure: Display` is the single source of truth for the message;
382            // same string the run report uses.  L14: advisory (non-fatal)
383            // entries — `UntrackedObject` surplus — are labelled "warning:" not
384            // "failure:".  They never flip `passed` and never change the exit
385            // code (cleanup is `--resume`'s job, M9), so rendering them as
386            // "failure:" beside exit 0 was contradictory.  Fatal failures keep
387            // the "failure:" label.
388            let label = if failure.is_fatal() {
389                "failure:"
390            } else {
391                "warning:"
392            };
393            let _ = writeln!(h, "  {}   {}", label, failure);
394        }
395    }
396
397    if !hard_failures.is_empty() {
398        let _ = writeln!(h);
399        let _ = writeln!(h, "── errors ──");
400        for e in hard_failures {
401            let _ = writeln!(h, "  {}", e);
402        }
403    }
404    let _ = h.flush();
405}
406
407fn render_json(
408    results: &[ExportVerdict],
409    hard_failures: &[String],
410    out_path: Option<String>,
411) -> Result<()> {
412    // L14: surface advisory (non-fatal) entries — `UntrackedObject` surplus —
413    // in a dedicated top-level `warnings` array so a consumer can tell at a
414    // glance that "failures means failures".  The per-export
415    // `verification.failures` array is the stable wire contract (consumers
416    // branch on `failures[].kind`), so advisory entries stay there too — this
417    // is an additive lens over the same data, not a relocation.
418    let warnings: Vec<serde_json::Value> = results
419        .iter()
420        .flat_map(|r| {
421            r.verification
422                .failures
423                .iter()
424                .filter(|f| !f.is_fatal())
425                .map(move |f| {
426                    serde_json::json!({
427                        "export_name": r.name,
428                        "warning": f,
429                    })
430                })
431        })
432        .collect();
433
434    let payload = serde_json::json!({
435        "exports": results
436            .iter()
437            .map(|r| {
438                serde_json::json!({
439                    "export_name": r.name,
440                    "resolved_prefix": r.resolved_prefix,
441                    "verification": r.verification,
442                })
443            })
444            .collect::<Vec<_>>(),
445        "warnings": warnings,
446        "errors": hard_failures,
447    });
448    let serialized = serde_json::to_string_pretty(&payload)?;
449    match out_path {
450        Some(p) => {
451            std::fs::write(Path::new(&p), &serialized)?;
452            log::info!("rivet validate: wrote JSON report to {}", p);
453        }
454        None => {
455            println!("{}", serialized);
456        }
457    }
458    Ok(())
459}
460
461#[cfg(test)]
462mod tests {
463    use super::*;
464
465    // ── ValidateTarget::placeholder_context ────────────────────────────────
466
467    #[test]
468    fn target_default_uses_today() {
469        let target = ValidateTarget::default();
470        let ctx = target.placeholder_context("orders");
471        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
472        assert_eq!(ctx.export_name, "orders");
473        assert!(ctx.run_id.is_none());
474    }
475
476    #[test]
477    fn target_with_date_overrides_today() {
478        let target = ValidateTarget {
479            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
480            ..Default::default()
481        };
482        let ctx = target.placeholder_context("orders");
483        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
484        assert!(ctx.run_id.is_none());
485    }
486
487    #[test]
488    fn target_composes_date_and_run_id() {
489        // Regression for the "run yesterday, validate today" scenario:
490        // operator passes both --date and --run-id; the resolver must see
491        // both.
492        let target = ValidateTarget {
493            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
494            run_id: Some("r-abc123".into()),
495            prefix_override: None,
496        };
497        let ctx = target.placeholder_context("orders");
498        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
499        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
500    }
501
502    // ── resolved_prefix_for_display ────────────────────────────────────────
503
504    #[test]
505    fn resolved_prefix_prefers_cloud_prefix_over_path() {
506        let dest = crate::config::DestinationConfig {
507            destination_type: crate::config::DestinationType::S3,
508            prefix: Some("exports/2026-05-21/orders/".into()),
509            path: Some("/scratch".into()),
510            ..Default::default()
511        };
512        assert_eq!(
513            resolved_prefix_for_display(&dest),
514            "exports/2026-05-21/orders/",
515        );
516    }
517
518    #[test]
519    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
520        let dest = crate::config::DestinationConfig {
521            destination_type: crate::config::DestinationType::Local,
522            prefix: None,
523            path: Some("/data/out".into()),
524            ..Default::default()
525        };
526        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
527    }
528
529    // ── verdict_fails_exit (exit-code policy) ──────────────────────────────
530
531    use crate::pipeline::ManifestVerificationFailure as VFailure;
532
533    /// Verdict shape `verify_at_destination` returns when `manifest.json`
534    /// exists but cannot be read: not legacy, not passed, one explicit
535    /// `ManifestReadError`.
536    fn read_error_verdict() -> ManifestVerification {
537        ManifestVerification {
538            legacy_run: false,
539            failures: vec![VFailure::ManifestReadError {
540                detail: "permission denied".into(),
541            }],
542            ..ManifestVerification::legacy()
543        }
544    }
545
546    #[test]
547    fn exit_gate_counts_manifest_read_error_as_failure() {
548        assert!(verdict_fails_exit(&read_error_verdict()));
549    }
550
551    #[test]
552    fn exit_gate_keeps_legacy_run_at_zero() {
553        // M6: no manifest, no failures — "cannot certify" is not "found a
554        // problem".
555        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
556    }
557
558    #[test]
559    fn exit_gate_keeps_advisory_untracked_at_zero() {
560        let v = ManifestVerification {
561            manifest_found: true,
562            legacy_run: false,
563            passed: true,
564            parts_verified: 1,
565            failures: vec![VFailure::UntrackedObject {
566                key: "stray.parquet".into(),
567                size_bytes: 9,
568            }],
569            ..ManifestVerification::legacy()
570        };
571        assert!(!verdict_fails_exit(&v));
572    }
573
574    #[test]
575    fn exit_gate_counts_fatal_failure_on_found_manifest() {
576        let v = ManifestVerification {
577            manifest_found: true,
578            legacy_run: false,
579            failures: vec![VFailure::PartMissing {
580                part_id: 1,
581                path: "part-000001.parquet".into(),
582            }],
583            ..ManifestVerification::legacy()
584        };
585        assert!(verdict_fails_exit(&v));
586    }
587
588    // ── run_validate_command end-to-end (local destination; the source URL
589    //     is never dialed — see tests/validate_historical.rs) ──────────────
590
591    use crate::manifest::{
592        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
593        PartStatus, RunManifest,
594    };
595
596    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
597        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
598        let part_count = parts.len() as u32;
599        RunManifest {
600            manifest_version: MANIFEST_VERSION,
601            run_id: "r-validate-cmd".into(),
602            export_name: "orders".into(),
603            started_at: "2026-06-09T12:00:00Z".into(),
604            finished_at: "2026-06-09T12:01:00Z".into(),
605            status: ManifestStatus::Success,
606            source: ManifestSource {
607                engine: "postgres".into(),
608                schema: Some("public".into()),
609                table: Some("orders".into()),
610            },
611            destination: ManifestDestination {
612                kind: "local".into(),
613                uri: "file:///tmp/out".into(),
614            },
615            format: "parquet".into(),
616            compression: "zstd".into(),
617            schema_fingerprint: "xxh3:0123456789abcdef".into(),
618            row_count,
619            part_count,
620            parts,
621            column_checksums: None,
622            checksum_key_column: None,
623        }
624    }
625
626    /// Land `manifest.json` + `_SUCCESS` at `prefix` via the public writer
627    /// surface — same path the `rivet run` end-of-run writer takes.
628    fn stage_dataset(prefix: &Path, m: &RunManifest) {
629        std::fs::create_dir_all(prefix).unwrap();
630        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
631            destination_type: crate::config::DestinationType::Local,
632            path: Some(prefix.to_string_lossy().into_owned()),
633            ..Default::default()
634        })
635        .unwrap();
636        crate::pipeline::write_manifest(&*dest, m).unwrap();
637    }
638
639    /// Config with a single export pointing at `prefix`.  Written next to —
640    /// never inside — the prefix, so it can't surface as untracked surplus.
641    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
642        let cfg = dir.join("rivet.yaml");
643        let yaml = format!(
644            "source:\n  type: postgres\n  url: postgresql://nobody@localhost/nope\nexports:\n  - name: orders\n    query: \"SELECT 1\"\n    mode: full\n    format: parquet\n    destination:\n      type: local\n      path: \"{}\"\n",
645            prefix.to_string_lossy()
646        );
647        std::fs::write(&cfg, yaml).unwrap();
648        cfg
649    }
650
651    /// In-process twin of the live roast test (tests/roast_validate_exit.rs):
652    /// `manifest.json` present but unreadable must exit non-zero.  head()
653    /// (fs::metadata) succeeds, read() (fs::read) hits EACCES — exactly the
654    /// `ManifestReadError` verdict.
655    #[cfg(unix)]
656    #[test]
657    fn unreadable_manifest_fails_the_command() {
658        use std::os::unix::fs::PermissionsExt;
659
660        let dir = tempfile::tempdir().unwrap();
661        let prefix = dir.path().join("out");
662        stage_dataset(&prefix, &success_manifest(Vec::new()));
663        let cfg = write_cfg(dir.path(), &prefix);
664
665        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
666        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
667        if std::fs::read(&manifest_path).is_ok() {
668            // euid 0 ignores file modes — the degraded state can't be staged.
669            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
670            return;
671        }
672
673        let report = dir.path().join("report.json");
674        let err = run_validate_command(
675            cfg.to_str().unwrap(),
676            Some("orders"),
677            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
678            ValidateTarget::default(),
679        )
680        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
681        assert!(
682            format!("{err:#}").contains("1 export(s) failed verification"),
683            "got: {err:#}"
684        );
685
686        // The JSON report (written before the bail) still carries the
687        // verdict so the operator sees why.
688        let json: serde_json::Value =
689            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
690        let verification = &json["exports"][0]["verification"];
691        assert_eq!(verification["manifest_found"], false);
692        assert_eq!(verification["legacy_run"], false);
693        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
694    }
695
696    #[test]
697    fn untracked_surplus_alone_keeps_exit_zero() {
698        // The advisory neighbor of the read-error fix: gating on
699        // `has_failures()` alone would flip this verdict to non-zero, but
700        // surplus cleanup is `--resume`'s job (M9), not validate's.
701        let dir = tempfile::tempdir().unwrap();
702        let prefix = dir.path().join("out");
703        stage_dataset(&prefix, &success_manifest(Vec::new()));
704        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
705        let cfg = write_cfg(dir.path(), &prefix);
706
707        let report = dir.path().join("report.json");
708        run_validate_command(
709            cfg.to_str().unwrap(),
710            Some("orders"),
711            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
712            ValidateTarget::default(),
713        )
714        .expect("advisory untracked surplus must not flip the exit code");
715
716        let json: serde_json::Value =
717            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
718        let verification = &json["exports"][0]["verification"];
719        assert_eq!(verification["passed"], true);
720        // The stable wire contract is preserved: untracked entries still ride
721        // `verification.failures` (consumers branch on `failures[].kind`).
722        assert_eq!(verification["failures"][0]["kind"], "untracked_object");
723
724        // L14: …and the same advisory entry is also surfaced in the top-level
725        // `warnings` array so "failures means failures" — an exit-0 verdict no
726        // longer hides a surplus object under a "failure" label.
727        let warnings = json["warnings"].as_array().expect("warnings array present");
728        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
729        assert_eq!(warnings[0]["export_name"], "orders");
730        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
731        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
732    }
733
734    #[test]
735    fn json_warnings_array_is_empty_when_no_advisory_failures() {
736        // A clean dataset with no surplus → no warnings.  Guards against the
737        // `warnings` lens accidentally picking up fatal failures.
738        let dir = tempfile::tempdir().unwrap();
739        let prefix = dir.path().join("out");
740        stage_dataset(&prefix, &success_manifest(Vec::new()));
741        let cfg = write_cfg(dir.path(), &prefix);
742
743        let report = dir.path().join("report.json");
744        run_validate_command(
745            cfg.to_str().unwrap(),
746            Some("orders"),
747            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
748            ValidateTarget::default(),
749        )
750        .expect("a clean dataset must pass");
751
752        let json: serde_json::Value =
753            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
754        assert_eq!(
755            json["warnings"]
756                .as_array()
757                .expect("warnings array present")
758                .len(),
759            0,
760            "no surplus → no warnings"
761        );
762    }
763
764    #[test]
765    fn missing_part_fails_the_command() {
766        let dir = tempfile::tempdir().unwrap();
767        let prefix = dir.path().join("out");
768        let m = success_manifest(vec![ManifestPart {
769            part_id: 1,
770            path: "part-000001.parquet".into(),
771            rows: 10,
772            size_bytes: 4,
773            content_fingerprint: "xxh3:1111111111111111".into(),
774            content_md5: String::new(),
775            status: PartStatus::Committed,
776        }]);
777        stage_dataset(&prefix, &m); // the part itself is never written
778        let cfg = write_cfg(dir.path(), &prefix);
779
780        let err = run_validate_command(
781            cfg.to_str().unwrap(),
782            Some("orders"),
783            ValidateOutputFormat::Json(None),
784            ValidateTarget::default(),
785        )
786        .expect_err("a missing committed part must fail verification");
787        assert!(
788            format!("{err:#}").contains("1 export(s) failed verification"),
789            "got: {err:#}"
790        );
791    }
792
793    // ── finding #20: operator-pinned --prefix requires a manifest ────────────
794
795    /// `--prefix` at a real, complete dataset still passes — the normal
796    /// "validate exactly this prefix" case must not regress.
797    #[test]
798    fn prefix_override_with_real_manifest_passes() {
799        let dir = tempfile::tempdir().unwrap();
800        let prefix = dir.path().join("out");
801        stage_dataset(&prefix, &success_manifest(Vec::new()));
802        let cfg = write_cfg(dir.path(), &prefix);
803
804        run_validate_command(
805            cfg.to_str().unwrap(),
806            Some("orders"),
807            ValidateOutputFormat::Json(None),
808            ValidateTarget {
809                prefix_override: Some(prefix.to_string_lossy().into_owned()),
810                ..Default::default()
811            },
812        )
813        .expect("a real dataset under a pinned --prefix must pass");
814    }
815
816    /// `--prefix` at a never-written directory FAILS (exit non-zero): the
817    /// operator asserted a dataset lives here, so an absent manifest is a
818    /// refusal reason, not the benign legacy-run pass. This is the in-process
819    /// twin of the live `audit_validate_absent_prefix_can_fail` roast.
820    #[test]
821    fn prefix_override_at_absent_manifest_fails() {
822        let dir = tempfile::tempdir().unwrap();
823        // The export's config destination is irrelevant — `--prefix` overrides
824        // it. Point the override at a dir that exists but was never written.
825        let cfg_prefix = dir.path().join("cfg_dest");
826        std::fs::create_dir_all(&cfg_prefix).unwrap();
827        let cfg = write_cfg(dir.path(), &cfg_prefix);
828        let empty_prefix = dir.path().join("never_written");
829        std::fs::create_dir_all(&empty_prefix).unwrap();
830
831        let report = dir.path().join("report.json");
832        let err = run_validate_command(
833            cfg.to_str().unwrap(),
834            Some("orders"),
835            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
836            ValidateTarget {
837                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
838                ..Default::default()
839            },
840        )
841        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
842        assert!(
843            format!("{err:#}").contains("1 export(s) failed verification"),
844            "got: {err:#}"
845        );
846
847        // The verdict (written before the bail) carries the explicit reason so
848        // the operator sees why the gate refused, not a bare exit code.
849        let json: serde_json::Value =
850            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
851        let verification = &json["exports"][0]["verification"];
852        assert_eq!(verification["manifest_found"], false);
853        assert_eq!(verification["legacy_run"], false);
854        assert_eq!(
855            verification["failures"][0]["kind"],
856            "manifest_required_but_absent"
857        );
858    }
859
860    /// Without `--prefix`, an absent manifest stays the benign M6 legacy-run
861    /// pass (exit 0) — today's behaviour is preserved for config-resolved
862    /// destinations that may legitimately be pre-0.7.0 prefixes.
863    #[test]
864    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
865        let dir = tempfile::tempdir().unwrap();
866        let prefix = dir.path().join("out");
867        std::fs::create_dir_all(&prefix).unwrap(); // exists, but no manifest
868        let cfg = write_cfg(dir.path(), &prefix);
869
870        run_validate_command(
871            cfg.to_str().unwrap(),
872            Some("orders"),
873            ValidateOutputFormat::Json(None),
874            ValidateTarget::default(), // no --prefix
875        )
876        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
877    }
878}