// rivet/pipeline/validate_cmd.rs

//! **Layer: Coordinator** (config → destination → verification → render)
//!
//! `rivet validate` — re-run the manifest-aware `--validate` pass against
//! an existing destination, without performing an extraction.
//!
//! ADR-0013 amendment: this is **not** a new trust noun.  It is a standalone
//! driver for the same M5/M6 verification surface that `rivet run --validate`
//! already performs at end-of-run (see [`crate::pipeline::validate_manifest`]).
//! The verdict shape is identical; the only difference is no source query,
//! no extraction, no state writes.
//!
//! Use cases:
//! - "Is the output at this prefix still complete?" — Airflow / CI poller
//!   between runs.
//! - "Did someone delete a part by mistake?" — operator triage on a
//!   suspected-broken dataset.
//! - "Does this legacy prefix have a manifest yet?" — fast check for M6.
//! - "Was yesterday's run complete?" — `--date YYYY-MM-DD` or `--run-id`
//!   re-targets a prior run's prefix without re-running the export
//!   (v0.7.2 historical-validation flags).
//!
//! Out of scope:
//! - Source-side reconciliation (`COUNT(*)`).  That's `--reconcile` /
//!   `rivet reconcile`, which already exists.
//! - Per-byte re-fingerprint of every part (`--validate --deep`, future).
//!
//! Exit code: `0` if every verified export `passed`, or if the only
//! non-passing verdicts come from legacy prefixes with no manifest (the
//! verifier cannot certify them, but saw no failures).  Non-zero when any
//! export that has a manifest fails verification (`PartMissing`,
//! `PartSizeMismatch`, `SuccessMarkerStale`, …) or when a destination cannot
//! be opened or verified at all.  Surplus untracked objects are listed in
//! `failures` but do not, on their own, change the exit code.  Streaming
//! destinations are skipped.

use std::path::Path;

use chrono::NaiveDate;

use crate::config::Config;
use crate::config::ExportConfig;
use crate::destination::placeholder::PlaceholderContext;
use crate::error::Result;
use crate::pipeline::ManifestVerification;
use crate::pipeline::validate_manifest::verify_at_destination;

/// How `rivet validate` reports its verdicts, mirroring the
/// `rivet reconcile` / `rivet repair` output options.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given file path, or to stdout
    /// when `None`.
    Json(Option<String>),
}

49/// Re-targeting overrides for `rivet validate`.
50///
51/// Default (`ValidateTarget::default()`) reproduces the v0.7.1 behaviour:
52/// resolve `{date}` against today's UTC date, with no `{run_id}`
53/// substitution, and use the config's destination prefix/path unchanged.
54#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56    /// `--date YYYY-MM-DD` — override the date used for `{date}`.
57    pub date: Option<NaiveDate>,
58    /// `--run-id RID` — substitute `{run_id}` in the destination template.
59    pub run_id: Option<String>,
60    /// `--prefix STRING` — bypass placeholder resolution entirely and
61    /// verify exactly this prefix.  Replaces both `prefix` and `path`.
62    pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67        let mut ctx = match self.date {
68            Some(d) => PlaceholderContext::for_date(d, export_name),
69            None => PlaceholderContext::for_today(export_name),
70        };
71        if let Some(rid) = &self.run_id {
72            ctx = ctx.with_run_id(rid.clone());
73        }
74        ctx
75    }
76}
77
78/// Driver for `rivet validate <export>` (or every export when
79/// `export_name` is `None`).
80///
81/// Returns `Err` on the first explicit verification failure across the
82/// requested exports so an Airflow / CI step can branch on the exit code.
83/// Per-export verdicts are still printed to stdout / written to JSON for
84/// every export, including subsequent ones — the bail at the end is the
85/// last action.
86pub fn run_validate_command(
87    config_path: &str,
88    export_name: Option<&str>,
89    format: ValidateOutputFormat,
90    target: ValidateTarget,
91) -> Result<()> {
92    let config = Config::load_with_params(config_path, None)?;
93
94    let exports: Vec<&crate::config::ExportConfig> = match export_name {
95        Some(name) => match config.exports.iter().find(|e| e.name == name) {
96            Some(e) => vec![e],
97            None => anyhow::bail!("export '{}' not found in config", name),
98        },
99        None => config.exports.iter().collect(),
100    };
101
102    if exports.is_empty() {
103        anyhow::bail!("no exports defined in config — nothing to validate");
104    }
105
106    // `--prefix` only makes sense for a single export; with multiple
107    // exports it would silently re-point all of them at the same physical
108    // bytes.  Catch this at the boundary so we never head-check the wrong
109    // dataset under operator triage pressure.
110    if target.prefix_override.is_some() && exports.len() > 1 {
111        anyhow::bail!(
112            "--prefix requires --export <name>: cannot apply one override to {} exports",
113            exports.len()
114        );
115    }
116
117    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118    let mut hard_failures: Vec<String> = Vec::new();
119
120    for export in &exports {
121        // Apply the operator-supplied re-targeting if any, else fall back
122        // to today's UTC date (same shape `rivet run` resolves at write
123        // time).
124        let ctx = target.placeholder_context(&export.name);
125        let mut expanded_dest =
126            crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127        if let Some(p) = &target.prefix_override {
128            // Bypass placeholder resolution: trust the operator's literal
129            // prefix.  Replace both `path` (local) and `prefix` (cloud)
130            // so whichever the backend reads picks up the override.
131            expanded_dest.path = Some(p.clone());
132            expanded_dest.prefix = Some(p.clone());
133        }
134        let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135        let dest = match crate::destination::create_destination(&expanded_dest) {
136            Ok(d) => d,
137            Err(e) => {
138                let msg = format!(
139                    "export '{}' (prefix: {}): could not open destination: {:#}",
140                    export.name, resolved_prefix, e
141                );
142                hard_failures.push(msg);
143                continue;
144            }
145        };
146        // Streaming destinations have no prefix to verify — note and skip.
147        if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148        {
149            log::info!(
150                "export '{}': streaming destination, skipping (nothing to verify)",
151                export.name
152            );
153            continue;
154        }
155        match verify_at_destination(&*dest, "") {
156            Ok(mut v) => {
157                // Apply this export's `verify` policy: `content` fails the
158                // verdict when any part is only size-verified (review D).
159                v.enforce_content_policy(export.verify.requires_content());
160                all_results.push(ExportVerdict {
161                    name: export.name.clone(),
162                    resolved_prefix,
163                    verification: v,
164                });
165            }
166            Err(e) => {
167                hard_failures.push(format!(
168                    "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
169                    export.name, resolved_prefix, e
170                ));
171            }
172        }
173    }
174
175    match format {
176        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
177        ValidateOutputFormat::Json(out_path) => {
178            render_json(&all_results, &hard_failures, out_path)?
179        }
180    }
181
182    // Exit-code policy: the standalone driver fails when an export's
183    // composite verdict is `passed: false` — i.e. M5 saw an explicit
184    // verification failure (missing part, size mismatch, stale _SUCCESS,
185    // self-inconsistent manifest).  Surplus untracked objects (`UntrackedObject`)
186    // are surfaced in `failures` for operator audit but do NOT flip `passed`,
187    // because their cleanup is M9's job (resume), not validate's.  An
188    // operator who wants strict "no surplus allowed" can grep the JSON
189    // report for `kind: untracked_object` themselves; a future
190    // `rivet validate --strict` flag may surface that exit-code mode if
191    // demand appears (out of scope for this PR).
192    //
193    // Legacy runs (M6) keep exit 0: `passed: false` there means "verifier
194    // cannot certify", not "verifier found a problem".
195    let any_failed = all_results
196        .iter()
197        .any(|r| r.verification.manifest_found && !r.verification.passed);
198    if !hard_failures.is_empty() || any_failed {
199        anyhow::bail!(
200            "rivet validate: {} export(s) failed verification",
201            hard_failures.len()
202                + all_results
203                    .iter()
204                    .filter(|r| r.verification.manifest_found && !r.verification.passed)
205                    .count()
206        );
207    }
208    Ok(())
209}
210
211/// Per-export verdict plus the resolved physical prefix the verifier
212/// looked at — surfaced in both pretty and JSON output so an operator can
213/// confirm at a glance which bytes were checked.
214struct ExportVerdict {
215    name: String,
216    resolved_prefix: String,
217    verification: ManifestVerification,
218}
219
220/// Render the destination's resolved prefix for human/JSON output.
221///
222/// Cloud backends carry the data location in `prefix`; the local backend
223/// uses `path`.  Falling back to `<unresolved>` should never fire under
224/// normal config (clap + Config::load enforce one of the two) but keeps
225/// `validate` from panicking if a future config shape lands here.
226fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
227    dest.prefix
228        .clone()
229        .or_else(|| dest.path.clone())
230        .unwrap_or_else(|| "<unresolved>".into())
231}
232
233fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
234    use std::io::Write;
235    let stdout = std::io::stdout();
236    let mut h = stdout.lock();
237
238    for r in results {
239        let _ = writeln!(h, "── {} ──", r.name);
240        let _ = writeln!(h, "  prefix:    {}", r.resolved_prefix);
241        let v = &r.verification;
242        if v.legacy_run {
243            let _ = writeln!(
244                h,
245                "  status:    legacy_run (no manifest at destination — pre-0.7.0 prefix)"
246            );
247            continue;
248        }
249        if !v.manifest_found {
250            let _ = writeln!(h, "  status:    NO MANIFEST");
251            continue;
252        }
253        let _ = writeln!(
254            h,
255            "  status:    {}",
256            if v.passed { "PASSED" } else { "FAILED" }
257        );
258        let _ = writeln!(
259            h,
260            "  parts:     {} verified ({} md5, {} size-only), {} failed",
261            v.parts_verified,
262            v.parts_md5_verified,
263            v.parts_verified.saturating_sub(v.parts_md5_verified),
264            v.parts_failed
265        );
266        let _ = writeln!(
267            h,
268            "  _SUCCESS:  {}",
269            if v.success_marker_consistent {
270                "consistent"
271            } else if v.failures.iter().any(|f| matches!(
272                f,
273                crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
274                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
275                    | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
276            )) {
277                "INCONSISTENT (see failures)"
278            } else {
279                "absent (no signal)"
280            }
281        );
282        let _ = writeln!(
283            h,
284            "  manifest:  {}",
285            if v.manifest_self_consistent {
286                "self-consistent"
287            } else {
288                "INCONSISTENT (see failures)"
289            }
290        );
291        for failure in &v.failures {
292            // `Failure: Display` is the single source of truth for these
293            // lines; same string the run report's "failure:" entry uses.
294            let _ = writeln!(h, "  failure:   {}", failure);
295        }
296    }
297
298    if !hard_failures.is_empty() {
299        let _ = writeln!(h);
300        let _ = writeln!(h, "── errors ──");
301        for e in hard_failures {
302            let _ = writeln!(h, "  {}", e);
303        }
304    }
305    let _ = h.flush();
306}
307
308fn render_json(
309    results: &[ExportVerdict],
310    hard_failures: &[String],
311    out_path: Option<String>,
312) -> Result<()> {
313    let payload = serde_json::json!({
314        "exports": results
315            .iter()
316            .map(|r| {
317                serde_json::json!({
318                    "export_name": r.name,
319                    "resolved_prefix": r.resolved_prefix,
320                    "verification": r.verification,
321                })
322            })
323            .collect::<Vec<_>>(),
324        "errors": hard_failures,
325    });
326    let serialized = serde_json::to_string_pretty(&payload)?;
327    match out_path {
328        Some(p) => {
329            std::fs::write(Path::new(&p), &serialized)?;
330            log::info!("rivet validate: wrote JSON report to {}", p);
331        }
332        None => {
333            println!("{}", serialized);
334        }
335    }
336    Ok(())
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    // ── ValidateTarget::placeholder_context ────────────────────────────────

    #[test]
    fn target_default_uses_today() {
        // Sample the UTC clock on both sides of the call: if the test runs
        // across midnight, either date is a correct "today".  Comparing to a
        // single post-call sample made this test flaky at the day boundary.
        let before = chrono::Utc::now().date_naive();
        let target = ValidateTarget::default();
        let ctx = target.placeholder_context("orders");
        let after = chrono::Utc::now().date_naive();
        assert!(
            ctx.date == before || ctx.date == after,
            "expected today's UTC date ({} or {}), got {}",
            before,
            after,
            ctx.date
        );
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    #[test]
    fn target_with_run_id_only_sets_run_id() {
        let target = ValidateTarget {
            run_id: Some("r-xyz789".into()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.export_name, "orders");
        assert_eq!(ctx.run_id.as_deref(), Some("r-xyz789"));
    }

    #[test]
    fn target_composes_date_and_run_id() {
        // Regression for the "run yesterday, validate today" scenario:
        // operator passes both --date and --run-id; the resolver must see
        // both.
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    // ── resolved_prefix_for_display ────────────────────────────────────────

    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }

    #[test]
    fn resolved_prefix_reports_unresolved_when_both_missing() {
        // Defensive fallback: never panic, always print something.
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: None,
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "<unresolved>");
    }
}