use serde::{Deserialize, Serialize};
use crate::destination::Destination;
use crate::error::Result;
use crate::manifest::{
MANIFEST_FILENAME, RunManifest, SUCCESS_FILENAME, join_key, parse_success_marker,
success_marker_body,
};
use crate::pipeline::manifest_reconcile::{PartPresence, reconcile_manifest_against_listing};
/// Upper bound (64 MiB) on the size of a control artifact (`manifest.json`,
/// `_SUCCESS`) that will be read into memory; larger objects are refused.
pub(crate) const MANIFEST_MAX_BYTES: u64 = 64 << 20;
/// Reads `key` from `dest`, refusing objects larger than `max_bytes`.
///
/// The size reported by `head` is checked before reading, so an oversized
/// (possibly tampered) control artifact is normally never pulled into memory.
/// The length of the returned buffer is checked again after the read, because
/// the object can be replaced between the `head` and the `read`. Callers are
/// therefore never handed more than `max_bytes` bytes.
///
/// # Errors
/// - `key` does not exist at the destination;
/// - the size reported by `head`, or the number of bytes actually read,
///   exceeds `max_bytes`;
/// - the underlying `head` or `read` call fails.
pub(crate) fn read_capped(dest: &dyn Destination, key: &str, max_bytes: u64) -> Result<Vec<u8>> {
    let Some(meta) = dest.head(key)? else {
        anyhow::bail!("'{key}' not found at the destination");
    };
    if meta.size_bytes > max_bytes {
        anyhow::bail!(
            "'{key}' is {} bytes, exceeding the {max_bytes}-byte control-artifact \
             read cap — refusing to load it into memory (possible tampering)",
            meta.size_bytes
        );
    }
    let bytes = dest.read(key)?;
    // `usize` -> `u64` is a lossless widening on every supported target.
    let read_len = bytes.len() as u64;
    if read_len > max_bytes {
        // The object changed after `head` (head/read race); keep the cap's guarantee.
        anyhow::bail!(
            "'{key}' returned {read_len} bytes (head reported {}), exceeding the \
             {max_bytes}-byte control-artifact read cap — the object changed \
             between head and read (possible tampering)",
            meta.size_bytes
        );
    }
    Ok(bytes)
}
/// Outcome of auditing one run's manifest, parts and `_SUCCESS` marker at a
/// destination (produced by `verify_at_destination`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestVerification {
    /// A `manifest.json` object existed and was read (it may still have failed
    /// to parse; see `manifest_self_consistent` and `failures`).
    pub manifest_found: bool,
    /// No manifest object exists at all, which is reported as a label rather
    /// than a failure. Cleared by `require_manifest_present`.
    pub legacy_run: bool,
    /// Manifest parts found at the destination with no size or checksum
    /// mismatch reported by reconciliation.
    pub parts_verified: usize,
    /// Subset of `parts_verified` whose MD5 was also checked and matched.
    /// Defaults to 0 when the field is absent from serialized input
    /// (presumably reports written before this field existed — confirm).
    #[serde(default)]
    pub parts_md5_verified: usize,
    /// Manifest parts that are missing or have a size/checksum mismatch.
    pub parts_failed: usize,
    /// A `_SUCCESS` marker exists and its fingerprint matches the manifest.
    /// `false` both when the marker is absent and when it is stale/malformed.
    pub success_marker_consistent: bool,
    /// The parsed manifest passed `validate_self_consistency`.
    pub manifest_self_consistent: bool,
    /// Overall verdict: a manifest was found and no entry in `failures` is
    /// fatal (see `Failure::is_fatal`).
    pub passed: bool,
    /// Every finding, including advisory (non-fatal) ones.
    pub failures: Vec<Failure>,
}
/// One finding recorded in a [`ManifestVerification`].
///
/// Serialized (serde) as an internally tagged object whose `kind` field holds
/// the snake_case variant name, e.g. `{"kind": "part_missing", ...}`.
/// All variants are fatal except `UntrackedObject` (see `Failure::is_fatal`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Failure {
    /// A manifest part was not found at the destination.
    PartMissing { part_id: u32, path: String },
    /// A part exists, but its size differs from the manifest's
    /// (`expected` = manifest, `actual` = destination).
    PartSizeMismatch {
        part_id: u32,
        path: String,
        expected: u64,
        actual: u64,
    },
    /// A part's content checksum differs from the manifest's MD5
    /// (`expected` = manifest, `actual` = destination).
    PartChecksumMismatch {
        part_id: u32,
        path: String,
        expected: String,
        actual: String,
    },
    /// The `_SUCCESS` body is not UTF-8 or does not parse as a marker;
    /// `body_preview` is a short quote of it.
    SuccessMarkerMalformed { body_preview: String },
    /// The `_SUCCESS` fingerprint does not match the current manifest body.
    SuccessMarkerStale {
        marker_fingerprint: String,
        manifest_fingerprint: String,
    },
    /// The manifest failed to parse or failed its self-consistency check.
    ManifestSelfInconsistent { detail: String },
    /// The manifest's `head` or read failed (distinct from "absent").
    ManifestReadError { detail: String },
    /// The `_SUCCESS` marker's `head` or read failed.
    SuccessMarkerReadError { detail: String },
    /// Listing the run prefix failed, so parts could not be reconciled.
    ListPrefixError { detail: String },
    /// An object under the prefix that the manifest does not account for.
    /// Advisory only.
    UntrackedObject { key: String, size_bytes: u64 },
    /// Added under a content-verification policy: `size_only` of `total`
    /// verified parts were checked by size alone.
    ContentVerificationUnmet { size_only: usize, total: usize },
    /// A manifest was required at `prefix`, but none exists there.
    ManifestRequiredButAbsent { prefix: String },
}
impl Failure {
pub fn is_fatal(&self) -> bool {
!matches!(self, Failure::UntrackedObject { .. })
}
}
impl std::fmt::Display for Failure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Failure::PartMissing { part_id, path } => {
write!(f, "part {} missing at {}", part_id, path)
}
Failure::PartSizeMismatch {
part_id,
path,
expected,
actual,
} => write!(
f,
"part {} size mismatch at {}: manifest {}, dest {}",
part_id, path, expected, actual
),
Failure::PartChecksumMismatch {
part_id,
path,
expected,
actual,
} => write!(
f,
"part {} content mismatch at {}: manifest md5 {}, dest {}",
part_id, path, expected, actual
),
Failure::SuccessMarkerMalformed { body_preview } => {
write!(f, "_SUCCESS body malformed: {body_preview:?}")
}
Failure::SuccessMarkerStale {
marker_fingerprint,
manifest_fingerprint,
} => write!(
f,
"_SUCCESS body {} != manifest fingerprint {} (stale marker)",
marker_fingerprint, manifest_fingerprint
),
Failure::ManifestSelfInconsistent { detail } => {
write!(f, "manifest self-consistency: {detail}")
}
Failure::ManifestReadError { detail } => {
write!(f, "manifest read error: {detail}")
}
Failure::SuccessMarkerReadError { detail } => {
write!(f, "_SUCCESS read error: {detail}")
}
Failure::ListPrefixError { detail } => {
write!(f, "destination listing error: {detail}")
}
Failure::UntrackedObject { key, size_bytes } => {
write!(f, "untracked object: {} ({} bytes)", key, size_bytes)
}
Failure::ContentVerificationUnmet { size_only, total } => write!(
f,
"verify: content not met — {size_only} of {total} part(s) only \
size-verified (no store checksum); lower max_file_size so parts \
upload as a single PUT, or the backend exposes no checksum"
),
Failure::ManifestRequiredButAbsent { prefix } => write!(
f,
"no manifest at {prefix}: a manifest was required here (operator \
pinned --prefix) but none was found — this prefix was never \
written, or the data was relocated. Run the export first, or \
drop --prefix to validate the config-resolved destination."
),
}
}
}
impl ManifestVerification {
    /// Baseline report: every flag false, every counter zero, no failures.
    fn empty() -> Self {
        Self {
            manifest_found: false,
            legacy_run: false,
            parts_verified: 0,
            parts_md5_verified: 0,
            parts_failed: 0,
            success_marker_consistent: false,
            manifest_self_consistent: false,
            passed: false,
            failures: vec![],
        }
    }

    /// Re-derives `passed`: a manifest must have been found, and every recorded
    /// failure must be advisory (non-fatal).
    fn recompute_passed(&mut self) {
        let all_advisory = self.failures.iter().all(|failure| !failure.is_fatal());
        self.passed = self.manifest_found && all_advisory;
    }

    /// Applies the "verify: content" policy.
    ///
    /// When `require_content` is set and a manifest was found, parts verified by
    /// size only (`parts_verified - parts_md5_verified`, saturating) produce a
    /// `ContentVerificationUnmet` failure, and `passed` is re-derived. Otherwise
    /// this is a no-op.
    pub fn enforce_content_policy(&mut self, require_content: bool) {
        if !require_content || !self.manifest_found {
            return;
        }
        let unchecked_content = self.parts_verified.saturating_sub(self.parts_md5_verified);
        if unchecked_content == 0 {
            return;
        }
        self.failures.push(Failure::ContentVerificationUnmet {
            size_only: unchecked_content,
            total: self.parts_verified,
        });
        self.recompute_passed();
    }

    /// Turns a clean "no manifest" verdict into a fatal
    /// `ManifestRequiredButAbsent` that names `prefix`.
    ///
    /// No-op when a manifest was found or failures are already recorded (e.g. a
    /// manifest read error), so an existing verdict is never double-flagged.
    pub fn require_manifest_present(&mut self, prefix: &str) {
        if self.manifest_found || self.has_failures() {
            return;
        }
        self.legacy_run = false;
        self.failures.push(Failure::ManifestRequiredButAbsent {
            prefix: prefix.to_owned(),
        });
        self.recompute_passed();
    }

    /// Report for a destination with no manifest at all: `legacy_run` is set,
    /// the run does not pass, and no failures are recorded.
    pub fn legacy() -> Self {
        let mut report = Self::empty();
        report.legacy_run = true;
        report
    }

    /// Whether any finding (fatal or advisory) was recorded.
    pub fn has_failures(&self) -> bool {
        !self.failures.is_empty()
    }
}
pub fn verify_at_destination(
dest: &dyn Destination,
manifest_dir: &str,
) -> Result<ManifestVerification> {
let manifest_key = join_key(manifest_dir, MANIFEST_FILENAME);
let success_key = join_key(manifest_dir, SUCCESS_FILENAME);
let manifest_bytes = match dest.head(&manifest_key) {
Ok(None) => return Ok(ManifestVerification::legacy()),
Ok(Some(_)) => match read_capped(dest, &manifest_key, MANIFEST_MAX_BYTES) {
Ok(b) => b,
Err(e) => {
let mut v = ManifestVerification::legacy();
v.legacy_run = false;
v.failures.push(Failure::ManifestReadError {
detail: format!("{e:#}"),
});
v.passed = false;
return Ok(v);
}
},
Err(e) => {
let mut v = ManifestVerification::legacy();
v.legacy_run = false;
v.failures.push(Failure::ManifestReadError {
detail: format!("manifest head failed: {e:#}"),
});
v.passed = false;
return Ok(v);
}
};
let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
Ok(m) => m,
Err(e) => {
return Ok(ManifestVerification {
manifest_found: true,
failures: vec![Failure::ManifestSelfInconsistent {
detail: format!("manifest.json parse failed: {e}"),
}],
..ManifestVerification::empty()
});
}
};
let mut out = ManifestVerification {
manifest_found: true,
manifest_self_consistent: true,
passed: true,
..ManifestVerification::empty()
};
if let Err(e) = manifest.validate_self_consistency() {
out.manifest_self_consistent = false;
out.failures.push(Failure::ManifestSelfInconsistent {
detail: format!("{e}"),
});
}
let reconciliation = match dest.list_prefix(manifest_dir) {
Ok(listing) => Some(reconcile_manifest_against_listing(
&manifest,
&listing,
manifest_dir,
)),
Err(e) => {
out.failures.push(Failure::ListPrefixError {
detail: format!("{e:#}"),
});
None
}
};
if let Some(rec) = &reconciliation {
for check in &rec.per_part {
match &check.presence {
PartPresence::Present { md5_verified } => {
out.parts_verified += 1;
if *md5_verified {
out.parts_md5_verified += 1;
}
}
PartPresence::SizeMismatch { expected, actual } => {
out.parts_failed += 1;
out.failures.push(Failure::PartSizeMismatch {
part_id: check.part_id,
path: check.path.clone(),
expected: *expected,
actual: *actual,
});
}
PartPresence::Missing => {
out.parts_failed += 1;
out.failures.push(Failure::PartMissing {
part_id: check.part_id,
path: check.path.clone(),
});
}
PartPresence::ChecksumMismatch { expected, actual } => {
out.parts_failed += 1;
out.failures.push(Failure::PartChecksumMismatch {
part_id: check.part_id,
path: check.path.clone(),
expected: expected.clone(),
actual: actual.clone(),
});
}
}
}
}
let success_head = match dest.head(&success_key) {
Ok(h) => h,
Err(e) => {
out.failures.push(Failure::SuccessMarkerReadError {
detail: format!("_SUCCESS head failed: {e:#}"),
});
out.recompute_passed();
return Ok(out);
}
};
match success_head {
None => {
}
Some(_) => match dest.read(&success_key) {
Err(e) => {
out.failures.push(Failure::SuccessMarkerReadError {
detail: format!("{e:#}"),
});
}
Ok(body) => {
let body_str = match std::str::from_utf8(&body) {
Ok(s) => s,
Err(_) => {
out.failures.push(Failure::SuccessMarkerMalformed {
body_preview: format!("(non-utf8, {} bytes)", body.len()),
});
out.recompute_passed();
return Ok(out);
}
};
match parse_success_marker(body_str) {
None => {
out.failures.push(Failure::SuccessMarkerMalformed {
body_preview: preview(body_str),
});
}
Some(marker_fp) => {
let manifest_fp = success_marker_body(&manifest_bytes);
let manifest_fp_trimmed = manifest_fp.trim_end_matches('\n');
if marker_fp == manifest_fp_trimmed {
out.success_marker_consistent = true;
} else {
out.failures.push(Failure::SuccessMarkerStale {
marker_fingerprint: marker_fp.to_string(),
manifest_fingerprint: manifest_fp_trimmed.to_string(),
});
}
}
}
}
},
}
if let Some(rec) = reconciliation {
for obj in rec.untracked {
out.failures.push(Failure::UntrackedObject {
key: obj.key,
size_bytes: obj.size_bytes,
});
}
}
out.recompute_passed();
Ok(out)
}
/// Returns at most the first 40 characters of `s`, appending `…` when anything was cut.
///
/// Used to quote a malformed `_SUCCESS` body in a failure without echoing the
/// whole payload. Truncation counts `char`s (Unicode scalar values), not bytes,
/// so a multi-byte UTF-8 sequence is never split. Only the first 41 chars are
/// inspected, instead of counting the entire (possibly huge) string.
fn preview(s: &str) -> String {
    const MAX_CHARS: usize = 40;
    let mut chars = s.chars();
    let head: String = chars.by_ref().take(MAX_CHARS).collect();
    // Anything left after the first MAX_CHARS means the quote was truncated.
    if chars.next().is_some() {
        format!("{head}…")
    } else {
        head
    }
}
// Unit tests for the destination-side verifier. Most cases write a real dataset
// (part files, manifest.json, optional _SUCCESS) into a temp dir and audit it
// through `LocalDestination`. The `ListFails` / `ManifestReadFails` /
// `ManifestHeadFails` wrappers inject destination errors. The final tests drive
// the `ManifestVerification` policy methods directly, without any I/O.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};
    use crate::destination::local::LocalDestination;
    use crate::manifest::{
        MANIFEST_VERSION, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
        PartStatus, RunManifest,
    };
    use std::path::Path;

    // A local destination rooted at `base`.
    fn local_dest(base: &Path) -> LocalDestination {
        LocalDestination::new(&DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(base.to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap()
    }

    // A committed part stored at `part-{part_id:06}.parquet` with an empty
    // `content_md5`.
    fn part(part_id: u32, rows: i64, size: u64, fp: &str) -> ManifestPart {
        ManifestPart {
            part_id,
            path: format!("part-{part_id:06}.parquet"),
            rows,
            size_bytes: size,
            content_fingerprint: fp.into(),
            content_md5: String::new(),
            status: PartStatus::Committed,
        }
    }

    // A manifest whose `row_count` / `part_count` are derived from the committed
    // parts. Tests that want an inconsistent manifest override them afterwards.
    fn build_manifest(parts: Vec<ManifestPart>, status: ManifestStatus) -> RunManifest {
        let row_count: i64 = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .map(|p| p.rows)
            .sum();
        let part_count = parts
            .iter()
            .filter(|p| p.status == PartStatus::Committed)
            .count() as u32;
        RunManifest {
            manifest_version: MANIFEST_VERSION,
            run_id: "r".into(),
            export_name: "public.orders".into(),
            started_at: "2026-05-21T12:00:00Z".into(),
            finished_at: "2026-05-21T12:01:00Z".into(),
            status,
            source: ManifestSource {
                engine: "postgres".into(),
                schema: Some("public".into()),
                table: Some("orders".into()),
            },
            destination: ManifestDestination {
                kind: "local".into(),
                uri: "file:///tmp/out".into(),
            },
            format: "parquet".into(),
            compression: "zstd".into(),
            schema_fingerprint: "xxh3:0123456789abcdef".into(),
            row_count,
            part_count,
            parts,
        }
    }

    // Writes each `(name, bytes)` part file and the pretty-printed manifest, plus
    // (only for `ManifestStatus::Success`) a `_SUCCESS` marker built from that
    // exact manifest body.
    fn write_dataset(dir: &Path, m: &RunManifest, parts_with_bytes: &[(&str, &[u8])]) {
        for (name, bytes) in parts_with_bytes {
            std::fs::write(dir.join(name), bytes).unwrap();
        }
        let body = serde_json::to_vec_pretty(m).unwrap();
        std::fs::write(dir.join(MANIFEST_FILENAME), &body).unwrap();
        if matches!(m.status, ManifestStatus::Success) {
            std::fs::write(dir.join(SUCCESS_FILENAME), success_marker_body(&body)).unwrap();
        }
    }

    #[test]
    fn happy_path_verifies_all_parts_and_success_marker() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![
                part(1, 10, 4, "xxh3:1111111111111111"),
                part(2, 20, 5, "xxh3:2222222222222222"),
            ],
            ManifestStatus::Success,
        );
        // Byte lengths (4 and 5) match the sizes declared in the manifest.
        write_dataset(
            dir.path(),
            &m,
            &[
                ("part-000001.parquet", b"AAAA"),
                ("part-000002.parquet", b"BBBBB"),
            ],
        );
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(!v.legacy_run);
        assert_eq!(v.parts_verified, 2);
        assert_eq!(v.parts_failed, 0);
        assert!(v.success_marker_consistent);
        assert!(v.manifest_self_consistent);
        assert!(v.passed);
        assert!(v.failures.is_empty());
    }

    #[test]
    fn no_manifest_returns_legacy_run_label() {
        let dir = tempfile::tempdir().unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.manifest_found);
        assert!(v.legacy_run);
        assert_eq!(v.parts_verified, 0);
        assert!(!v.passed);
        assert!(v.failures.is_empty(), "no failures, just a legacy label");
    }

    #[test]
    fn missing_part_is_flagged_with_part_id_and_path() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![
                part(1, 10, 4, "xxh3:1111111111111111"),
                part(2, 20, 5, "xxh3:2222222222222222"),
            ],
            ManifestStatus::Success,
        );
        write_dataset(
            dir.path(),
            &m,
            // Part 2 is deliberately not written.
            &[("part-000001.parquet", b"AAAA")], );
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert_eq!(v.parts_verified, 1);
        assert_eq!(v.parts_failed, 1);
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::PartMissing { part_id: 2, .. }))
        );
    }

    #[test]
    fn part_size_mismatch_is_flagged_with_expected_and_actual() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        // Manifest declares 4 bytes; the file holds 6.
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"OOPSIE")]);
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.passed);
        let mismatch = v
            .failures
            .iter()
            .find_map(|f| match f {
                Failure::PartSizeMismatch {
                    part_id,
                    expected,
                    actual,
                    ..
                } => Some((*part_id, *expected, *actual)),
                _ => None,
            })
            .expect("must surface the size mismatch");
        assert_eq!(mismatch, (1, 4, 6));
    }

    #[test]
    fn stale_success_marker_is_flagged_as_inconsistent() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        // Overwrite the marker with one fingerprinting some other body.
        std::fs::write(
            dir.path().join(SUCCESS_FILENAME),
            success_marker_body(b"different manifest body"),
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.success_marker_consistent);
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::SuccessMarkerStale { .. }))
        );
    }

    #[test]
    fn malformed_success_marker_body_is_flagged() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::write(dir.path().join(SUCCESS_FILENAME), b"not even xxh3 shaped").unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.passed);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::SuccessMarkerMalformed { .. }))
        );
    }

    #[test]
    fn absent_success_marker_does_not_fail_validation_alone() {
        let dir = tempfile::tempdir().unwrap();
        // A `Failed` status means `write_dataset` writes no `_SUCCESS` marker.
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Failed,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        assert!(!dir.path().join(SUCCESS_FILENAME).exists());
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(
            !v.success_marker_consistent,
            "no marker => false (no signal)"
        );
        assert!(v.passed);
        assert!(v.failures.is_empty());
    }

    #[test]
    fn self_inconsistent_manifest_is_flagged_but_part_check_still_runs() {
        let dir = tempfile::tempdir().unwrap();
        let mut m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        // Row total no longer matches the parts.
        m.row_count = 9999;
        let body = serde_json::to_vec_pretty(&m).unwrap();
        std::fs::write(dir.path().join("part-000001.parquet"), b"AAAA").unwrap();
        std::fs::write(dir.path().join(MANIFEST_FILENAME), &body).unwrap();
        std::fs::write(
            dir.path().join(SUCCESS_FILENAME),
            success_marker_body(&body),
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(!v.manifest_self_consistent);
        assert!(!v.passed);
        assert_eq!(v.parts_verified, 1);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::ManifestSelfInconsistent { .. }))
        );
    }

    #[test]
    fn untracked_object_under_prefix_is_flagged() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::write(dir.path().join("rogue.parquet"), b"XX").unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(
            v.failures.iter().any(
                |f| matches!(f, Failure::UntrackedObject { key, .. } if key == "rogue.parquet")
            )
        );
        // Untracked objects are advisory: the audit still passes.
        assert!(v.passed);
    }

    #[test]
    fn quarantine_prefix_objects_are_silently_ignored() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::create_dir_all(dir.path().join(crate::manifest::QUARANTINE_PREFIX)).unwrap();
        std::fs::write(
            dir.path()
                .join(crate::manifest::QUARANTINE_PREFIX)
                .join("old.parquet"),
            b"OO",
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.passed);
        assert!(
            !v.failures
                .iter()
                .any(|f| matches!(f, Failure::UntrackedObject { .. })),
            "quarantine_prefix is the legitimate home for these — must not flag"
        );
    }

    #[test]
    fn doctor_probe_is_not_flagged_as_untracked() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        std::fs::write(
            dir.path().join(crate::manifest::DOCTOR_PROBE_FILENAME),
            b"ok",
        )
        .unwrap();
        let dest = local_dest(dir.path());
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(
            !v.has_failures(),
            "doctor probe must not surface as a failure: {:?}",
            v.failures
        );
        assert!(v.passed);
    }

    // Both "sub/run" and "sub/run/" must resolve to the same run directory.
    #[test]
    fn verifies_in_subdirectory_when_manifest_dir_is_non_empty() {
        let outer = tempfile::tempdir().unwrap();
        std::fs::create_dir_all(outer.path().join("sub/run")).unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        let body = serde_json::to_vec_pretty(&m).unwrap();
        std::fs::write(outer.path().join("sub/run/part-000001.parquet"), b"AAAA").unwrap();
        std::fs::write(outer.path().join("sub/run").join(MANIFEST_FILENAME), &body).unwrap();
        std::fs::write(
            outer.path().join("sub/run").join(SUCCESS_FILENAME),
            success_marker_body(&body),
        )
        .unwrap();
        let dest = local_dest(outer.path());
        let v = verify_at_destination(&dest, "sub/run").unwrap();
        assert!(v.passed);
        assert_eq!(v.parts_verified, 1);
        let v2 = verify_at_destination(&dest, "sub/run/").unwrap();
        assert!(v2.passed);
    }

    // Delegates to a `LocalDestination`, except that `list_prefix` always errors.
    struct ListFails(LocalDestination);
    impl crate::destination::Destination for ListFails {
        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
            self.0.write(p, k)
        }
        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
            self.0.capabilities()
        }
        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
            self.0.head(k)
        }
        fn read(&self, k: &str) -> Result<Vec<u8>> {
            self.0.read(k)
        }
        fn list_prefix(&self, _: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
            anyhow::bail!("listing unavailable")
        }
    }

    #[test]
    fn list_failure_cannot_certify_parts_and_fails_the_audit() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(vec![part(0, 3, 3, "xxh3:0")], ManifestStatus::Success);
        write_dataset(dir.path(), &m, &[("part-000000.parquet", b"abc")]);
        let dest = ListFails(local_dest(dir.path()));
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(v.manifest_found);
        assert!(v.manifest_self_consistent);
        assert!(
            !v.passed,
            "an audit that cannot list the prefix must not pass"
        );
        assert_eq!(v.parts_verified, 0);
        assert!(
            v.failures
                .iter()
                .any(|f| matches!(f, Failure::ListPrefixError { .. })),
            "expected a ListPrefixError, got: {:?}",
            v.failures
        );
    }

    // Delegates to a `LocalDestination`, except that `read` fails for any key
    // ending in the manifest filename (`head` still succeeds).
    struct ManifestReadFails(LocalDestination);
    impl crate::destination::Destination for ManifestReadFails {
        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
            self.0.write(p, k)
        }
        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
            self.0.capabilities()
        }
        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
            self.0.head(k)
        }
        fn read(&self, k: &str) -> Result<Vec<u8>> {
            if k.ends_with(MANIFEST_FILENAME) {
                anyhow::bail!("permission denied (simulated)")
            }
            self.0.read(k)
        }
        fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
            self.0.list_prefix(p)
        }
    }

    #[test]
    fn unreadable_manifest_is_explicit_failure_not_legacy() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        let dest = ManifestReadFails(local_dest(dir.path()));
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.manifest_found);
        assert!(!v.legacy_run, "a read error is not the M6 legacy label");
        assert!(!v.passed);
        assert!(v.has_failures(), "orchestrators need a reason to refuse");
        assert!(
            matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
            "expected exactly one ManifestReadError, got: {:?}",
            v.failures
        );
    }

    // Delegates to a `LocalDestination`, except that `head` fails for any key
    // ending in the manifest filename.
    struct ManifestHeadFails(LocalDestination);
    impl crate::destination::Destination for ManifestHeadFails {
        fn write(&self, p: &Path, k: &str) -> Result<crate::destination::WriteOutcome> {
            self.0.write(p, k)
        }
        fn capabilities(&self) -> crate::destination::DestinationCapabilities {
            self.0.capabilities()
        }
        fn head(&self, k: &str) -> Result<Option<crate::destination::ObjectMeta>> {
            if k.ends_with(MANIFEST_FILENAME) {
                anyhow::bail!("io timeout (simulated)")
            }
            self.0.head(k)
        }
        fn read(&self, k: &str) -> Result<Vec<u8>> {
            self.0.read(k)
        }
        fn list_prefix(&self, p: &str) -> Result<Vec<crate::destination::ObjectMeta>> {
            self.0.list_prefix(p)
        }
    }

    #[test]
    fn manifest_head_error_is_explicit_failure_not_legacy() {
        let dir = tempfile::tempdir().unwrap();
        let m = build_manifest(
            vec![part(1, 10, 4, "xxh3:1111111111111111")],
            ManifestStatus::Success,
        );
        write_dataset(dir.path(), &m, &[("part-000001.parquet", b"AAAA")]);
        let dest = ManifestHeadFails(local_dest(dir.path()));
        let v = verify_at_destination(&dest, "").unwrap();
        assert!(!v.manifest_found);
        assert!(!v.legacy_run);
        assert!(!v.passed);
        assert!(
            matches!(
                v.failures.as_slice(),
                [Failure::ManifestReadError { detail }] if detail.contains("manifest head failed")
            ),
            "expected one ManifestReadError naming the head step, got: {:?}",
            v.failures
        );
    }

    // `passed` depends only on `manifest_found` and whether any failure is fatal.
    #[test]
    fn passed_is_derived_advisory_failures_do_not_fail() {
        let mut v = ManifestVerification {
            manifest_found: true,
            ..ManifestVerification::empty()
        };
        v.failures.push(Failure::UntrackedObject {
            key: "stray.parquet".into(),
            size_bytes: 9,
        });
        v.recompute_passed();
        assert!(v.passed, "untracked surplus is advisory, not fatal");
        v.failures.push(Failure::PartMissing {
            part_id: 1,
            path: "part-000001.parquet".into(),
        });
        v.recompute_passed();
        assert!(!v.passed, "a missing part is fatal");
        let mut legacy = ManifestVerification::empty();
        legacy.recompute_passed();
        assert!(!legacy.passed, "no manifest found → cannot certify");
    }

    #[test]
    fn verify_content_policy_fails_only_size_only_parts() {
        // 3 parts verified, 2 of them by MD5, so exactly 1 is size-only.
        let base = ManifestVerification {
            manifest_found: true,
            parts_verified: 3,
            parts_md5_verified: 2,
            ..ManifestVerification::empty()
        };
        let mut sz = base.clone();
        sz.recompute_passed();
        sz.enforce_content_policy(false);
        assert!(sz.passed, "size-only OK under verify: size");
        let mut ct = base.clone();
        ct.recompute_passed();
        ct.enforce_content_policy(true);
        assert!(!ct.passed, "a size-only part fails verify: content");
        assert!(
            ct.failures.iter().any(|f| matches!(
                f,
                Failure::ContentVerificationUnmet {
                    size_only: 1,
                    total: 3
                }
            )),
            "expected ContentVerificationUnmet, got: {:?}",
            ct.failures
        );
        let mut all = ManifestVerification {
            parts_md5_verified: 3,
            ..base
        };
        all.recompute_passed();
        all.enforce_content_policy(true);
        assert!(
            all.passed && all.failures.is_empty(),
            "all md5 meets verify: content"
        );
    }

    #[test]
    fn require_manifest_escalates_legacy_to_fatal_absent() {
        let mut v = ManifestVerification::legacy();
        assert!(v.legacy_run && !v.has_failures());
        v.require_manifest_present("exports/2026-06-09/orders/");
        assert!(!v.legacy_run, "no longer the benign legacy-run label");
        assert!(!v.passed, "an absent-but-required manifest cannot pass");
        assert!(
            matches!(
                v.failures.as_slice(),
                [Failure::ManifestRequiredButAbsent { prefix }]
                    if prefix == "exports/2026-06-09/orders/"
            ),
            "expected one ManifestRequiredButAbsent naming the prefix, got: {:?}",
            v.failures
        );
    }

    #[test]
    fn require_manifest_is_noop_on_a_real_passing_manifest() {
        let mut v = ManifestVerification {
            manifest_found: true,
            manifest_self_consistent: true,
            parts_verified: 1,
            passed: true,
            ..ManifestVerification::empty()
        };
        v.require_manifest_present("exports/orders/");
        assert!(
            v.passed && v.failures.is_empty(),
            "real dataset still passes"
        );
    }

    // Mirrors the verdict `verify_at_destination` builds for an unreadable manifest.
    #[test]
    fn require_manifest_does_not_double_flag_a_read_error() {
        let mut v = ManifestVerification::legacy();
        v.legacy_run = false;
        v.failures.push(Failure::ManifestReadError {
            detail: "permission denied".into(),
        });
        v.recompute_passed();
        v.require_manifest_present("exports/orders/");
        assert!(
            matches!(v.failures.as_slice(), [Failure::ManifestReadError { .. }]),
            "must leave the existing read-error verdict alone, got: {:?}",
            v.failures
        );
    }
}