1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// How `rivet validate` reports its results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable report on stdout.
    Pretty,
    /// JSON report; written to the given file path, or to stdout when `None`.
    Json(Option<String>),
}
48
49#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56 pub date: Option<NaiveDate>,
58 pub run_id: Option<String>,
60 pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67 let mut ctx = match self.date {
68 Some(d) => PlaceholderContext::for_date(d, export_name),
69 None => PlaceholderContext::for_today(export_name),
70 };
71 if let Some(rid) = &self.run_id {
72 ctx = ctx.with_run_id(rid.clone());
73 }
74 ctx
75 }
76}
77
78pub fn run_validate_command(
87 config_path: &str,
88 export_name: Option<&str>,
89 format: ValidateOutputFormat,
90 target: ValidateTarget,
91) -> Result<()> {
92 let config = Config::load_with_params(config_path, None)?;
93
94 let exports: Vec<&crate::config::ExportConfig> = match export_name {
95 Some(name) => match config.exports.iter().find(|e| e.name == name) {
96 Some(e) => vec![e],
97 None => anyhow::bail!("export '{}' not found in config", name),
98 },
99 None => config.exports.iter().collect(),
100 };
101
102 if exports.is_empty() {
103 anyhow::bail!("no exports defined in config — nothing to validate");
104 }
105
106 if target.prefix_override.is_some() && exports.len() > 1 {
111 anyhow::bail!(
112 "--prefix requires --export <name>: cannot apply one override to {} exports",
113 exports.len()
114 );
115 }
116
117 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118 let mut hard_failures: Vec<String> = Vec::new();
119
120 for export in &exports {
121 let ctx = target.placeholder_context(&export.name);
125 let mut expanded_dest =
126 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127 if let Some(p) = &target.prefix_override {
128 expanded_dest.path = Some(p.clone());
132 expanded_dest.prefix = Some(p.clone());
133 }
134 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135 let dest = match crate::destination::create_destination(&expanded_dest) {
136 Ok(d) => d,
137 Err(e) => {
138 let msg = format!(
139 "export '{}' (prefix: {}): could not open destination: {:#}",
140 export.name, resolved_prefix, e
141 );
142 hard_failures.push(msg);
143 continue;
144 }
145 };
146 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148 {
149 log::info!(
150 "export '{}': streaming destination, skipping (nothing to verify)",
151 export.name
152 );
153 continue;
154 }
155 match verify_at_destination(&*dest, "") {
156 Ok(mut v) => {
157 v.enforce_content_policy(export.verify.requires_content());
160 if target.prefix_override.is_some() {
170 v.require_manifest_present(&resolved_prefix);
171 }
172 all_results.push(ExportVerdict {
173 name: export.name.clone(),
174 resolved_prefix,
175 verification: v,
176 });
177 }
178 Err(e) => {
179 hard_failures.push(format!(
180 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
181 export.name, resolved_prefix, e
182 ));
183 }
184 }
185 }
186
187 match format {
188 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
189 ValidateOutputFormat::Json(out_path) => {
190 render_json(&all_results, &hard_failures, out_path)?
191 }
192 }
193
194 let failed_verdicts = all_results
212 .iter()
213 .filter(|r| verdict_fails_exit(&r.verification))
214 .count();
215 if failed_verdicts > 0 {
216 return Err(crate::error::DataIntegrityError::new(format!(
223 "rivet validate: {} export(s) failed verification",
224 hard_failures.len() + failed_verdicts
225 ))
226 .into());
227 }
228 if !hard_failures.is_empty() {
229 anyhow::bail!(
231 "rivet validate: {} export(s) failed verification",
232 hard_failures.len()
233 );
234 }
235 Ok(())
236}
237
238fn verdict_fails_exit(v: &ManifestVerification) -> bool {
245 !v.passed && v.has_failures()
246}
247
248struct ExportVerdict {
252 name: String,
253 resolved_prefix: String,
254 verification: ManifestVerification,
255}
256
257fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
264 dest.prefix
265 .clone()
266 .or_else(|| dest.path.clone())
267 .unwrap_or_else(|| "<unresolved>".into())
268}
269
270fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
271 use std::io::Write;
272 let stdout = std::io::stdout();
273 let mut h = stdout.lock();
274
275 for r in results {
276 let _ = writeln!(h, "── {} ──", r.name);
277 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
278 let v = &r.verification;
279 if v.legacy_run {
280 let _ = writeln!(
281 h,
282 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
283 );
284 continue;
285 }
286 if !v.manifest_found {
287 let _ = writeln!(h, " status: NO MANIFEST");
288 for failure in &v.failures {
293 let _ = writeln!(h, " failure: {}", failure);
294 }
295 continue;
296 }
297 let _ = writeln!(
298 h,
299 " status: {}",
300 if v.passed { "PASSED" } else { "FAILED" }
301 );
302 let _ = writeln!(
303 h,
304 " parts: {} verified ({} md5, {} size-only), {} failed",
305 v.parts_verified,
306 v.parts_md5_verified,
307 v.parts_verified.saturating_sub(v.parts_md5_verified),
308 v.parts_failed
309 );
310 let _ = writeln!(
311 h,
312 " _SUCCESS: {}",
313 if v.success_marker_consistent {
314 "consistent"
315 } else if v.failures.iter().any(|f| matches!(
316 f,
317 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
318 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
319 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
320 )) {
321 "INCONSISTENT (see failures)"
322 } else {
323 "absent (no signal)"
324 }
325 );
326 let _ = writeln!(
327 h,
328 " manifest: {}",
329 if v.manifest_self_consistent {
330 "self-consistent"
331 } else {
332 "INCONSISTENT (see failures)"
333 }
334 );
335 for failure in &v.failures {
336 let label = if failure.is_fatal() {
344 "failure:"
345 } else {
346 "warning:"
347 };
348 let _ = writeln!(h, " {} {}", label, failure);
349 }
350 }
351
352 if !hard_failures.is_empty() {
353 let _ = writeln!(h);
354 let _ = writeln!(h, "── errors ──");
355 for e in hard_failures {
356 let _ = writeln!(h, " {}", e);
357 }
358 }
359 let _ = h.flush();
360}
361
362fn render_json(
363 results: &[ExportVerdict],
364 hard_failures: &[String],
365 out_path: Option<String>,
366) -> Result<()> {
367 let warnings: Vec<serde_json::Value> = results
374 .iter()
375 .flat_map(|r| {
376 r.verification
377 .failures
378 .iter()
379 .filter(|f| !f.is_fatal())
380 .map(move |f| {
381 serde_json::json!({
382 "export_name": r.name,
383 "warning": f,
384 })
385 })
386 })
387 .collect();
388
389 let payload = serde_json::json!({
390 "exports": results
391 .iter()
392 .map(|r| {
393 serde_json::json!({
394 "export_name": r.name,
395 "resolved_prefix": r.resolved_prefix,
396 "verification": r.verification,
397 })
398 })
399 .collect::<Vec<_>>(),
400 "warnings": warnings,
401 "errors": hard_failures,
402 });
403 let serialized = serde_json::to_string_pretty(&payload)?;
404 match out_path {
405 Some(p) => {
406 std::fs::write(Path::new(&p), &serialized)?;
407 log::info!("rivet validate: wrote JSON report to {}", p);
408 }
409 None => {
410 println!("{}", serialized);
411 }
412 }
413 Ok(())
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };
    use crate::pipeline::ManifestVerificationFailure as VFailure;

    // ---- ValidateTarget -> PlaceholderContext ----

    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        // Sample the clock on both sides of the call so a run that straddles
        // midnight (UTC) cannot fail spuriously: either day is acceptable.
        let before = chrono::Utc::now().date_naive();
        let ctx = target.placeholder_context("orders");
        let after = chrono::Utc::now().date_naive();
        assert!(
            ctx.date == before || ctx.date == after,
            "expected {} or {}, got {}",
            before,
            after,
            ctx.date
        );
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    // ---- resolved_prefix_for_display ----

    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }

    // ---- verdict_fails_exit ----

    /// Non-legacy verdict whose only failure is an unreadable manifest; every
    /// other field comes from `ManifestVerification::legacy()`.
    fn read_error_verdict() -> ManifestVerification {
        ManifestVerification {
            legacy_run: false,
            failures: vec![VFailure::ManifestReadError {
                detail: "permission denied".into(),
            }],
            ..ManifestVerification::legacy()
        }
    }

    #[test]
    fn exit_gate_counts_manifest_read_error_as_failure() {
        assert!(verdict_fails_exit(&read_error_verdict()));
    }

    #[test]
    fn exit_gate_keeps_legacy_run_at_zero() {
        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
    }

    #[test]
    fn exit_gate_keeps_advisory_untracked_at_zero() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            passed: true,
            parts_verified: 1,
            failures: vec![VFailure::UntrackedObject {
                key: "stray.parquet".into(),
                size_bytes: 9,
            }],
            ..ManifestVerification::legacy()
        };
        assert!(!verdict_fails_exit(&v));
    }

    #[test]
    fn exit_gate_counts_fatal_failure_on_found_manifest() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            failures: vec![VFailure::PartMissing {
                part_id: 1,
                path: "part-000001.parquet".into(),
            }],
            ..ManifestVerification::legacy()
        };
        assert!(verdict_fails_exit(&v));
    }

    // ---- end-to-end run_validate_command against a local destination ----

    /// Builds a `Success` manifest for export `orders` whose row/part totals
    /// are derived from `parts`.
    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
        let part_count = parts.len() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r-validate-cmd".into(),
            export_name: "orders".into(),
            started_at: "2026-06-09T12:00:00Z".into(),
            finished_at: "2026-06-09T12:01:00Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    /// Creates `prefix` and writes `m` there through a local destination.
    fn stage_dataset(prefix: &Path, m: &RunManifest) {
        std::fs::create_dir_all(prefix).unwrap();
        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some(prefix.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap();
        crate::pipeline::write_manifest(&*dest, m).unwrap();
    }

    /// Writes a minimal `rivet.yaml` into `dir` with one `orders` export whose
    /// local destination path is `prefix`. Returns the config path.
    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
        let cfg = dir.join("rivet.yaml");
        let yaml = format!(
            "source:\n type: postgres\n url: postgresql://nobody@localhost/nope\nexports:\n - name: orders\n query: \"SELECT 1\"\n mode: full\n format: parquet\n destination:\n type: local\n path: \"{}\"\n",
            prefix.to_string_lossy()
        );
        std::fs::write(&cfg, yaml).unwrap();
        cfg
    }

    #[cfg(unix)]
    #[test]
    fn unreadable_manifest_fails_the_command() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
        // Mode 000 does not stop a privileged user, so the scenario cannot be
        // reproduced there.
        if std::fs::read(&manifest_path).is_ok() {
            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
            return;
        }

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
    }

    #[test]
    fn untracked_surplus_alone_keeps_exit_zero() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("advisory untracked surplus must not flip the exit code");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["passed"], true);
        assert_eq!(verification["failures"][0]["kind"], "untracked_object");

        let warnings = json["warnings"].as_array().expect("warnings array present");
        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
        assert_eq!(warnings[0]["export_name"], "orders");
        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
    }

    #[test]
    fn json_warnings_array_is_empty_when_no_advisory_failures() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("a clean dataset must pass");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        assert_eq!(
            json["warnings"]
                .as_array()
                .expect("warnings array present")
                .len(),
            0,
            "no surplus → no warnings"
        );
    }

    #[test]
    fn missing_part_fails_the_command() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        let m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 10,
            size_bytes: 4,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        // The manifest lists a committed part, but the part file is never written.
        stage_dataset(&prefix, &m);
        let cfg = write_cfg(dir.path(), &prefix);

        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect_err("a missing committed part must fail verification");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );
    }

    #[test]
    fn prefix_override_with_real_manifest_passes() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget {
                prefix_override: Some(prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect("a real dataset under a pinned --prefix must pass");
    }

    #[test]
    fn prefix_override_at_absent_manifest_fails() {
        let dir = tempfile::tempdir().unwrap();
        let cfg_prefix = dir.path().join("cfg_dest");
        std::fs::create_dir_all(&cfg_prefix).unwrap();
        let cfg = write_cfg(dir.path(), &cfg_prefix);
        let empty_prefix = dir.path().join("never_written");
        std::fs::create_dir_all(&empty_prefix).unwrap();

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(
            verification["failures"][0]["kind"],
            "manifest_required_but_absent"
        );
    }

    #[test]
    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        std::fs::create_dir_all(&prefix).unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
    }
}