1use std::path::Path;
32
33use chrono::NaiveDate;
34
35use crate::config::Config;
36use crate::destination::placeholder::PlaceholderContext;
37use crate::error::Result;
38use crate::pipeline::ManifestVerification;
39use crate::pipeline::validate_manifest::{ValidateDepth, verify_at_destination};
40
/// How `rivet validate` renders its report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable text written to stdout.
    Pretty,
    /// Pretty-printed JSON: written to the given file path when `Some`, otherwise to stdout.
    Json(Option<String>),
}
48
/// Selects which dataset `rivet validate` inspects for each export, and how deeply.
///
/// `ValidateTarget::default()` means: placeholders expanded for today's date, no run id,
/// no `--prefix` override, and the default [`ValidateDepth`].
#[derive(Debug, Default, Clone)]
pub struct ValidateTarget {
    /// Date used to expand destination placeholders; `None` means today.
    pub date: Option<NaiveDate>,
    /// Run id added to the placeholder context when set.
    pub run_id: Option<String>,
    /// Literal location (`--prefix`) that replaces both the destination's `path` and
    /// `prefix` after placeholder expansion. Only accepted when a single export is
    /// selected; when set, a missing manifest is reported as a failure instead of a
    /// legacy pass.
    pub prefix_override: Option<String>,
    /// Verification depth (e.g. whether part files are downloaded and checked).
    pub depth: ValidateDepth,
}
70
71impl ValidateTarget {
72 fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
73 let mut ctx = match self.date {
74 Some(d) => PlaceholderContext::for_date(d, export_name),
75 None => PlaceholderContext::for_today(export_name),
76 };
77 if let Some(rid) = &self.run_id {
78 ctx = ctx.with_run_id(rid.clone());
79 }
80 ctx
81 }
82}
83
84pub fn run_validate_command(
93 config_path: &str,
94 export_name: Option<&str>,
95 format: ValidateOutputFormat,
96 target: ValidateTarget,
97) -> Result<()> {
98 let config = Config::load_with_params(config_path, None)?;
99
100 let exports: Vec<&crate::config::ExportConfig> = match export_name {
101 Some(name) => match config.exports.iter().find(|e| e.name == name) {
102 Some(e) => vec![e],
103 None => anyhow::bail!("export '{}' not found in config", name),
104 },
105 None => config.exports.iter().collect(),
106 };
107
108 if exports.is_empty() {
109 anyhow::bail!("no exports defined in config — nothing to validate");
110 }
111
112 if target.prefix_override.is_some() && exports.len() > 1 {
117 anyhow::bail!(
118 "--prefix requires --export <name>: cannot apply one override to {} exports",
119 exports.len()
120 );
121 }
122
123 let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
124 let mut hard_failures: Vec<String> = Vec::new();
125
126 for export in &exports {
127 let ctx = target.placeholder_context(&export.name);
131 let mut expanded_dest =
132 crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
133 if let Some(p) = &target.prefix_override {
134 expanded_dest.path = Some(p.clone());
138 expanded_dest.prefix = Some(p.clone());
139 }
140 let resolved_prefix = resolved_prefix_for_display(&expanded_dest);
141 let dest = match crate::destination::create_destination(&expanded_dest) {
142 Ok(d) => d,
143 Err(e) => {
144 let msg = format!(
145 "export '{}' (prefix: {}): could not open destination: {:#}",
146 export.name, resolved_prefix, e
147 );
148 hard_failures.push(msg);
149 continue;
150 }
151 };
152 if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
154 {
155 log::info!(
156 "export '{}': streaming destination, skipping (nothing to verify)",
157 export.name
158 );
159 continue;
160 }
161 match verify_at_destination(&*dest, "", target.depth) {
162 Ok(mut v) => {
163 v.enforce_content_policy(export.verify.requires_content());
166 if target.prefix_override.is_some() {
176 v.require_manifest_present(&resolved_prefix);
177 }
178 let manifest_verified = v.manifest_found && v.passed;
182 all_results.push(ExportVerdict {
183 name: export.name.clone(),
184 resolved_prefix,
185 verification: v,
186 });
187 if target.depth.runs_part_download()
193 && export.mode == crate::config::ExportMode::Cdc
194 && export.format == crate::config::FormatType::Parquet
195 {
196 match crate::source::cdc::validate::check_positions(&*dest, "") {
197 Ok(pc) if pc.is_ok() => log::info!(
198 "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
199 export.name,
200 pc.rows,
201 pc.parts,
202 pc.first,
203 pc.last
204 ),
205 Ok(pc) => {
206 for v in &pc.violations {
207 hard_failures
208 .push(format!("export '{}': cdc __pos: {}", export.name, v));
209 }
210 }
211 Err(e) => hard_failures.push(format!(
212 "export '{}': cdc __pos check failed: {:#}",
213 export.name, e
214 )),
215 }
216 }
217 if target.depth.runs_part_download()
228 && manifest_verified
229 && export.format == crate::config::FormatType::Parquet
230 && let Err(e) =
231 crate::source::value_checksum::validate_manifest_checksums(&*dest, "")
232 {
233 hard_failures
234 .push(format!("export '{}': value checksum: {:#}", export.name, e));
235 }
236 }
237 Err(e) => {
238 hard_failures.push(format!(
239 "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
240 export.name, resolved_prefix, e
241 ));
242 }
243 }
244 }
245
246 match format {
247 ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
248 ValidateOutputFormat::Json(out_path) => {
249 render_json(&all_results, &hard_failures, out_path)?
250 }
251 }
252
253 let failed_verdicts = all_results
271 .iter()
272 .filter(|r| verdict_fails_exit(&r.verification))
273 .count();
274 if failed_verdicts > 0 {
275 return Err(crate::error::DataIntegrityError::new(format!(
282 "rivet validate: {} export(s) failed verification",
283 hard_failures.len() + failed_verdicts
284 ))
285 .into());
286 }
287 if !hard_failures.is_empty() {
288 anyhow::bail!(
290 "rivet validate: {} export(s) failed verification",
291 hard_failures.len()
292 );
293 }
294 Ok(())
295}
296
297fn verdict_fails_exit(v: &ManifestVerification) -> bool {
304 !v.passed && v.has_failures()
305}
306
/// One export's entry in the validate report.
struct ExportVerdict {
    /// Export name from the config.
    name: String,
    /// Destination location as shown to the user (see `resolved_prefix_for_display`).
    resolved_prefix: String,
    /// Outcome of `verify_at_destination`, after the content-policy and
    /// manifest-presence adjustments.
    verification: ManifestVerification,
}
315
316fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
323 dest.prefix
324 .clone()
325 .or_else(|| dest.path.clone())
326 .unwrap_or_else(|| "<unresolved>".into())
327}
328
329fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
330 use std::io::Write;
331 let stdout = std::io::stdout();
332 let mut h = stdout.lock();
333
334 for r in results {
335 let _ = writeln!(h, "── {} ──", r.name);
336 let _ = writeln!(h, " prefix: {}", r.resolved_prefix);
337 let v = &r.verification;
338 let _ = writeln!(h, " depth: {}", v.depth_level);
342 if v.legacy_run {
343 let _ = writeln!(
344 h,
345 " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
346 );
347 continue;
348 }
349 if !v.manifest_found {
350 let _ = writeln!(h, " status: NO MANIFEST");
351 for failure in &v.failures {
357 let _ = writeln!(h, " failure: [{}] {}", failure.error_code(), failure);
358 }
359 continue;
360 }
361 let _ = writeln!(
362 h,
363 " status: {}",
364 if v.passed { "PASSED" } else { "FAILED" }
365 );
366 let _ = writeln!(
367 h,
368 " parts: {} verified ({} md5, {} size-only), {} failed",
369 v.parts_verified,
370 v.parts_md5_verified,
371 v.parts_verified.saturating_sub(v.parts_md5_verified),
372 v.parts_failed
373 );
374 let _ = writeln!(
375 h,
376 " _SUCCESS: {}",
377 if v.success_marker_consistent {
378 "consistent"
379 } else if v.failures.iter().any(|f| matches!(
380 f,
381 crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
382 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
383 | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
384 )) {
385 "INCONSISTENT (see failures)"
386 } else {
387 "absent (no signal)"
388 }
389 );
390 let _ = writeln!(
391 h,
392 " manifest: {}",
393 if v.manifest_self_consistent {
394 "self-consistent"
395 } else {
396 "INCONSISTENT (see failures)"
397 }
398 );
399 for failure in &v.failures {
400 let label = if failure.is_fatal() {
408 "failure:"
409 } else {
410 "warning:"
411 };
412 let _ = writeln!(h, " {} [{}] {}", label, failure.error_code(), failure);
416 }
417 }
418
419 if !hard_failures.is_empty() {
420 let _ = writeln!(h);
421 let _ = writeln!(h, "── errors ──");
422 for e in hard_failures {
423 let _ = writeln!(h, " {}", e);
424 }
425 }
426 let _ = h.flush();
427}
428
429fn failure_json(f: &crate::pipeline::ManifestVerificationFailure) -> serde_json::Value {
437 let mut value = serde_json::json!(f);
438 if let Some(obj) = value.as_object_mut() {
439 obj.insert(
440 "code".to_string(),
441 serde_json::Value::String(f.error_code().to_string()),
442 );
443 }
444 value
445}
446
447fn verification_json(v: &ManifestVerification) -> serde_json::Value {
452 let mut value = serde_json::json!(v);
453 if let Some(obj) = value.as_object_mut() {
454 let failures: Vec<serde_json::Value> = v.failures.iter().map(failure_json).collect();
455 obj.insert("failures".to_string(), serde_json::Value::Array(failures));
456 }
457 value
458}
459
460fn render_json(
461 results: &[ExportVerdict],
462 hard_failures: &[String],
463 out_path: Option<String>,
464) -> Result<()> {
465 let warnings: Vec<serde_json::Value> = results
473 .iter()
474 .flat_map(|r| {
475 r.verification
476 .failures
477 .iter()
478 .filter(|f| !f.is_fatal())
479 .map(move |f| {
480 serde_json::json!({
481 "export_name": r.name,
482 "warning": failure_json(f),
483 })
484 })
485 })
486 .collect();
487
488 let payload = serde_json::json!({
489 "exports": results
490 .iter()
491 .map(|r| {
492 serde_json::json!({
493 "export_name": r.name,
494 "resolved_prefix": r.resolved_prefix,
495 "verification": verification_json(&r.verification),
496 })
497 })
498 .collect::<Vec<_>>(),
499 "warnings": warnings,
500 "errors": hard_failures,
501 });
502 let serialized = serde_json::to_string_pretty(&payload)?;
503 match out_path {
504 Some(p) => {
505 std::fs::write(Path::new(&p), &serialized)?;
506 log::info!("rivet validate: wrote JSON report to {}", p);
507 }
508 None => {
509 println!("{}", serialized);
510 }
511 }
512 Ok(())
513}
514
// Unit tests for target resolution, prefix display and the exit gate, plus end-to-end
// tests that stage a manifest in a temp dir and run `run_validate_command` against it.
#[cfg(test)]
mod tests {
    use super::*;

    /// With no `--date`, placeholders resolve for today and carry no run id.
    // NOTE(review): compares against `Utc::now()`. This assumes `for_today` also uses UTC,
    // and the test can flake if it runs across midnight. Confirm.
    #[test]
    fn target_default_uses_today() {
        let target = ValidateTarget::default();
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, chrono::Utc::now().date_naive());
        assert_eq!(ctx.export_name, "orders");
        assert!(ctx.run_id.is_none());
    }

    /// An explicit date replaces today in the placeholder context.
    #[test]
    fn target_with_date_overrides_today() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert!(ctx.run_id.is_none());
    }

    /// Date and run id both end up in the same context.
    #[test]
    fn target_composes_date_and_run_id() {
        let target = ValidateTarget {
            date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
            run_id: Some("r-abc123".into()),
            prefix_override: None,
            ..Default::default()
        };
        let ctx = target.placeholder_context("orders");
        assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
        assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
    }

    /// When both are set, the object-store `prefix` wins over `path` for display.
    #[test]
    fn resolved_prefix_prefers_cloud_prefix_over_path() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::S3,
            prefix: Some("exports/2026-05-21/orders/".into()),
            path: Some("/scratch".into()),
            ..Default::default()
        };
        assert_eq!(
            resolved_prefix_for_display(&dest),
            "exports/2026-05-21/orders/",
        );
    }

    /// Local destinations have no `prefix`, so `path` is shown.
    #[test]
    fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
        let dest = crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            prefix: None,
            path: Some("/data/out".into()),
            ..Default::default()
        };
        assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
    }

    use crate::pipeline::ManifestVerificationFailure as VFailure;

    /// A non-legacy verdict whose only failure is an unreadable manifest. All other
    /// fields come from `ManifestVerification::legacy()`.
    fn read_error_verdict() -> ManifestVerification {
        ManifestVerification {
            legacy_run: false,
            failures: vec![VFailure::ManifestReadError {
                detail: "permission denied".into(),
            }],
            ..ManifestVerification::legacy()
        }
    }

    /// A manifest that exists but cannot be read must fail the exit gate.
    #[test]
    fn exit_gate_counts_manifest_read_error_as_failure() {
        assert!(verdict_fails_exit(&read_error_verdict()));
    }

    /// Pre-manifest (legacy) datasets keep exit code 0.
    #[test]
    fn exit_gate_keeps_legacy_run_at_zero() {
        assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
    }

    /// A passing verdict with only an advisory untracked object keeps exit code 0.
    #[test]
    fn exit_gate_keeps_advisory_untracked_at_zero() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            passed: true,
            parts_verified: 1,
            failures: vec![VFailure::UntrackedObject {
                key: "stray.parquet".into(),
                size_bytes: 9,
            }],
            ..ManifestVerification::legacy()
        };
        assert!(!verdict_fails_exit(&v));
    }

    /// A found manifest with a missing part fails the exit gate. `passed` is not set
    /// explicitly here, so it comes from `legacy()`.
    #[test]
    fn exit_gate_counts_fatal_failure_on_found_manifest() {
        let v = ManifestVerification {
            manifest_found: true,
            legacy_run: false,
            failures: vec![VFailure::PartMissing {
                part_id: 1,
                path: "part-000001.parquet".into(),
            }],
            ..ManifestVerification::legacy()
        };
        assert!(verdict_fails_exit(&v));
    }

    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };

    /// A `Success` manifest for export `orders` listing `parts`. `row_count` and
    /// `part_count` are derived from the parts; no column checksums are set.
    fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
        let row_count: i64 = parts.iter().map(|p| p.rows).sum();
        let part_count = parts.len() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r-validate-cmd".into(),
            export_name: "orders".into(),
            started_at: "2026-06-09T12:00:00Z".into(),
            finished_at: "2026-06-09T12:01:00Z".into(),
            status: ManifestStatus::Success,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
            column_checksums: None,
            checksum_key_column: None,
        }
    }

    /// Create `prefix` and write `m` into it through a local destination. Only the manifest
    /// is written; any part files must be created by the caller.
    fn stage_dataset(prefix: &Path, m: &RunManifest) {
        std::fs::create_dir_all(prefix).unwrap();
        let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some(prefix.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap();
        crate::pipeline::write_manifest(&*dest, m).unwrap();
    }

    /// Write `dir/rivet.yaml` with one full-mode Parquet export `orders` whose local
    /// destination is `prefix`, and return the config path.
    // NOTE(review): the postgres URL is a dummy; these tests assume validate never
    // connects to the source.
    fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
        let cfg = dir.join("rivet.yaml");
        let yaml = format!(
            "source:\n type: postgres\n url: postgresql://nobody@localhost/nope\nexports:\n - name: orders\n query: \"SELECT 1\"\n mode: full\n format: parquet\n destination:\n type: local\n path: \"{}\"\n",
            prefix.to_string_lossy()
        );
        std::fs::write(&cfg, yaml).unwrap();
        cfg
    }

    /// A manifest that is present but unreadable (mode 000) must fail the command. The
    /// report records `manifest_read_error` and is not classified as a legacy run.
    /// Skipped when the file stays readable (e.g. running as root).
    #[cfg(unix)]
    #[test]
    fn unreadable_manifest_fails_the_command() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
        std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
        if std::fs::read(&manifest_path).is_ok() {
            eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
            return;
        }

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect_err("an unreadable manifest is an explicit failure, not exit 0");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        // The report is written before the error is returned.
        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
    }

    /// An extra file not listed in the manifest is advisory: exit 0, `passed` stays true,
    /// and the report shows it both in the export's failures and in `warnings`.
    #[test]
    fn untracked_surplus_alone_keeps_exit_zero() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("advisory untracked surplus must not flip the exit code");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["passed"], true);
        assert_eq!(verification["failures"][0]["kind"], "untracked_object");

        let warnings = json["warnings"].as_array().expect("warnings array present");
        assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
        assert_eq!(warnings[0]["export_name"], "orders");
        assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
        assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
    }

    /// A clean dataset produces an empty (but present) `warnings` array.
    #[test]
    fn json_warnings_array_is_empty_when_no_advisory_failures() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("a clean dataset must pass");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        assert_eq!(
            json["warnings"]
                .as_array()
                .expect("warnings array present")
                .len(),
            0,
            "no surplus → no warnings"
        );
    }

    /// A committed part listed in the manifest but absent on disk fails the command.
    #[test]
    fn missing_part_fails_the_command() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        let m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 10,
            size_bytes: 4,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        // Only the manifest is staged; the part file is intentionally not written.
        stage_dataset(&prefix, &m);
        let cfg = write_cfg(dir.path(), &prefix);

        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect_err("a missing committed part must fail verification");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );
    }

    /// `--prefix` pointing at a real dataset verifies it normally.
    #[test]
    fn prefix_override_with_real_manifest_passes() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget {
                prefix_override: Some(prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect("a real dataset under a pinned --prefix must pass");
    }

    /// `--prefix` pointing at an empty directory must fail with
    /// `manifest_required_but_absent` instead of passing as a legacy run.
    #[test]
    fn prefix_override_at_absent_manifest_fails() {
        let dir = tempfile::tempdir().unwrap();
        let cfg_prefix = dir.path().join("cfg_dest");
        std::fs::create_dir_all(&cfg_prefix).unwrap();
        let cfg = write_cfg(dir.path(), &cfg_prefix);
        let empty_prefix = dir.path().join("never_written");
        std::fs::create_dir_all(&empty_prefix).unwrap();

        let report = dir.path().join("report.json");
        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
                ..Default::default()
            },
        )
        .expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["manifest_found"], false);
        assert_eq!(verification["legacy_run"], false);
        assert_eq!(
            verification["failures"][0]["kind"],
            "manifest_required_but_absent"
        );
    }

    /// Without `--prefix`, an empty destination is still treated as a legacy pass.
    #[test]
    fn absent_manifest_without_prefix_override_stays_legacy_pass() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        std::fs::create_dir_all(&prefix).unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget::default(),
        )
        .expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
    }

    /// Stage a dataset that passes part reconciliation but should fail Form B.
    ///
    /// The one committed part has a size matching the manifest, so reconciliation
    /// passes. Its bytes (`AAAA`) are not Parquet, and the manifest carries a column
    /// checksum, so the full-depth value-checksum check (Form B) should fail.
    fn stage_dataset_form_b_would_fail(prefix: &Path) {
        std::fs::create_dir_all(prefix).unwrap();
        let part_body: &[u8] = b"AAAA";
        std::fs::write(prefix.join("part-000001.parquet"), part_body).unwrap();

        let mut m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 1,
            size_bytes: part_body.len() as u64,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        m.column_checksums = Some(vec![crate::manifest::ColumnChecksum {
            name: "id".into(),
            checksum: "0".into(),
        }]);
        stage_dataset(prefix, &m);
    }

    /// `Sample` depth reconciles parts but does not run Form B, so the non-Parquet part
    /// still passes.
    #[test]
    fn sample_depth_does_not_run_form_b() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset_form_b_would_fail(&prefix);
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                depth: ValidateDepth::Sample,
                ..Default::default()
            },
        )
        .expect("sample depth skips Form B, so a non-Parquet part still passes");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["passed"], true);
        assert_eq!(verification["parts_verified"], 1, "sample reconciles parts");
        assert_eq!(verification["depth_level"], "sample");
    }

    /// `Full` depth runs Form B, which rejects the non-Parquet part as a hard failure.
    #[test]
    fn full_depth_runs_form_b() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset_form_b_would_fail(&prefix);
        let cfg = write_cfg(dir.path(), &prefix);

        let err = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(None),
            ValidateTarget {
                depth: ValidateDepth::Full,
                ..Default::default()
            },
        )
        .expect_err("full depth runs Form B, which fails on a non-Parquet part");
        assert!(
            format!("{err:#}").contains("1 export(s) failed verification"),
            "got: {err:#}"
        );
    }

    /// Each JSON failure entry carries `kind`, its stable `code`, and the variant fields.
    /// The verification also records the depth it ran at.
    #[test]
    fn json_report_carries_failure_code_and_depth_level() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        let m = success_manifest(vec![ManifestPart {
            part_id: 1,
            path: "part-000001.parquet".into(),
            rows: 10,
            size_bytes: 4,
            content_fingerprint: "xxh3:1111111111111111".into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }]);
        // The part file is deliberately not written, so verification reports it missing.
        stage_dataset(&prefix, &m);
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        let _ = run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget {
                depth: ValidateDepth::Sample,
                ..Default::default()
            },
        )
        .expect_err("a missing part fails the command");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let verification = &json["exports"][0]["verification"];
        assert_eq!(verification["depth_level"], "sample");
        let failure = &verification["failures"][0];
        assert_eq!(failure["kind"], "part_missing");
        assert_eq!(failure["code"], "RIVET_VERIFY_PART_MISSING");
        assert_eq!(failure["part_id"], 1);
    }

    /// Entries in the top-level `warnings` array also carry their `code`. The default
    /// target's depth is reported as `full`.
    #[test]
    fn json_warning_entry_also_carries_its_code() {
        let dir = tempfile::tempdir().unwrap();
        let prefix = dir.path().join("out");
        stage_dataset(&prefix, &success_manifest(Vec::new()));
        std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
        let cfg = write_cfg(dir.path(), &prefix);

        let report = dir.path().join("report.json");
        run_validate_command(
            cfg.to_str().unwrap(),
            Some("orders"),
            ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
            ValidateTarget::default(),
        )
        .expect("advisory untracked surplus must not flip the exit code");

        let json: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
        let warning = &json["warnings"][0]["warning"];
        assert_eq!(warning["kind"], "untracked_object");
        assert_eq!(warning["code"], "RIVET_VERIFY_UNTRACKED_OBJECT");
        assert_eq!(json["exports"][0]["verification"]["depth_level"], "full");
    }
}