use std::path::Path;
use chrono::NaiveDate;
use crate::config::Config;
use crate::destination::placeholder::PlaceholderContext;
use crate::error::Result;
use crate::pipeline::ManifestVerification;
use crate::pipeline::validate_manifest::verify_at_destination;
/// How `run_validate_command` reports its findings.
///
/// Derives the common traits so the value can be logged with `{:?}`, cloned,
/// and compared in tests, matching the derives on [`ValidateTarget`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidateOutputFormat {
    /// Human-readable report written to stdout.
    Pretty,
    /// JSON report. `Some(path)` writes the document to that file; `None`
    /// prints it to stdout.
    Json(Option<String>),
}
/// Selects which run of an export `run_validate_command` should inspect.
///
/// The default value (all fields `None`) resolves destination placeholders
/// for today's date with no run id and no prefix override.
#[derive(Debug, Default, Clone)]
pub struct ValidateTarget {
/// Date used to build the placeholder context; `None` falls back to
/// `PlaceholderContext::for_today`.
pub date: Option<NaiveDate>,
/// Optional run id attached to the placeholder context via `with_run_id`.
pub run_id: Option<String>,
/// When set, replaces both `path` and `prefix` of the expanded destination.
/// It is rejected when more than one export is selected, and the verdict is
/// additionally passed through `require_manifest_present`.
pub prefix_override: Option<String>,
}
impl ValidateTarget {
    /// Builds the placeholder context used to expand `export_name`'s
    /// destination.
    ///
    /// The context is seeded from the pinned `date` when there is one and from
    /// today otherwise; a configured `run_id` is then layered on top.
    fn placeholder_context(&self, export_name: &str) -> PlaceholderContext {
        let seeded = if let Some(day) = self.date {
            PlaceholderContext::for_date(day, export_name)
        } else {
            PlaceholderContext::for_today(export_name)
        };
        match &self.run_id {
            Some(id) => seeded.with_run_id(id.clone()),
            None => seeded,
        }
    }
}
/// Entry point of the `validate` command: verifies what is stored at the
/// destination of one or all configured exports and renders a report.
///
/// * `config_path` – path of the config file to load.
/// * `export_name` – `Some(name)` validates only that export; `None`
///   validates every export in the config.
/// * `format` – how the report is rendered (pretty text or JSON).
/// * `target` – date / run id / prefix override selecting the run to inspect.
///
/// The report is always rendered before the exit status is decided, so a
/// failing run still produces its full report.
///
/// # Errors
///
/// * The config cannot be loaded, the named export does not exist, the config
///   has no exports, or a prefix override is combined with more than one
///   export.
/// * Rendering the JSON report fails.
/// * A `DataIntegrityError` when at least one verdict fails the exit gate
///   (`verdict_fails_exit`).
/// * A plain error when only "hard" failures occurred (destination could not
///   be opened, verification could not run, CDC position check failed).
///
/// In both failure cases the message reports the number of *distinct exports*
/// that failed: an export with several recorded problems is counted once.
pub fn run_validate_command(
    config_path: &str,
    export_name: Option<&str>,
    format: ValidateOutputFormat,
    target: ValidateTarget,
) -> Result<()> {
    let config = Config::load_with_params(config_path, None)?;
    let exports: Vec<&crate::config::ExportConfig> = match export_name {
        Some(name) => match config.exports.iter().find(|e| e.name == name) {
            Some(e) => vec![e],
            None => anyhow::bail!("export '{}' not found in config", name),
        },
        None => config.exports.iter().collect(),
    };
    if exports.is_empty() {
        anyhow::bail!("no exports defined in config — nothing to validate");
    }
    // A single override cannot meaningfully address several destinations.
    if target.prefix_override.is_some() && exports.len() > 1 {
        anyhow::bail!(
            "--prefix requires --export <name>: cannot apply one override to {} exports",
            exports.len()
        );
    }

    let mut all_results: Vec<ExportVerdict> = Vec::with_capacity(exports.len());
    let mut hard_failures: Vec<String> = Vec::new();
    // Exports with at least one problem of any kind (each counted once).
    let mut failed_exports = 0usize;
    // Exports whose verdict fails the exit gate; selects the error type below.
    let mut integrity_failures = 0usize;

    for &export in &exports {
        let errors_before = hard_failures.len();
        let verdict = verify_single_export(export, &target, &mut hard_failures);

        // One export may push several hard failures (e.g. one per CDC position
        // violation); it still counts as a single failed export.
        let mut export_failed = hard_failures.len() > errors_before;
        if let Some(verdict) = verdict {
            if verdict_fails_exit(&verdict.verification) {
                integrity_failures += 1;
                export_failed = true;
            }
            all_results.push(verdict);
        }
        if export_failed {
            failed_exports += 1;
        }
    }

    match format {
        ValidateOutputFormat::Pretty => render_pretty(&all_results, &hard_failures),
        ValidateOutputFormat::Json(out_path) => {
            render_json(&all_results, &hard_failures, out_path)?
        }
    }

    if integrity_failures > 0 {
        return Err(crate::error::DataIntegrityError::new(format!(
            "rivet validate: {} export(s) failed verification",
            failed_exports
        ))
        .into());
    }
    if !hard_failures.is_empty() {
        anyhow::bail!(
            "rivet validate: {} export(s) failed verification",
            failed_exports
        );
    }
    Ok(())
}

/// Verifies one export at its resolved destination.
///
/// Returns the verdict when verification ran, or `None` when there is nothing
/// to report for this export (destination could not be opened, streaming
/// destination, or verification itself errored). Problems that are not part of
/// a verdict are appended to `hard_failures`.
fn verify_single_export(
    export: &crate::config::ExportConfig,
    target: &ValidateTarget,
    hard_failures: &mut Vec<String>,
) -> Option<ExportVerdict> {
    let ctx = target.placeholder_context(&export.name);
    let mut expanded_dest =
        crate::destination::placeholder::expand_destination(export.destination.clone(), &ctx);
    if let Some(p) = &target.prefix_override {
        // Both fields are set; `resolved_prefix_for_display` reads `prefix`
        // first and falls back to `path`.
        expanded_dest.path = Some(p.clone());
        expanded_dest.prefix = Some(p.clone());
    }
    let resolved_prefix = resolved_prefix_for_display(&expanded_dest);

    let dest = match crate::destination::create_destination(&expanded_dest) {
        Ok(d) => d,
        Err(e) => {
            hard_failures.push(format!(
                "export '{}' (prefix: {}): could not open destination: {:#}",
                export.name, resolved_prefix, e
            ));
            return None;
        }
    };
    if dest.capabilities().commit_protocol == crate::destination::WriteCommitProtocol::Streaming
    {
        log::info!(
            "export '{}': streaming destination, skipping (nothing to verify)",
            export.name
        );
        return None;
    }

    let mut verification = match verify_at_destination(&*dest, "") {
        Ok(v) => v,
        Err(e) => {
            hard_failures.push(format!(
                "export '{}' (prefix: {}): verify_at_destination failed: {:#}",
                export.name, resolved_prefix, e
            ));
            return None;
        }
    };
    verification.enforce_content_policy(export.verify.requires_content());
    if target.prefix_override.is_some() {
        // An explicitly pinned prefix must actually contain a manifest.
        verification.require_manifest_present(&resolved_prefix);
    }

    // CDC exports written as Parquet additionally get a position check.
    if export.mode == crate::config::ExportMode::Cdc
        && export.format == crate::config::FormatType::Parquet
    {
        match crate::source::cdc::validate::check_positions(&*dest, "") {
            Ok(pc) if pc.is_ok() => log::info!(
                "export '{}': cdc __pos continuity OK — {} changes across {} parts, range {:?}..{:?}",
                export.name,
                pc.rows,
                pc.parts,
                pc.first,
                pc.last
            ),
            Ok(pc) => {
                for violation in &pc.violations {
                    hard_failures.push(format!(
                        "export '{}': cdc __pos: {}",
                        export.name, violation
                    ));
                }
            }
            Err(e) => hard_failures.push(format!(
                "export '{}': cdc __pos check failed: {:#}",
                export.name, e
            )),
        }
    }

    Some(ExportVerdict {
        name: export.name.clone(),
        resolved_prefix,
        verification,
    })
}
/// Exit gate for a single verdict: `true` when this verdict must turn the
/// command into a failure.
///
/// A verdict that passed never fails the gate; otherwise the decision is
/// whatever `has_failures()` reports.
fn verdict_fails_exit(v: &ManifestVerification) -> bool {
    if v.passed {
        return false;
    }
    v.has_failures()
}
// Outcome of verifying one export; consumed by `render_pretty` / `render_json`.
struct ExportVerdict {
// Export name as it appears in the config.
name: String,
// Destination prefix (or path) shown in the report; see `resolved_prefix_for_display`.
resolved_prefix: String,
// Verification result produced by `verify_at_destination`.
verification: ManifestVerification,
}
/// Picks the location string shown in reports for a destination.
///
/// `prefix` wins when present, `path` is the fallback, and the literal
/// `<unresolved>` is used when neither is set.
fn resolved_prefix_for_display(dest: &crate::config::DestinationConfig) -> String {
    match (&dest.prefix, &dest.path) {
        (Some(prefix), _) => prefix.clone(),
        (None, Some(path)) => path.clone(),
        (None, None) => "<unresolved>".into(),
    }
}
/// Prints the human-readable validation report to stdout.
///
/// Every verdict gets a header line and its resolved prefix, followed by one
/// of three shapes: a single `legacy_run` status line, a `NO MANIFEST` status
/// with its failures, or the full breakdown (status, part counts, `_SUCCESS`
/// marker state, manifest consistency, then each failure labelled as
/// `failure:` or `warning:`). Hard failures are listed last under an
/// `errors` heading.
///
/// Write errors are intentionally discarded: every `writeln!` result is
/// ignored, as is the final flush.
fn render_pretty(results: &[ExportVerdict], hard_failures: &[String]) {
    use std::io::Write;

    let stdout = std::io::stdout();
    let mut out = stdout.lock();

    for verdict in results {
        let check = &verdict.verification;
        let _ = writeln!(out, "── {} ──", verdict.name);
        let _ = writeln!(out, " prefix: {}", verdict.resolved_prefix);

        if check.legacy_run {
            let _ = writeln!(
                out,
                " status: legacy_run (no manifest at destination — pre-0.7.0 prefix)"
            );
        } else if !check.manifest_found {
            let _ = writeln!(out, " status: NO MANIFEST");
            for problem in &check.failures {
                let _ = writeln!(out, " failure: {}", problem);
            }
        } else {
            let status = if check.passed { "PASSED" } else { "FAILED" };
            let _ = writeln!(out, " status: {}", status);

            let size_only = check
                .parts_verified
                .saturating_sub(check.parts_md5_verified);
            let _ = writeln!(
                out,
                " parts: {} verified ({} md5, {} size-only), {} failed",
                check.parts_verified, check.parts_md5_verified, size_only, check.parts_failed
            );

            let marker = if check.success_marker_consistent {
                "consistent"
            } else {
                // Only scanned when the marker is not consistent, as before.
                let marker_problem_reported = check.failures.iter().any(|f| {
                    matches!(
                        f,
                        crate::pipeline::ManifestVerificationFailure::SuccessMarkerStale { .. }
                            | crate::pipeline::ManifestVerificationFailure::SuccessMarkerMalformed { .. }
                            | crate::pipeline::ManifestVerificationFailure::SuccessMarkerReadError { .. }
                    )
                });
                if marker_problem_reported {
                    "INCONSISTENT (see failures)"
                } else {
                    "absent (no signal)"
                }
            };
            let _ = writeln!(out, " _SUCCESS: {}", marker);

            let manifest_state = if check.manifest_self_consistent {
                "self-consistent"
            } else {
                "INCONSISTENT (see failures)"
            };
            let _ = writeln!(out, " manifest: {}", manifest_state);

            for problem in &check.failures {
                let label = if problem.is_fatal() {
                    "failure:"
                } else {
                    "warning:"
                };
                let _ = writeln!(out, " {} {}", label, problem);
            }
        }
    }

    if !hard_failures.is_empty() {
        let _ = writeln!(out);
        let _ = writeln!(out, "── errors ──");
        for message in hard_failures {
            let _ = writeln!(out, " {}", message);
        }
    }
    let _ = out.flush();
}
/// Serializes the validation report as pretty-printed JSON.
///
/// The document has three top-level keys: `exports` (one entry per verdict
/// with its name, resolved prefix and verification), `warnings` (every
/// non-fatal failure, tagged with the export it belongs to) and `errors`
/// (the hard failures).
///
/// With `Some(path)` the document is written to that file and an info line is
/// logged; with `None` it is printed to stdout.
///
/// # Errors
///
/// Returns an error when serialization fails or the report file cannot be
/// written.
fn render_json(
    results: &[ExportVerdict],
    hard_failures: &[String],
    out_path: Option<String>,
) -> Result<()> {
    let mut warnings: Vec<serde_json::Value> = Vec::new();
    let mut exports: Vec<serde_json::Value> = Vec::with_capacity(results.len());
    for verdict in results {
        for failure in &verdict.verification.failures {
            if failure.is_fatal() {
                continue;
            }
            warnings.push(serde_json::json!({
                "export_name": verdict.name,
                "warning": failure,
            }));
        }
        exports.push(serde_json::json!({
            "export_name": verdict.name,
            "resolved_prefix": verdict.resolved_prefix,
            "verification": verdict.verification,
        }));
    }

    let document = serde_json::json!({
        "exports": exports,
        "warnings": warnings,
        "errors": hard_failures,
    });
    let text = serde_json::to_string_pretty(&document)?;

    if let Some(target_file) = out_path {
        std::fs::write(Path::new(&target_file), &text)?;
        log::info!("rivet validate: wrote JSON report to {}", target_file);
    } else {
        println!("{}", text);
    }
    Ok(())
}
// Unit and end-to-end tests for the validate command: placeholder-context
// construction, prefix display, the exit gate, and full command runs against
// a local destination staged in a temp directory.
#[cfg(test)]
mod tests {
use super::*;
// A default target carries no date, so the context is built for today and has no run id.
#[test]
fn target_default_uses_today() {
let target = ValidateTarget::default();
let ctx = target.placeholder_context("orders");
// NOTE(review): compares against the UTC date — presumably `for_today` is UTC-based too; confirm.
assert_eq!(ctx.date, chrono::Utc::now().date_naive());
assert_eq!(ctx.export_name, "orders");
assert!(ctx.run_id.is_none());
}
// A pinned date replaces today's date in the context.
#[test]
fn target_with_date_overrides_today() {
let target = ValidateTarget {
date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
..Default::default()
};
let ctx = target.placeholder_context("orders");
assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
assert!(ctx.run_id.is_none());
}
// Date and run id are independent: both end up in the context.
#[test]
fn target_composes_date_and_run_id() {
let target = ValidateTarget {
date: Some(NaiveDate::from_ymd_opt(2026, 5, 21).unwrap()),
run_id: Some("r-abc123".into()),
prefix_override: None,
};
let ctx = target.placeholder_context("orders");
assert_eq!(ctx.date, NaiveDate::from_ymd_opt(2026, 5, 21).unwrap());
assert_eq!(ctx.run_id.as_deref(), Some("r-abc123"));
}
// When both `prefix` and `path` are set, the displayed value is `prefix`.
#[test]
fn resolved_prefix_prefers_cloud_prefix_over_path() {
let dest = crate::config::DestinationConfig {
destination_type: crate::config::DestinationType::S3,
prefix: Some("exports/2026-05-21/orders/".into()),
path: Some("/scratch".into()),
..Default::default()
};
assert_eq!(
resolved_prefix_for_display(&dest),
"exports/2026-05-21/orders/",
);
}
// Without a `prefix`, the displayed value falls back to `path`.
#[test]
fn resolved_prefix_falls_back_to_path_when_prefix_missing() {
let dest = crate::config::DestinationConfig {
destination_type: crate::config::DestinationType::Local,
prefix: None,
path: Some("/data/out".into()),
..Default::default()
};
assert_eq!(resolved_prefix_for_display(&dest), "/data/out");
}
use crate::pipeline::ManifestVerificationFailure as VFailure;
// Fixture: a non-legacy verdict whose only failure is a manifest read error;
// all remaining fields come from `ManifestVerification::legacy()`.
fn read_error_verdict() -> ManifestVerification {
ManifestVerification {
legacy_run: false,
failures: vec![VFailure::ManifestReadError {
detail: "permission denied".into(),
}],
..ManifestVerification::legacy()
}
}
// A manifest that could not be read must trip the exit gate.
#[test]
fn exit_gate_counts_manifest_read_error_as_failure() {
assert!(verdict_fails_exit(&read_error_verdict()));
}
// A plain legacy verdict must not trip the exit gate.
#[test]
fn exit_gate_keeps_legacy_run_at_zero() {
assert!(!verdict_fails_exit(&ManifestVerification::legacy()));
}
// A passed verdict whose only failure entry is an untracked object must not trip the gate.
#[test]
fn exit_gate_keeps_advisory_untracked_at_zero() {
let v = ManifestVerification {
manifest_found: true,
legacy_run: false,
passed: true,
parts_verified: 1,
failures: vec![VFailure::UntrackedObject {
key: "stray.parquet".into(),
size_bytes: 9,
}],
..ManifestVerification::legacy()
};
assert!(!verdict_fails_exit(&v));
}
// A found manifest with a missing part must trip the gate.
#[test]
fn exit_gate_counts_fatal_failure_on_found_manifest() {
let v = ManifestVerification {
manifest_found: true,
legacy_run: false,
failures: vec![VFailure::PartMissing {
part_id: 1,
path: "part-000001.parquet".into(),
}],
..ManifestVerification::legacy()
};
assert!(verdict_fails_exit(&v));
}
use crate::manifest::{
MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
PartStatus, RunManifest,
};
// Fixture: a `Success` manifest for export "orders" whose row and part counts
// are derived from `parts`.
fn success_manifest(parts: Vec<ManifestPart>) -> RunManifest {
let row_count: i64 = parts.iter().map(|p| p.rows).sum();
let part_count = parts.len() as u32;
RunManifest {
manifest_version: MANIFEST_VERSION,
run_id: "r-validate-cmd".into(),
export_name: "orders".into(),
started_at: "2026-06-09T12:00:00Z".into(),
finished_at: "2026-06-09T12:01:00Z".into(),
status: ManifestStatus::Success,
source: ManifestSource {
engine: "postgres".into(),
schema: Some("public".into()),
table: Some("orders".into()),
},
destination: ManifestDestination {
kind: "local".into(),
uri: "file:///tmp/out".into(),
},
format: "parquet".into(),
compression: "zstd".into(),
schema_fingerprint: "xxh3:0123456789abcdef".into(),
row_count,
part_count,
parts,
}
}
// Creates `prefix` and writes manifest `m` there through a local destination.
// Only the manifest is written; no part files are created.
fn stage_dataset(prefix: &Path, m: &RunManifest) {
std::fs::create_dir_all(prefix).unwrap();
let dest = crate::destination::create_destination(&crate::config::DestinationConfig {
destination_type: crate::config::DestinationType::Local,
path: Some(prefix.to_string_lossy().into_owned()),
..Default::default()
})
.unwrap();
crate::pipeline::write_manifest(&*dest, m).unwrap();
}
// Writes `rivet.yaml` into `dir` with a single full-mode parquet export named
// "orders" whose local destination path is `prefix`; returns the config path.
fn write_cfg(dir: &Path, prefix: &Path) -> std::path::PathBuf {
let cfg = dir.join("rivet.yaml");
let yaml = format!(
"source:\n type: postgres\n url: postgresql://nobody@localhost/nope\nexports:\n - name: orders\n query: \"SELECT 1\"\n mode: full\n format: parquet\n destination:\n type: local\n path: \"{}\"\n",
prefix.to_string_lossy()
);
std::fs::write(&cfg, yaml).unwrap();
cfg
}
// End-to-end: a manifest that exists but cannot be read makes the command fail
// and is reported as `manifest_read_error` in the JSON report.
#[cfg(unix)]
#[test]
fn unreadable_manifest_fails_the_command() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
stage_dataset(&prefix, &success_manifest(Vec::new()));
let cfg = write_cfg(dir.path(), &prefix);
let manifest_path = prefix.join(crate::manifest::MANIFEST_FILENAME);
std::fs::set_permissions(&manifest_path, std::fs::Permissions::from_mode(0o000)).unwrap();
// If the file is still readable after chmod 000 the scenario cannot be
// reproduced, so the test bails out early.
if std::fs::read(&manifest_path).is_ok() {
eprintln!("skipping unreadable_manifest_fails_the_command: running as root");
return;
}
let report = dir.path().join("report.json");
let err = run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
ValidateTarget::default(),
)
.expect_err("an unreadable manifest is an explicit failure, not exit 0");
assert!(
format!("{err:#}").contains("1 export(s) failed verification"),
"got: {err:#}"
);
let json: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
let verification = &json["exports"][0]["verification"];
assert_eq!(verification["manifest_found"], false);
assert_eq!(verification["legacy_run"], false);
assert_eq!(verification["failures"][0]["kind"], "manifest_read_error");
}
// End-to-end: an extra file next to a valid manifest is reported as a warning
// only; the command still succeeds.
#[test]
fn untracked_surplus_alone_keeps_exit_zero() {
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
stage_dataset(&prefix, &success_manifest(Vec::new()));
std::fs::write(prefix.join("rogue.parquet"), b"XX").unwrap();
let cfg = write_cfg(dir.path(), &prefix);
let report = dir.path().join("report.json");
run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
ValidateTarget::default(),
)
.expect("advisory untracked surplus must not flip the exit code");
let json: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
let verification = &json["exports"][0]["verification"];
assert_eq!(verification["passed"], true);
assert_eq!(verification["failures"][0]["kind"], "untracked_object");
let warnings = json["warnings"].as_array().expect("warnings array present");
assert_eq!(warnings.len(), 1, "the untracked surplus is one warning");
assert_eq!(warnings[0]["export_name"], "orders");
assert_eq!(warnings[0]["warning"]["kind"], "untracked_object");
assert_eq!(warnings[0]["warning"]["key"], "rogue.parquet");
}
// End-to-end: a clean dataset still emits the `warnings` key, as an empty array.
#[test]
fn json_warnings_array_is_empty_when_no_advisory_failures() {
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
stage_dataset(&prefix, &success_manifest(Vec::new()));
let cfg = write_cfg(dir.path(), &prefix);
let report = dir.path().join("report.json");
run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
ValidateTarget::default(),
)
.expect("a clean dataset must pass");
let json: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
assert_eq!(
json["warnings"]
.as_array()
.expect("warnings array present")
.len(),
0,
"no surplus → no warnings"
);
}
// End-to-end: the manifest lists a committed part, but `stage_dataset` never
// writes part files, so verification must fail.
#[test]
fn missing_part_fails_the_command() {
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
let m = success_manifest(vec![ManifestPart {
part_id: 1,
path: "part-000001.parquet".into(),
rows: 10,
size_bytes: 4,
content_fingerprint: "xxh3:1111111111111111".into(),
content_md5: String::new(),
status: PartStatus::Committed,
}]);
stage_dataset(&prefix, &m); let cfg = write_cfg(dir.path(), &prefix);
let err = run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(None),
ValidateTarget::default(),
)
.expect_err("a missing committed part must fail verification");
assert!(
format!("{err:#}").contains("1 export(s) failed verification"),
"got: {err:#}"
);
}
// End-to-end: pinning the prefix to a directory that holds a real manifest passes.
#[test]
fn prefix_override_with_real_manifest_passes() {
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
stage_dataset(&prefix, &success_manifest(Vec::new()));
let cfg = write_cfg(dir.path(), &prefix);
run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(None),
ValidateTarget {
prefix_override: Some(prefix.to_string_lossy().into_owned()),
..Default::default()
},
)
.expect("a real dataset under a pinned --prefix must pass");
}
// End-to-end: pinning the prefix to an empty directory must fail with
// `manifest_required_but_absent` instead of being treated as a legacy run.
#[test]
fn prefix_override_at_absent_manifest_fails() {
let dir = tempfile::tempdir().unwrap();
let cfg_prefix = dir.path().join("cfg_dest");
std::fs::create_dir_all(&cfg_prefix).unwrap();
let cfg = write_cfg(dir.path(), &cfg_prefix);
let empty_prefix = dir.path().join("never_written");
std::fs::create_dir_all(&empty_prefix).unwrap();
let report = dir.path().join("report.json");
let err = run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(Some(report.to_string_lossy().into_owned())),
ValidateTarget {
prefix_override: Some(empty_prefix.to_string_lossy().into_owned()),
..Default::default()
},
)
.expect_err("a never-written prefix pinned via --prefix must fail, not legacy-pass");
assert!(
format!("{err:#}").contains("1 export(s) failed verification"),
"got: {err:#}"
);
let json: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&report).unwrap()).unwrap();
let verification = &json["exports"][0]["verification"];
assert_eq!(verification["manifest_found"], false);
assert_eq!(verification["legacy_run"], false);
assert_eq!(
verification["failures"][0]["kind"],
"manifest_required_but_absent"
);
}
// End-to-end: with no prefix override, an empty destination directory is
// accepted and the command returns Ok.
#[test]
fn absent_manifest_without_prefix_override_stays_legacy_pass() {
let dir = tempfile::tempdir().unwrap();
let prefix = dir.path().join("out");
std::fs::create_dir_all(&prefix).unwrap(); let cfg = write_cfg(dir.path(), &prefix);
run_validate_command(
cfg.to_str().unwrap(),
Some("orders"),
ValidateOutputFormat::Json(None),
ValidateTarget::default(), )
.expect("an absent manifest with no pinned --prefix is a legacy pass (exit 0)");
}
}