1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// How `rivet validate` reports its findings.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers (CLI parsing, tests)
/// can log, copy and compare the selected format, matching the derives on
/// the sibling `ValidateTarget`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable text report written to stdout.
    Pretty,
    /// JSON report. `Some(path)` writes the document to that file; `None`
    /// prints it to stdout.
    Json(Option<String>),
}
48
49#[derive(Debug, Default, Clone)]
55pub struct ValidateTarget {
56 pub date: Option<NaiveDate>,
58 pub run_id: Option<String>,
60 pub prefix_override: Option<String>,
63}
64
65impl ValidateTarget {
66 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67 let mut ctx = match self.date {
68 Some(d) => PlaceholderContext::for_date(d, export_name),
69 None => PlaceholderContext::for_today(export_name),
70 };
71 if let Some(rid) = &self.run_id {
72 ctx = ctx.with_run_id(rid.clone());
73 }
74 ctx
75 }
76}
77
78pub fn run_validate_command(
87 config_path: &str,
88 export_name: Option<&str>,
89 format: ValidateOutputFormat,
90 target: ValidateTarget,
91) -> Result<()> {
92 let config = Config::load_with_params(config_path, None)?;
93
94 let exports: Vec<&crate::config::ExportConfig> = match export_name {
95 Some(name) => match config.exports.iter().find(|e| e.name == name) {
96 Some(e) => vec![e],
97 None => anyhow::bail!("export '{}' not found in config", name),
98 },
99 None => config.exports.iter().collect(),
100 };
101
102 if exports.is_empty() {
103 anyhow::bail!("no exports defined in config — nothing to validate");
104 }
105
106 if target.prefix_override.is_some() && exports.len() > 1 {
111 anyhow::bail!(
112 "--prefix requires --export <name>: cannot apply one override to {} exports",
113 exports.len()
114 );
115 }
116
117 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118 let mut hard_failures: Vec<String> = Vec::new();
119
120 for export in &exports {
121 let ctx = target.placeholder_context(&export.name);
125 let mut expanded_dest =
126 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127 if let Some(p) = &target.prefix_override {
128 expanded_dest.path = Some(p.clone());
132 expanded_dest.prefix = Some(p.clone());
133 }
134 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135 let dest = match crate::destination::create_destination(&expanded_dest) {
136 Ok(d) => d,
137 Err(e) => {
138 let msg = format!(
139 "export '{}' (prefix: {}): could not open destination: {:#}",
140 export.name, resolved_prefix, e
141 );
142 hard_failures.push(msg);
143 continue;
144 }
145 };
146 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148 {
149 log::info!(
150 "export '{}': streaming destination, skipping (nothing to verify)",
151 export.name
152 );
153 continue;
154 }
155 match verify_at_destination(&*dest, "") {
156 Ok(mut v) => {
157 v.enforce_content_policy(export.verify.requires_content());
160 if target.prefix_override.is_some() {
170 v.require_manifest_present(&resolved_prefix);
171 }
172 let manifest_verified = v.manifest_found && v.passed;
176 all_results.push(ExportVerdict {
177 name: export.name.clone(),
178 resolved_prefix,
179 verification: v,
180 });
181 if export.mode == crate::config::ExportMode::Cdc
185 && export.format == crate::config::FormatType::Parquet
186 {
187 match crate::source::cdc::validate::check_positions(&*dest, "") {
188 Ok(pc) if pc.is_ok() => log::info!(
189 "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
190 export.name,
191 pc.rows,
192 pc.parts,
193 pc.first,
194 pc.last
195 ),
196 Ok(pc) => {
197 for v in &pc.violations {
198 hard_failures
199 .push(format!("export '{}': cdc __pos: {}", export.name, v));
200 }
201 }
202 Err(e) => hard_failures.push(format!(
203 "export '{}': cdc __pos check failed: {:#}",
204 export.name, e
205 )),
206 }
207 }
208 if manifest_verified
215 && export.format == crate::config::FormatType::Parquet
216 && let Err(e) =
217 crate::source::value_checksum::validate_manifest_checksums(&*dest, "")
218 {
219 hard_failures
220 .push(format!("export '{}': value checksum: {:#}", export.name, e));
221 }
222 }
223 Err(e) => {
224 hard_failures.push(format!(
225 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
226 export.name, resolved_prefix, e
227 ));
228 }
229 }
230 }
231
232 match format {
233 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
234 ValidateOutputFormat::Json(out_path) => {
235 render_json(&all_results, &hard_failures, out_path)?
236 }
237 }
238
239 let failed_verdicts = all_results
257 .iter()
258 .filter(|r| verdict_fails_exit(&r.verification))
259 .count();
260 if failed_verdicts > 0 {
261 return Err(crate::error::DataIntegrityError::new(format!(
268 "rivet validate: {} export(s) failed verification",
269 hard_failures.len() + failed_verdicts
270 ))
271 .into());
272 }
273 if !hard_failures.is_empty() {
274 anyhow::bail!(
276 "rivet validate: {} export(s) failed verification",
277 hard_failures.len()
278 );
279 }
280 Ok(())
281}
282
283fn verdict_fails_exit(v: &ManifestVerification) -> bool {
290 !v.passed && v.has_failures()
291}
292
/// Verification outcome for one export, as collected by
/// `run_validate_command` and consumed by the report renderers.
struct ExportVerdict {
    /// Name of the export as configured.
    name: String,
    /// Destination location shown in the report (see
    /// `resolved_prefix_for_display`).
    resolved_prefix: String,
    /// Result of `verify_at_destination` after the content policy and any
    /// prefix-override requirement were applied.
    verification: ManifestVerification,
}
301
302fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
309 dest.prefix
310 .clone()
311 .or_else(|| dest.path.clone())
312 .unwrap_or_else(|| "<unresolved>".into())
313}
314
315fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
316 use std::io::Write;
317 let stdout = std::io::stdout();
318 let mut h = stdout.lock();
319
320 for r in results {
321 let _ = writeln!(h, "── {} ──", r.name);
322 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
323 let v = &r.verification;
324 if v.legacy_run {
325 let _ = writeln!(
326 h,
327 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
328 );
329 continue;
330 }
331 if !v.manifest_found {
332 let _ = writeln!(h, " status: NO MANIFEST");
333 for failure in &v.failures {
338 let _ = writeln!(h, " failure: {}", failure);
339 }
340 continue;
341 }
342 let _ = writeln!(
343 h,
344 " status: {}",
345 if v.passed { "PASSED" } else { "FAILED" }
346 );
347 let _ = writeln!(
348 h,
349 " parts: {} verified ({} md5, {} size-only), {} failed",
350 v.parts_verified,
351 v.parts_md5_verified,
352 v.parts_verified.saturating_sub(v.parts_md5_verified),
353 v.parts_failed
354 );
355 let _ = writeln!(
356 h,
357 " _SUCCESS: {}",
358 if v.success_marker_consistent {
359 "consistent"
360 } else if v.failures.iter().any(|f| matches!(
361 f,
362 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
363 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
364 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
365 )) {
366 "INCONSISTENT (see failures)"
367 } else {
368 "absent (no signal)"
369 }
370 );
371 let _ = writeln!(
372 h,
373 " manifest: {}",
374 if v.manifest_self_consistent {
375 "self-consistent"
376 } else {
377 "INCONSISTENT (see failures)"
378 }
379 );
380 for failure in &v.failures {
381 let label = if failure.is_fatal() {
389 "failure:"
390 } else {
391 "warning:"
392 };
393 let _ = writeln!(h, " {} {}", label, failure);
394 }
395 }
396
397 if !hard_failures.is_empty() {
398 let _ = writeln!(h);
399 let _ = writeln!(h, "── errors ──");
400 for e in hard_failures {
401 let _ = writeln!(h, " {}", e);
402 }
403 }
404 let _ = h.flush();
405}
406
407fn render_json(
408 results: &[ExportVerdict],
409 hard_failures: &[String],
410 out_path: Option<String>,
411) -> Result<()> {
412 let warnings: Vec<serde_json::Value> = results
419 .iter()
420 .flat_map(|r| {
421 r.verification
422 .failures
423 .iter()
424 .filter(|f| !f.is_fatal())
425 .map(move |f| {
426 serde_json::json!({
427 "export_name": r.name,
428 "warning": f,
429 })
430 })
431 })
432 .collect();
433
434 let payload = serde_json::json!({
435 "exports": results
436 .iter()
437 .map(|r| {
438 serde_json::json!({
439 "export_name": r.name,
440 "resolved_prefix": r.resolved_prefix,
441 "verification": r.verification,
442 })
443 })
444 .collect::<Vec<_>>(),
445 "warnings": warnings,
446 "errors": hard_failures,
447 });
448 let serialized = serde_json::to_string_pretty(&payload)?;
449 match out_path {
450 Some(p) => {
451 std::fs::write(Path::new(&p), &serialized)?;
452 log::info!("rivet validate: wrote JSON report to {}", p);
453 }
454 None => {
455 println!("{}", serialized);
456 }
457 }
458 Ok(())
459}
460
// Unit and end-to-end tests for the validate command: placeholder-context
// construction, prefix display, the exit gate, and full runs against a
// local destination staged in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;

    // With no date or run id, the context is dated today and has no run id.
    // NOTE(review): compares against the UTC date; could be flaky around
    // midnight if `for_today` samples the clock separately — confirm.
    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    // An explicit date replaces today's date; the run id stays unset.
    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    // Date and run id are independent: both end up in the context.
    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    // When both `prefix` and `path` are set, the display value is `prefix`.
    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    // Without a `prefix`, the display value falls back to `path`.
    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }

    use crate::pipeline::ManifestVerificationFailure as VFailure;

    // Builds a non-legacy verdict whose only failure is a manifest read
    // error; all other fields come from `ManifestVerification::legacy()`.
    fn read_error_verdict() -> ManifestVerification {
        ManifestVerification {
            legacy_run: false,
            failures: vec![VFailure::ManifestReadError {
                detail: "permission denied".into(),
            }],
            ..ManifestVerification::legacy()
        }
    }

    // A manifest that could not be read must fail the exit gate.
    #[test]
    fn exit_gate_counts_manifest_read_error_as_failure() {
        assert!(verdict_fails_exit(&read_error_verdict()));
    }

    // A plain legacy verdict does not fail the exit gate.
    #[test]
    fn exit_gate_keeps_legacy_run_at_zero() {
        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
    }

    // A passed verdict carrying only an untracked-object finding does not
    // fail the exit gate.
    #[test]
    fn exit_gate_keeps_advisory_untracked_at_zero() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            passed: true,
            parts_verified: 1,
            failures: vec![VFailure::UntrackedObject {
                key: "stray.parquet".into(),
                size_bytes: 9,
            }],
            ..ManifestVerification::legacy()
        };
        assert!(!verdict_fails_exit(&v));
    }

    // A found manifest with a missing part fails the exit gate.
    #[test]
    fn exit_gate_counts_fatal_failure_on_found_manifest() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            failures: vec![VFailure::PartMissing {
                part_id: 1,
                path: "part-000001.parquet".into(),
            }],
            ..ManifestVerification::legacy()
        };
        assert!(verdict_fails_exit(&v));
    }

    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };

    // Builds a `Success` manifest for export `orders` around the given
    // parts; `row_count` and `part_count` are derived from `parts`.
    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
        let part_count = parts.len() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r-validate-cmd".into(),
            export_name: "orders".into(),
            started_at: "2026-06-09T12:00:00Z".into(),
            finished_at: "2026-06-09T12:01:00Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
            column_checksums: None,
            checksum_key_column: None,
        }
    }

    // Creates `prefix` and writes manifest `m` there through a local
    // destination. No part files are written.
    fn stage_dataset(prefix: &Path, m: &RunManifest) {
        std::fs::create_dir_all(prefix).unwrap();
        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some(prefix.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap();
        crate::pipeline::write_manifest(&*dest, m).unwrap();
    }

    // Writes `rivet.yaml` into `dir` with a single `orders` export whose
    // local destination path is `prefix`, and returns the config path.
    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
        let cfg = dir.join("rivet.yaml");
        let yaml = format!(
            "source:\n type: postgres\n url: postgresql://nobody@localhost/nope\nexports:\n - name: orders\n query: \"SELECT 1\"\n mode: full\n format: parquet\n destination:\n type: local\n path: \"{}\"\n",
            prefix.to_string_lossy()
        );
        std::fs::write(&cfg, yaml).unwrap();
        cfg
    }

    // A manifest that exists but cannot be read must fail the command and
    // be reported as a read error rather than as a legacy run.
    #[cfg(unix)]
    #[test]
    fn unreadable_manifest_fails_the_command() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        // Strip all permissions from the manifest file.
        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
        // If the file is still readable the scenario cannot be reproduced
        // (the message below attributes this to running as root), so bail out.
        if std::fs::read(&manifest_path).is_ok() {
            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
            return;
        }

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        // The JSON report is written even though the command failed.
        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
    }

    // An extra file next to a valid manifest is reported as a warning but
    // does not fail the command.
    #[test]
    fn untracked_surplus_alone_keeps_exit_zero() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        // File that the manifest does not list.
        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("advisory untracked surplus must not flip the exit code");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["passed"], true);
        assert_eq!(verification["failures"][0]["kind"], "untracked_object");

        // The same finding is mirrored in the top-level `warnings` array.
        let warnings = json["warnings"].as_array().expect("warnings array present");
        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
        assert_eq!(warnings[0]["export_name"], "orders");
        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
    }

    // A clean dataset yields a `warnings` array that is present but empty.
    #[test]
    fn json_warnings_array_is_empty_when_no_advisory_failures() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("a clean dataset must pass");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        assert_eq!(
            json["warnings"]
                .as_array()
                .expect("warnings array present")
                .len(),
            0,
            "no surplus → no warnings"
        );
    }

    // The manifest lists a committed part whose file was never written, so
    // the command must fail.
    #[test]
    fn missing_part_fails_the_command() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        let m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 10,
            size_bytes: 4,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        // Only the manifest is staged; the part file itself is absent.
        stage_dataset(&prefix, &m);
        let cfg = write_cfg(dir.path(), &prefix);

        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect_err("a missing committed part must fail verification");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );
    }

    // Pinning the prefix to a directory that holds a real manifest passes.
    #[test]
    fn prefix_override_with_real_manifest_passes() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget {
                prefix_override: Some(prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect("a real dataset under a pinned --prefix must pass");
    }

    // Pinning the prefix to an empty directory must fail: with an override
    // an absent manifest is an error rather than a legacy pass.
    #[test]
    fn prefix_override_at_absent_manifest_fails() {
        let dir = tempfile::tempdir().unwrap();
        let cfg_prefix = dir.path().join("cfg_dest");
        std::fs::create_dir_all(&cfg_prefix).unwrap();
        let cfg = write_cfg(dir.path(), &cfg_prefix);
        // The override points somewhere other than the configured destination.
        let empty_prefix = dir.path().join("never_written");
        std::fs::create_dir_all(&empty_prefix).unwrap();

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(
            verification["failures"][0]["kind"],
            "manifest_required_but_absent"
        );
    }

    // Without a prefix override, an empty destination directory still
    // passes (legacy behaviour).
    #[test]
    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        // Directory exists but no manifest is staged.
        std::fs::create_dir_all(&prefix).unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
    }
}