1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::verify_at_destination;
40
/// How `run_validate_command` renders its report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable text written to stdout.
    Pretty,
    /// JSON document; written to the given file path when `Some`, printed to
    /// stdout when `None`.
    Json(Option<String>),
}
48
/// Selects which destination location `run_validate_command` inspects.
///
/// The default value (all fields `None`) expands destination placeholders for
/// today's date, without a run id, and without overriding the configured
/// prefix.
#[derive(Debug, Default, Clone)]
pub struct ValidateTarget {
    /// Date used when expanding destination placeholders; `None` falls back to
    /// `PlaceholderContext::for_today`.
    pub date: Option<NaiveDate>,
    /// Run id attached to the placeholder context when present.
    pub run_id: Option<String>,
    /// When set, replaces both `path` and `prefix` of the expanded destination.
    /// Only accepted when exactly one export is being validated, and makes a
    /// manifest at that location mandatory.
    pub prefix_override: Option<String>,
}
64
65impl ValidateTarget {
66 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
67 let mut ctx = match self.date {
68 Some(d) => PlaceholderContext::for_date(d, export_name),
69 None => PlaceholderContext::for_today(export_name),
70 };
71 if let Some(rid) = &self.run_id {
72 ctx = ctx.with_run_id(rid.clone());
73 }
74 ctx
75 }
76}
77
78pub fn run_validate_command(
87 config_path: &str,
88 export_name: Option<&str>,
89 format: ValidateOutputFormat,
90 target: ValidateTarget,
91) -> Result<()> {
92 let config = Config::load_with_params(config_path, None)?;
93
94 let exports: Vec<&crate::config::ExportConfig> = match export_name {
95 Some(name) => match config.exports.iter().find(|e| e.name == name) {
96 Some(e) => vec![e],
97 None => anyhow::bail!("export '{}' not found in config", name),
98 },
99 None => config.exports.iter().collect(),
100 };
101
102 if exports.is_empty() {
103 anyhow::bail!("no exports defined in config — nothing to validate");
104 }
105
106 if target.prefix_override.is_some() && exports.len() > 1 {
111 anyhow::bail!(
112 "--prefix requires --export <name>: cannot apply one override to {} exports",
113 exports.len()
114 );
115 }
116
117 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
118 let mut hard_failures: Vec<String> = Vec::new();
119
120 for export in &exports {
121 let ctx = target.placeholder_context(&export.name);
125 let mut expanded_dest =
126 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
127 if let Some(p) = &target.prefix_override {
128 expanded_dest.path = Some(p.clone());
132 expanded_dest.prefix = Some(p.clone());
133 }
134 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
135 let dest = match crate::destination::create_destination(&expanded_dest) {
136 Ok(d) => d,
137 Err(e) => {
138 let msg = format!(
139 "export '{}' (prefix: {}): could not open destination: {:#}",
140 export.name, resolved_prefix, e
141 );
142 hard_failures.push(msg);
143 continue;
144 }
145 };
146 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
148 {
149 log::info!(
150 "export '{}': streaming destination, skipping (nothing to verify)",
151 export.name
152 );
153 continue;
154 }
155 match verify_at_destination(&*dest, "") {
156 Ok(mut v) => {
157 v.enforce_content_policy(export.verify.requires_content());
160 if target.prefix_override.is_some() {
170 v.require_manifest_present(&resolved_prefix);
171 }
172 all_results.push(ExportVerdict {
173 name: export.name.clone(),
174 resolved_prefix,
175 verification: v,
176 });
177 if export.mode == crate::config::ExportMode::Cdc
181 && export.format == crate::config::FormatType::Parquet
182 {
183 match crate::source::cdc::validate::check_positions(&*dest, "") {
184 Ok(pc) if pc.is_ok() => log::info!(
185 "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
186 export.name,
187 pc.rows,
188 pc.parts,
189 pc.first,
190 pc.last
191 ),
192 Ok(pc) => {
193 for v in &pc.violations {
194 hard_failures
195 .push(format!("export '{}': cdc __pos: {}", export.name, v));
196 }
197 }
198 Err(e) => hard_failures.push(format!(
199 "export '{}': cdc __pos check failed: {:#}",
200 export.name, e
201 )),
202 }
203 }
204 }
205 Err(e) => {
206 hard_failures.push(format!(
207 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
208 export.name, resolved_prefix, e
209 ));
210 }
211 }
212 }
213
214 match format {
215 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
216 ValidateOutputFormat::Json(out_path) => {
217 render_json(&all_results, &hard_failures, out_path)?
218 }
219 }
220
221 let failed_verdicts = all_results
239 .iter()
240 .filter(|r| verdict_fails_exit(&r.verification))
241 .count();
242 if failed_verdicts > 0 {
243 return Err(crate::error::DataIntegrityError::new(format!(
250 "rivet validate: {} export(s) failed verification",
251 hard_failures.len() + failed_verdicts
252 ))
253 .into());
254 }
255 if !hard_failures.is_empty() {
256 anyhow::bail!(
258 "rivet validate: {} export(s) failed verification",
259 hard_failures.len()
260 );
261 }
262 Ok(())
263}
264
265fn verdict_fails_exit(v: &ManifestVerification) -> bool {
272 !v.passed && v.has_failures()
273}
274
/// Verification outcome of one export, kept for report rendering and for the
/// final pass/fail decision.
struct ExportVerdict {
    /// Name of the export as written in the config.
    name: String,
    /// Destination location shown in reports (see `resolved_prefix_for_display`).
    resolved_prefix: String,
    /// Result of verifying the manifest at the destination.
    verification: ManifestVerification,
}
283
284fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
291 dest.prefix
292 .clone()
293 .or_else(|| dest.path.clone())
294 .unwrap_or_else(|| "<unresolved>".into())
295}
296
297fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
298 use std::io::Write;
299 let stdout = std::io::stdout();
300 let mut h = stdout.lock();
301
302 for r in results {
303 let _ = writeln!(h, "── {} ──", r.name);
304 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
305 let v = &r.verification;
306 if v.legacy_run {
307 let _ = writeln!(
308 h,
309 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
310 );
311 continue;
312 }
313 if !v.manifest_found {
314 let _ = writeln!(h, " status: NO MANIFEST");
315 for failure in &v.failures {
320 let _ = writeln!(h, " failure: {}", failure);
321 }
322 continue;
323 }
324 let _ = writeln!(
325 h,
326 " status: {}",
327 if v.passed { "PASSED" } else { "FAILED" }
328 );
329 let _ = writeln!(
330 h,
331 " parts: {} verified ({} md5, {} size-only), {} failed",
332 v.parts_verified,
333 v.parts_md5_verified,
334 v.parts_verified.saturating_sub(v.parts_md5_verified),
335 v.parts_failed
336 );
337 let _ = writeln!(
338 h,
339 " _SUCCESS: {}",
340 if v.success_marker_consistent {
341 "consistent"
342 } else if v.failures.iter().any(|f| matches!(
343 f,
344 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
345 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
346 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
347 )) {
348 "INCONSISTENT (see failures)"
349 } else {
350 "absent (no signal)"
351 }
352 );
353 let _ = writeln!(
354 h,
355 " manifest: {}",
356 if v.manifest_self_consistent {
357 "self-consistent"
358 } else {
359 "INCONSISTENT (see failures)"
360 }
361 );
362 for failure in &v.failures {
363 let label = if failure.is_fatal() {
371 "failure:"
372 } else {
373 "warning:"
374 };
375 let _ = writeln!(h, " {} {}", label, failure);
376 }
377 }
378
379 if !hard_failures.is_empty() {
380 let _ = writeln!(h);
381 let _ = writeln!(h, "── errors ──");
382 for e in hard_failures {
383 let _ = writeln!(h, " {}", e);
384 }
385 }
386 let _ = h.flush();
387}
388
389fn render_json(
390 results: &[ExportVerdict],
391 hard_failures: &[String],
392 out_path: Option<String>,
393) -> Result<()> {
394 let warnings: Vec<serde_json::Value> = results
401 .iter()
402 .flat_map(|r| {
403 r.verification
404 .failures
405 .iter()
406 .filter(|f| !f.is_fatal())
407 .map(move |f| {
408 serde_json::json!({
409 "export_name": r.name,
410 "warning": f,
411 })
412 })
413 })
414 .collect();
415
416 let payload = serde_json::json!({
417 "exports": results
418 .iter()
419 .map(|r| {
420 serde_json::json!({
421 "export_name": r.name,
422 "resolved_prefix": r.resolved_prefix,
423 "verification": r.verification,
424 })
425 })
426 .collect::<Vec<_>>(),
427 "warnings": warnings,
428 "errors": hard_failures,
429 });
430 let serialized = serde_json::to_string_pretty(&payload)?;
431 match out_path {
432 Some(p) => {
433 std::fs::write(Path::new(&p), &serialized)?;
434 log::info!("rivet validate: wrote JSON report to {}", p);
435 }
436 None => {
437 println!("{}", serialized);
438 }
439 }
440 Ok(())
441}
442
#[cfg(test)]
mod tests {
    use super::*;

    /// A default target expands placeholders for today and carries no run id.
    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    /// An explicit date replaces today's date in the placeholder context.
    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    /// Date and run id are independent and both end up in the context.
    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    /// When both `prefix` and `path` are set, the displayed value is `prefix`.
    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    /// Without a `prefix`, the displayed value is `path`.
    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }

    use crate::pipeline::ManifestVerificationFailure as VFailure;

    /// Verdict for a manifest that could not be read: not a legacy run, and
    /// carrying a single `ManifestReadError`.
    fn read_error_verdict() -> ManifestVerification {
        ManifestVerification {
            legacy_run: false,
            failures: vec![VFailure::ManifestReadError {
                detail: "permission denied".into(),
            }],
            ..ManifestVerification::legacy()
        }
    }

    /// A manifest read error must trip the exit gate.
    #[test]
    fn exit_gate_counts_manifest_read_error_as_failure() {
        assert!(verdict_fails_exit(&read_error_verdict()));
    }

    /// A plain legacy verdict must not trip the exit gate.
    #[test]
    fn exit_gate_keeps_legacy_run_at_zero() {
        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
    }

    /// A passed verdict whose only entry is an untracked object must not trip
    /// the exit gate.
    #[test]
    fn exit_gate_keeps_advisory_untracked_at_zero() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            passed: true,
            parts_verified: 1,
            failures: vec![VFailure::UntrackedObject {
                key: "stray.parquet".into(),
                size_bytes: 9,
            }],
            ..ManifestVerification::legacy()
        };
        assert!(!verdict_fails_exit(&v));
    }

    /// A missing part on a found manifest must trip the exit gate.
    #[test]
    fn exit_gate_counts_fatal_failure_on_found_manifest() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            failures: vec![VFailure::PartMissing {
                part_id: 1,
                path: "part-000001.parquet".into(),
            }],
            ..ManifestVerification::legacy()
        };
        assert!(verdict_fails_exit(&v));
    }

    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };

    /// Builds a `Success` manifest for export `orders` whose row and part
    /// counts are derived from `parts`.
    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
        let part_count = parts.len() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r-validate-cmd".into(),
            export_name: "orders".into(),
            started_at: "2026-06-09T12:00:00Z".into(),
            finished_at: "2026-06-09T12:01:00Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    /// Creates `prefix` and writes manifest `m` into it through a local
    /// destination. Part files are not created.
    fn stage_dataset(prefix: &Path, m: &RunManifest) {
        std::fs::create_dir_all(prefix).unwrap();
        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some(prefix.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap();
        crate::pipeline::write_manifest(&*dest, m).unwrap();
    }

    /// Writes `rivet.yaml` into `dir` with a single full-mode parquet export
    /// named `orders` whose local destination is `prefix`, and returns the
    /// config path.
    ///
    /// YAML nesting is indentation-based: the export's keys must be indented
    /// under the `- name:` list item and `type`/`path` under `destination:`.
    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
        let cfg = dir.join("rivet.yaml");
        let yaml = format!(
            "source:\n  type: postgres\n  url: postgresql://nobody@localhost/nope\nexports:\n  - name: orders\n    query: \"SELECT 1\"\n    mode: full\n    format: parquet\n    destination:\n      type: local\n      path: \"{}\"\n",
            prefix.to_string_lossy()
        );
        std::fs::write(&cfg, yaml).unwrap();
        cfg
    }

    /// A manifest that exists but cannot be read must fail the command and be
    /// reported as `manifest_read_error`, not as a legacy run.
    #[cfg(unix)]
    #[test]
    fn unreadable_manifest_fails_the_command() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
        // If the file is still readable the permission change had no effect
        // (e.g. running as root), so the scenario cannot be reproduced.
        if std::fs::read(&manifest_path).is_ok() {
            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
            return;
        }

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
    }

    /// An extra file not listed in the manifest is advisory: the command
    /// succeeds and the JSON report lists it once under `warnings`.
    #[test]
    fn untracked_surplus_alone_keeps_exit_zero() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("advisory untracked surplus must not flip the exit code");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["passed"], true);
        assert_eq!(verification["failures"][0]["kind"], "untracked_object");

        let warnings = json["warnings"].as_array().expect("warnings array present");
        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
        assert_eq!(warnings[0]["export_name"], "orders");
        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
    }

    /// A clean dataset still emits a `warnings` key, as an empty array.
    #[test]
    fn json_warnings_array_is_empty_when_no_advisory_failures() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("a clean dataset must pass");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        assert_eq!(
            json["warnings"]
                .as_array()
                .expect("warnings array present")
                .len(),
            0,
            "no surplus → no warnings"
        );
    }

    /// A committed part listed in the manifest but absent from the destination
    /// fails the command.
    #[test]
    fn missing_part_fails_the_command() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        let m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 10,
            size_bytes: 4,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        // Only the manifest is staged; the part file itself is never written.
        stage_dataset(&prefix, &m);
        let cfg = write_cfg(dir.path(), &prefix);

        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect_err("a missing committed part must fail verification");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );
    }

    /// Pinning the prefix at a location that holds a real manifest passes.
    #[test]
    fn prefix_override_with_real_manifest_passes() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget {
                prefix_override: Some(prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect("a real dataset under a pinned --prefix must pass");
    }

    /// Pinning the prefix at an empty directory must fail with
    /// `manifest_required_but_absent` instead of passing as a legacy run.
    #[test]
    fn prefix_override_at_absent_manifest_fails() {
        let dir = tempfile::tempdir().unwrap();
        let cfg_prefix = dir.path().join("cfg_dest");
        std::fs::create_dir_all(&cfg_prefix).unwrap();
        let cfg = write_cfg(dir.path(), &cfg_prefix);
        let empty_prefix = dir.path().join("never_written");
        std::fs::create_dir_all(&empty_prefix).unwrap();

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(
            verification["failures"][0]["kind"],
            "manifest_required_but_absent"
        );
    }

    /// Without a pinned prefix, an empty destination is treated as a legacy
    /// run and the command succeeds.
    #[test]
    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        std::fs::create_dir_all(&prefix).unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
    }
}