use serde::{Deserialize, Serialize};
use crate::destination::Destination;
use crate::error::Result;
use crate::manifest::{
MANIFEST_FILENAME, RunManifest, SUCCESS_FILENAME, parse_success_marker, success_marker_body,
};
/// Outcome of [`verify_at_destination`] for one run directory.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestVerification {
    /// `true` once `manifest.json` has been read from the destination, even if it then failed
    /// to parse. `false` for legacy runs and when `head`/`read` on the manifest failed.
    pub manifest_found: bool,
    /// `true` only when no `manifest.json` exists under the directory (a pre-manifest run).
    pub legacy_run: bool,
    /// Number of committed parts found at the destination with the size recorded in the
    /// manifest.
    pub parts_verified: usize,
    /// Number of committed parts that were missing, had a different size, or whose `head`
    /// call failed.
    pub parts_failed: usize,
    /// `true` only when `_SUCCESS` exists and its fingerprint matches the one computed from the
    /// manifest bytes. An absent marker leaves this `false` without recording a failure.
    pub success_marker_consistent: bool,
    /// `false` when the manifest failed to parse or its self-consistency check failed, and in
    /// every result where no manifest was parsed at all.
    pub manifest_self_consistent: bool,
    /// Overall verdict. Untracked objects and listing errors are recorded in `failures`
    /// without clearing this flag; legacy runs report `false` with no failures.
    pub passed: bool,
    /// Every problem found, in the order the checks ran.
    pub failures: Vec<Failure>,
}
/// A single problem found by [`verify_at_destination`].
///
/// Serialized with an internal `"kind"` tag holding the snake_case variant name
/// (e.g. `{"kind": "part_missing", "part_id": 2, "path": "..."}`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Failure {
    /// A committed part is absent at the destination. Also used when `head` on the part
    /// failed; `path` then carries a `(head failed: ...)` suffix.
    PartMissing { part_id: u32, path: String },
    /// A committed part exists but its size differs from the manifest.
    PartSizeMismatch {
        part_id: u32,
        path: String,
        /// Size recorded in the manifest.
        expected: u64,
        /// Size reported by the destination.
        actual: u64,
    },
    /// `_SUCCESS` is not UTF-8 or could not be parsed as a marker. `body_preview` is a
    /// truncated copy of the body, or `(non-utf8, N bytes)`.
    SuccessMarkerMalformed { body_preview: String },
    /// `_SUCCESS` parsed, but its fingerprint does not match the one computed from the
    /// current `manifest.json` bytes.
    SuccessMarkerStale {
        marker_fingerprint: String,
        manifest_fingerprint: String,
    },
    /// `manifest.json` failed to parse, or the parsed manifest failed its own
    /// self-consistency validation.
    ManifestSelfInconsistent { detail: String },
    /// `head` or `read` on `manifest.json` failed.
    ManifestReadError { detail: String },
    /// `head` or `read` on `_SUCCESS` failed.
    SuccessMarkerReadError { detail: String },
    /// Listing the run directory failed; recorded without clearing `passed`.
    ListPrefixError { detail: String },
    /// An object under the run directory that is neither the manifest, the marker, a
    /// committed part, nor under the quarantine prefix; recorded without clearing `passed`.
    UntrackedObject { key: String, size_bytes: u64 },
}
impl std::fmt::Display for Failure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Failure::PartMissing { part_id, path } => {
write!(f, "part {} missing at {}", part_id, path)
}
Failure::PartSizeMismatch {
part_id,
path,
expected,
actual,
} => write!(
f,
"part {} size mismatch at {}: manifest {}, dest {}",
part_id, path, expected, actual
),
Failure::SuccessMarkerMalformed { body_preview } => {
write!(f, "_SUCCESS body malformed: {body_preview:?}")
}
Failure::SuccessMarkerStale {
marker_fingerprint,
manifest_fingerprint,
} => write!(
f,
"_SUCCESS body {} != manifest fingerprint {} (stale marker)",
marker_fingerprint, manifest_fingerprint
),
Failure::ManifestSelfInconsistent { detail } => {
write!(f, "manifest self-consistency: {detail}")
}
Failure::ManifestReadError { detail } => {
write!(f, "manifest read error: {detail}")
}
Failure::SuccessMarkerReadError { detail } => {
write!(f, "_SUCCESS read error: {detail}")
}
Failure::ListPrefixError { detail } => {
write!(f, "destination listing error: {detail}")
}
Failure::UntrackedObject { key, size_bytes } => {
write!(f, "untracked object: {} ({} bytes)", key, size_bytes)
}
}
}
}
impl ManifestVerification {
    /// Result for a directory with no `manifest.json`, i.e. a run that predates manifests.
    ///
    /// Nothing was checked: every flag except `legacy_run` is `false`, and `failures` is
    /// empty because this is a label rather than an error.
    pub fn legacy() -> Self {
        let failures = vec![];
        Self {
            legacy_run: true,
            manifest_found: false,
            manifest_self_consistent: false,
            success_marker_consistent: false,
            passed: false,
            parts_verified: 0,
            parts_failed: 0,
            failures,
        }
    }

    /// Returns `true` when at least one [`Failure`] was recorded.
    ///
    /// This can be `true` while `passed` is also `true`, because some failures are recorded
    /// without affecting the verdict.
    pub fn has_failures(&self) -> bool {
        let none_recorded = self.failures.is_empty();
        !none_recorded
    }
}
/// Verifies the run stored under `manifest_dir` on `dest` against its `manifest.json`.
///
/// Checks, in order:
/// 1. `manifest.json` is located with `head` and then read.
///    - If it is absent, the run is labelled legacy via [`ManifestVerification::legacy`].
///    - If `head`/`read` fails, a [`Failure::ManifestReadError`] result is returned.
///    - If it does not parse as a [`RunManifest`], a [`Failure::ManifestSelfInconsistent`]
///      result is returned.
///    - No further checks run in these cases.
/// 2. The parsed manifest's own invariants (`validate_self_consistency`).
/// 3. Every `Committed` part must exist at the destination with the manifest's `size_bytes`.
/// 4. If `_SUCCESS` exists, its fingerprint must match the one computed from the manifest
///    bytes. A missing marker is not a failure.
/// 5. Every object listed under `manifest_dir` must be the manifest, the marker, a committed
///    part, or have a key containing the quarantine prefix. Untracked objects and listing
///    errors are recorded in `failures` but, unlike checks 2-4, do not clear `passed`.
///
/// Steps 2-5 always all run, so one call reports every problem found.
///
/// # Errors
///
/// Destination and parse errors are reported as [`Failure`] entries, not as `Err`; this
/// function currently always returns `Ok`.
pub fn verify_at_destination(
    dest: &dyn Destination,
    manifest_dir: &str,
) -> Result<ManifestVerification> {
    let manifest_key = join_key(manifest_dir, MANIFEST_FILENAME);
    let success_key = join_key(manifest_dir, SUCCESS_FILENAME);

    let manifest_bytes = match dest.head(&manifest_key) {
        // No manifest at all: a pre-manifest run. A label, not an error.
        Ok(None) => return Ok(ManifestVerification::legacy()),
        Ok(Some(_)) => match dest.read(&manifest_key) {
            Ok(bytes) => bytes,
            Err(e) => return Ok(manifest_read_failure(format!("{e:#}"))),
        },
        Err(e) => {
            return Ok(manifest_read_failure(format!(
                "manifest head failed: {e:#}"
            )));
        }
    };

    let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
        Ok(m) => m,
        Err(e) => {
            return Ok(ManifestVerification {
                manifest_found: true,
                legacy_run: false,
                parts_verified: 0,
                parts_failed: 0,
                success_marker_consistent: false,
                manifest_self_consistent: false,
                passed: false,
                failures: vec![Failure::ManifestSelfInconsistent {
                    detail: format!("manifest.json parse failed: {e}"),
                }],
            });
        }
    };

    let mut out = ManifestVerification {
        manifest_found: true,
        legacy_run: false,
        parts_verified: 0,
        parts_failed: 0,
        success_marker_consistent: false,
        manifest_self_consistent: true,
        passed: true,
        failures: Vec::new(),
    };

    // An inconsistent manifest is reported, but the remaining checks still run so the
    // caller sees every problem in a single pass.
    if let Err(e) = manifest.validate_self_consistency() {
        out.manifest_self_consistent = false;
        out.passed = false;
        out.failures.push(Failure::ManifestSelfInconsistent {
            detail: format!("{e}"),
        });
    }

    check_committed_parts(dest, manifest_dir, &manifest, &mut out);

    // Marker problems are recorded inside the helper and never cut verification short, so
    // the untracked-object scan below runs regardless of the marker's state.
    check_success_marker(dest, &success_key, &manifest_bytes, &mut out);

    match dest.list_prefix(manifest_dir) {
        // Advisory: a listing failure is recorded but does not clear `passed`.
        Err(e) => out.failures.push(Failure::ListPrefixError {
            detail: format!("{e:#}"),
        }),
        Ok(listing) => {
            let known: std::collections::HashSet<String> = std::iter::once(manifest_key)
                .chain(std::iter::once(success_key))
                .chain(
                    manifest
                        .parts
                        .iter()
                        .filter(|p| p.status == crate::manifest::PartStatus::Committed)
                        .map(|p| join_key(manifest_dir, &p.path)),
                )
                .collect();
            for obj in listing {
                if known.contains(&obj.key)
                    || obj.key.contains(crate::manifest::QUARANTINE_PREFIX)
                {
                    continue;
                }
                // Advisory as well: untracked objects do not clear `passed`.
                out.failures.push(Failure::UntrackedObject {
                    key: obj.key,
                    size_bytes: obj.size_bytes,
                });
            }
        }
    }
    Ok(out)
}

/// Builds the result for a manifest whose `head` or `read` failed.
///
/// Presence of the manifest is unknown (or its body unavailable), so this is neither a
/// legacy run nor a verified one.
fn manifest_read_failure(detail: String) -> ManifestVerification {
    ManifestVerification {
        legacy_run: false,
        failures: vec![Failure::ManifestReadError { detail }],
        ..ManifestVerification::legacy()
    }
}

/// Checks each `Committed` part of `manifest` against its object under `manifest_dir`.
///
/// Increments `parts_verified` for matches. Otherwise increments `parts_failed`, clears
/// `passed` and records a failure. Parts with any other status are skipped.
fn check_committed_parts(
    dest: &dyn Destination,
    manifest_dir: &str,
    manifest: &RunManifest,
    out: &mut ManifestVerification,
) {
    for part in &manifest.parts {
        if part.status != crate::manifest::PartStatus::Committed {
            continue;
        }
        let part_key = join_key(manifest_dir, &part.path);
        let failure = match dest.head(&part_key) {
            Ok(Some(meta)) if meta.size_bytes == part.size_bytes => {
                out.parts_verified += 1;
                continue;
            }
            Ok(Some(meta)) => Failure::PartSizeMismatch {
                part_id: part.part_id,
                path: part.path.clone(),
                expected: part.size_bytes,
                actual: meta.size_bytes,
            },
            Ok(None) => Failure::PartMissing {
                part_id: part.part_id,
                path: part.path.clone(),
            },
            // The part could not be inspected; report it as missing, keeping the cause.
            Err(e) => Failure::PartMissing {
                part_id: part.part_id,
                path: format!("{} (head failed: {e:#})", part.path),
            },
        };
        out.parts_failed += 1;
        out.passed = false;
        out.failures.push(failure);
    }
}

/// Compares `_SUCCESS` (at `success_key`) with the fingerprint of `manifest_bytes`.
///
/// An absent marker records nothing. A matching marker sets `success_marker_consistent`.
/// Every other outcome clears `passed` and records exactly one failure. Returning early here
/// only ends the marker check, never the caller's verification.
fn check_success_marker(
    dest: &dyn Destination,
    success_key: &str,
    manifest_bytes: &[u8],
    out: &mut ManifestVerification,
) {
    let failure = match dest.head(success_key) {
        Err(e) => Failure::SuccessMarkerReadError {
            detail: format!("_SUCCESS head failed: {e:#}"),
        },
        // No marker: no signal either way, and not a failure.
        Ok(None) => return,
        Ok(Some(_)) => match dest.read(success_key) {
            Err(e) => Failure::SuccessMarkerReadError {
                detail: format!("{e:#}"),
            },
            Ok(body) => match std::str::from_utf8(&body) {
                Err(_) => Failure::SuccessMarkerMalformed {
                    body_preview: format!("(non-utf8, {} bytes)", body.len()),
                },
                Ok(body_str) => match parse_success_marker(body_str) {
                    None => Failure::SuccessMarkerMalformed {
                        body_preview: preview(body_str),
                    },
                    Some(marker_fp) => {
                        // The marker body is written with a trailing newline; the parsed
                        // fingerprint is compared without it.
                        let manifest_fp = success_marker_body(manifest_bytes);
                        let manifest_fp = manifest_fp.trim_end_matches('\n');
                        if marker_fp == manifest_fp {
                            out.success_marker_consistent = true;
                            return;
                        }
                        Failure::SuccessMarkerStale {
                            marker_fingerprint: marker_fp.to_string(),
                            manifest_fingerprint: manifest_fp.to_string(),
                        }
                    }
                },
            },
        },
    };
    out.passed = false;
    out.failures.push(failure);
}
/// Joins a destination directory and a key with a single `/`.
///
/// Trailing slashes on `dir` are ignored. An empty (or all-slash) `dir` yields `key`
/// unchanged, so the destination root maps to bare keys.
fn join_key(dir: &str, key: &str) -> String {
    match dir.trim_end_matches('/') {
        "" => key.to_owned(),
        base => format!("{base}/{key}"),
    }
}
/// Returns at most the first 40 characters of `s`, appending `…` when `s` is longer.
///
/// Truncation counts `char`s (Unicode scalar values), so multi-byte text is never split in
/// the middle of a character. Used to keep malformed `_SUCCESS` bodies short in failure
/// reports.
fn preview(s: &str) -> String {
    const MAX_CHARS: usize = 40;
    let mut chars = s.chars();
    let mut head: String = chars.by_ref().take(MAX_CHARS).collect();
    // Pulling one more char is enough to know whether anything was cut; there is no need to
    // walk the rest of a potentially large body.
    if chars.next().is_some() {
        head.push('…');
    }
    head
}
// Each test lays out a run directory on disk in a tempdir and verifies it through a
// `LocalDestination` rooted there.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};
    use crate::destination::local::LocalDestination;
    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };
    use std::path::Path;

    // A local destination whose root is `base`.
    fn local_dest(base: &Path) -> LocalDestination {
        LocalDestination::new(&DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(base.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap()
    }

    // A committed part named `part-NNNNNN.parquet` with the given rows, byte size and
    // content fingerprint.
    fn part(part_id: u32, rows: i64, size: u64, fp: &str) -> ManifestPart {
        ManifestPart {
            part_id,
            path: format!("part-{part_id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: fp.into(),
            status: PartStatus::Committed,
        }
    }

    // A manifest whose `row_count` and `part_count` are derived from the committed parts.
    // The self-inconsistency test overrides `row_count` afterwards on purpose.
    fn build_manifest(parts: Vec<ManifestPart>, status: ManifestStatus) -> RunManifest {
        let row_count: i64 = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:01:00Z".into(),
            status,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // Writes the given part files, the pretty-printed manifest, and, only for a `Success`
    // manifest, a `_SUCCESS` marker derived from those exact manifest bytes.
    fn write_dataset(dir: &Path, m: &RunManifest, parts_with_bytes: &[(&str, &[u8])]) {
        for (name, bytes) in parts_with_bytes {
            std::fs::write(dir.join(name), bytes).unwrap();
        }
        let body = serde_json::to_vec_pretty(m).unwrap();
        std::fs::write(dir.join(MANIFEST_FILENAME), &body).unwrap();
        if matches!(m.status, ManifestStatus::Success) {
            std::fs::write(dir.join(SUCCESS_FILENAME), success_marker_body(&body)).unwrap();
        }
    }

    #[test]
    fn happy_path_verifies_all_parts_and_success_marker() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![
                part(1, 10, 4, "xxh3:1111111111111111"),
                part(2, 20, 5, "xxh3:2222222222222222"),
            ],
            ManifestStatus::Success,
        );
        // File sizes (4 and 5 bytes) match the manifest's `size_bytes`.
        write_dataset(
            dir.path(),
            &m,
            &[
                ("part-000001.parquet", b"AAAA"),
                ("part-000002.parquet", b"BBBBB"),
            ],
        );
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(!v.legacy_run);
        assert_eq!(v.parts_verified, 2);
        assert_eq!(v.parts_failed, 0);
        assert!(v.success_marker_consistent);
        assert!(v.manifest_self_consistent);
        assert!(v.passed);
        assert!(v.failures.is_empty());
    }

    #[test]
    fn no_manifest_returns_legacy_run_label() {
        let dir = tempfile::tempdir().unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.manifest_found);
        assert!(v.legacy_run);
        assert_eq!(v.parts_verified, 0);
        assert!(!v.passed);
        assert!(v.failures.is_empty(), "no failures, just a legacy label");
    }

    #[test]
    fn missing_part_is_flagged_with_part_id_and_path() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![
                part(1, 10, 4, "xxh3:1111111111111111"),
                part(2, 20, 5, "xxh3:2222222222222222"),
            ],
            ManifestStatus::Success,
        );
        write_dataset(
            dir.path(),
            &m,
            // Part 2 is deliberately not written.
            &[("part-000001.parquet", b"AAAA")],
        );
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert_eq!(v.parts_verified, 1);
        assert_eq!(v.parts_failed, 1);
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. }))
        );
    }

    #[test]
    fn part_size_mismatch_is_flagged_with_expected_and_actual() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        // Manifest says 4 bytes; the file on disk has 6.
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"OOPSIE")]);
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.passed);
        let mismatch = v
            .failures
            .iter()
            .find_map(|f| match f {
                Failure::PartSizeMismatch {
                    part_id,
                    expected,
                    actual,
                    ..
                } => Some((*part_id, *expected, *actual)),
                _ => None,
            })
            .expect("must surface the size mismatch");
        assert_eq!(mismatch, (1, 4, 6));
    }

    #[test]
    fn stale_success_marker_is_flagged_as_inconsistent() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        // Overwrite the marker with a well-formed fingerprint of some other manifest body.
        std::fs::write(
            dir.path().join(SUCCESS_FILENAME),
            success_marker_body(b"different manifest body"),
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.success_marker_consistent);
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::SuccessMarkerStale { .. }))
        );
    }

    #[test]
    fn malformed_success_marker_body_is_flagged() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::write(dir.path().join(SUCCESS_FILENAME), b"not even xxh3 shaped").unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::SuccessMarkerMalformed { .. }))
        );
    }

    #[test]
    fn absent_success_marker_does_not_fail_validation_alone() {
        let dir = tempfile::tempdir().unwrap();
        // A `Failed` manifest makes `write_dataset` skip the marker.
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Failed,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        assert!(!dir.path().join(SUCCESS_FILENAME).exists());
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(
            !v.success_marker_consistent,
            "no marker => false (no signal)"
        );
        assert!(v.passed);
        assert!(v.failures.is_empty());
    }

    #[test]
    fn self_inconsistent_manifest_is_flagged_but_part_check_still_runs() {
        let dir = tempfile::tempdir().unwrap();
        let mut m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        // Break the row total so it no longer matches the parts.
        m.row_count = 9999;
        let body = serde_json::to_vec_pretty(&m).unwrap();
        std::fs::write(dir.path().join("part-000001.parquet"), b"AAAA").unwrap();
        std::fs::write(dir.path().join(MANIFEST_FILENAME), &body).unwrap();
        std::fs::write(
            dir.path().join(SUCCESS_FILENAME),
            success_marker_body(&body),
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(!v.manifest_self_consistent);
        assert!(!v.passed);
        assert_eq!(v.parts_verified, 1);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::ManifestSelfInconsistent { .. }))
        );
    }

    #[test]
    fn untracked_object_under_prefix_is_flagged() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::write(dir.path().join("rogue.parquet"), b"XX").unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(
            v.failures.iter().any(
                |f| matches!(f, Failure::UntrackedObject { key, .. } if key == "rogue.parquet")
            )
        );
        // Untracked objects are advisory: reported, but the verdict still passes.
        assert!(v.passed);
    }

    #[test]
    fn quarantine_prefix_objects_are_silently_ignored() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::create_dir_all(dir.path().join(crate::manifest::QUARANTINE_PREFIX)).unwrap();
        std::fs::write(
            dir.path()
                .join(crate::manifest::QUARANTINE_PREFIX)
                .join("old.parquet"),
            b"OO",
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.passed);
        assert!(
            !v.failures
                .iter()
                .any(|f| matches!(f, Failure::UntrackedObject { .. })),
            "quarantine_prefix is the legitimate home for these — must not flag"
        );
    }

    #[test]
    fn verifies_in_subdirectory_when_manifest_dir_is_non_empty() {
        let outer = tempfile::tempdir().unwrap();
        std::fs::create_dir_all(outer.path().join("sub/run")).unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        let body = serde_json::to_vec_pretty(&m).unwrap();
        std::fs::write(outer.path().join("sub/run/part-000001.parquet"), b"AAAA").unwrap();
        std::fs::write(outer.path().join("sub/run").join(MANIFEST_FILENAME), &body).unwrap();
        std::fs::write(
            outer.path().join("sub/run").join(SUCCESS_FILENAME),
            success_marker_body(&body),
        )
        .unwrap();
        let dest = local_dest(outer.path());
        let v = verify_at_destination(&dest, "sub/run").unwrap();
        assert!(v.passed);
        assert_eq!(v.parts_verified, 1);
        // A trailing slash on the directory must resolve to the same keys.
        let v2 = verify_at_destination(&dest, "sub/run/").unwrap();
        assert!(v2.passed);
    }
}