use std::io::Read;
use std::path::Path;
use crate::destination::{Destination, WriteCommitProtocol};
use crate::error::Result;
use crate::journal::PlanSnapshot;
use crate::manifest::{
MANIFEST_FILENAME, ManifestDestination, ManifestPart, ManifestSource, ManifestStatus,
PartStatus, RunManifest, SUCCESS_FILENAME, success_marker_body,
};
use crate::pipeline::summary::RunSummary;
/// Accumulates per-part metadata during a run and turns it into a [`RunManifest`].
///
/// Create it with [`ManifestBuilder::new`], call [`ManifestBuilder::record_part`] for each
/// committed part, then consume it with [`ManifestBuilder::finalize`].
pub struct ManifestBuilder {
    run_id: String,
    export_name: String,
    started_at: chrono::DateTime<chrono::Utc>,
    source: ManifestSource,
    destination: ManifestDestination,
    format: String,
    compression: String,
    schema_fingerprint: String,
    parts: Vec<ManifestPart>,
}

impl ManifestBuilder {
    /// Creates a builder with no parts, seeded from the plan and the run's source/destination.
    ///
    /// `export_name`, `format`, `compression`, and the destination `kind` are copied from
    /// `plan`; every other manifest field comes from the corresponding argument.
    // Each argument feeds a distinct manifest field; grouping them would just move the list.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        plan: &PlanSnapshot,
        run_id: &str,
        started_at: chrono::DateTime<chrono::Utc>,
        schema_fingerprint: String,
        source_engine: &str,
        source_schema: Option<String>,
        source_table: Option<String>,
        destination_uri: String,
    ) -> Self {
        let source = ManifestSource {
            engine: source_engine.to_owned(),
            schema: source_schema,
            table: source_table,
        };
        let destination = ManifestDestination {
            kind: plan.destination_type.clone(),
            uri: destination_uri,
        };
        Self {
            run_id: run_id.to_owned(),
            export_name: plan.export_name.clone(),
            started_at,
            source,
            destination,
            format: plan.format.clone(),
            compression: plan.compression.clone(),
            schema_fingerprint,
            parts: Vec::new(),
        }
    }

    /// Appends one part to the manifest with status [`PartStatus::Committed`].
    ///
    /// `relative_path` becomes the part's `path`. The builder does not check `part_id` for
    /// uniqueness or ordering.
    pub fn record_part(
        &mut self,
        part_id: u32,
        relative_path: String,
        rows: i64,
        size_bytes: u64,
        content_fingerprint: String,
        content_md5: String,
    ) {
        let committed = ManifestPart {
            part_id,
            path: relative_path,
            rows,
            size_bytes,
            content_fingerprint,
            content_md5,
            status: PartStatus::Committed,
        };
        self.parts.push(committed);
    }

    /// Consumes the builder and produces the final [`RunManifest`] carrying `status`.
    ///
    /// `row_count` and `part_count` cover only parts whose status is
    /// [`PartStatus::Committed`]. `finished_at` is stamped with the current UTC time, and both
    /// timestamps are rendered as RFC 3339 strings.
    pub fn finalize(self, status: ManifestStatus) -> RunManifest {
        let (row_count, committed_parts) = self
            .parts
            .iter()
            .filter(|part| part.status == PartStatus::Committed)
            .fold((0i64, 0usize), |(rows, count), part| {
                (rows + part.rows, count + 1)
            });
        let finished_at = chrono::Utc::now();
        RunManifest {
            manifest_version: crate::manifest::MANIFEST_VERSION,
            run_id: self.run_id,
            export_name: self.export_name,
            started_at: self.started_at.to_rfc3339(),
            finished_at: finished_at.to_rfc3339(),
            status,
            source: self.source,
            destination: self.destination,
            format: self.format,
            compression: self.compression,
            schema_fingerprint: self.schema_fingerprint,
            row_count,
            // NOTE: `as` truncates counts above `u32::MAX`; part ids are `u32` as well.
            part_count: committed_parts as u32,
            parts: self.parts,
        }
    }
}
/// Computes the content fingerprint and MD5 digest of the file at `path` in one read pass.
///
/// Returns `(fingerprint, md5)`:
/// - `fingerprint` is `"xxh3:"` followed by the file's 64-bit XXH3 digest as 16 lowercase,
///   zero-padded hex digits;
/// - `md5` is the file's MD5 digest encoded with the standard (padded) base64 alphabet.
///
/// The file is streamed in fixed-size chunks, so memory use does not grow with file size.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a read fails. A read interrupted by a
/// signal (`ErrorKind::Interrupted`) is retried instead of being reported, as permitted by
/// `std::io::Read::read`.
pub fn compute_part_checksums(path: &Path) -> Result<(String, String)> {
    use base64::Engine as _;
    use md5::{Digest, Md5};
    use xxhash_rust::xxh3::Xxh3;

    let mut f = std::fs::File::open(path)?;
    let mut xxh = Xxh3::new();
    let mut md5 = Md5::new();
    let mut buf = [0u8; 64 * 1024];
    loop {
        let n = match f.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            // An interrupted read transferred no data and is explicitly retryable; failing
            // here would abort checksumming a perfectly readable part.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e.into()),
        };
        // Feed the same chunk to both hashers so the file is only read once.
        let chunk = &buf[..n];
        xxh.update(chunk);
        md5.update(chunk);
    }
    Ok((
        format!("xxh3:{:016x}", xxh.digest()),
        base64::engine::general_purpose::STANDARD.encode(md5.finalize()),
    ))
}
/// Stores the fingerprint of `dest_schema` on `summary`, but only if none is stored yet.
///
/// The first schema recorded for a run wins; later calls leave the fingerprint untouched and
/// do not recompute anything. The value comes from [`crate::state::schema_fingerprint`],
/// applied to the columns produced by [`crate::state::arrow_schema_to_columns`].
pub fn record_run_schema_fingerprint(
    summary: &mut crate::pipeline::summary::RunSummary,
    dest_schema: &arrow::datatypes::Schema,
) {
    summary.schema_fingerprint.get_or_insert_with(|| {
        let columns = crate::state::arrow_schema_to_columns(dest_schema);
        crate::state::schema_fingerprint(&columns)
    });
}
/// Appends a committed part to `summary.manifest_parts` and assigns it the next part id.
///
/// The new id is one more than the largest id already present, or `1` when the list is
/// empty. The part is recorded with status [`PartStatus::Committed`].
pub fn record_committed_part_with_fingerprint(
    summary: &mut RunSummary,
    relative_path: String,
    rows: i64,
    size_bytes: u64,
    content_fingerprint: String,
    content_md5: String,
) {
    let next_part_id = summary
        .manifest_parts
        .iter()
        .map(|existing| existing.part_id)
        .max()
        .map_or(1, |highest| highest + 1);
    let committed = ManifestPart {
        part_id: next_part_id,
        path: relative_path,
        rows,
        size_bytes,
        content_fingerprint,
        content_md5,
        status: PartStatus::Committed,
    };
    summary.manifest_parts.push(committed);
}
/// What [`write_manifest`] did for a destination.
// `Clone`/`Copy`/`PartialEq`/`Eq` let callers and tests compare outcomes directly instead of
// pattern-matching; the enum holds only a `bool`, so the derives are cheap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteOutcome {
    /// The manifest was written; `success_marker` tells whether the success marker was too.
    Written { success_marker: bool },
    /// The destination uses the streaming commit protocol, so nothing was written.
    SkippedStreaming,
}
/// Writes the run manifest ([`MANIFEST_FILENAME`]) and, for successful runs, the success
/// marker ([`SUCCESS_FILENAME`]) to `dest`.
///
/// If `dest` uses the [`WriteCommitProtocol::Streaming`] commit protocol, nothing is written.
/// The skip is logged and reported as [`WriteOutcome::SkippedStreaming`].
///
/// Otherwise the manifest is serialized as pretty-printed JSON, staged in a local temporary
/// file, and passed to [`Destination::write`]. The marker is written afterwards, and only
/// when `manifest.status` is [`ManifestStatus::Success`]. Its body is computed by
/// `success_marker_body` from the exact manifest bytes that were written.
///
/// # Errors
///
/// Propagates serialization, temporary-file, and destination-write errors. If writing the
/// marker fails, the manifest has already been written to `dest`.
pub fn write_manifest(dest: &dyn Destination, manifest: &RunManifest) -> Result<WriteOutcome> {
    let streaming = dest.capabilities().commit_protocol == WriteCommitProtocol::Streaming;
    if streaming {
        log::info!(
            "destination is streaming; manifest.json / _SUCCESS not written (ADR-0012 §Artifacts)"
        );
        return Ok(WriteOutcome::SkippedStreaming);
    }

    let manifest_json = serde_json::to_vec_pretty(manifest)?;
    let staged_manifest = tempfile::NamedTempFile::new()?;
    std::fs::write(staged_manifest.path(), &manifest_json)?;
    dest.write(staged_manifest.path(), MANIFEST_FILENAME)?;

    // The marker only goes out after the manifest write above has succeeded, so a present
    // marker always has a manifest alongside it.
    let success_marker = matches!(manifest.status, ManifestStatus::Success);
    if !success_marker {
        return Ok(WriteOutcome::Written { success_marker });
    }

    let marker_body = success_marker_body(&manifest_json);
    let staged_marker = tempfile::NamedTempFile::new()?;
    std::fs::write(staged_marker.path(), marker_body.as_bytes())?;
    dest.write(staged_marker.path(), SUCCESS_FILENAME)?;
    Ok(WriteOutcome::Written { success_marker })
}
// Unit tests for `ManifestBuilder`, `compute_part_checksums`, `write_manifest`, and
// `record_run_schema_fingerprint`.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{DestinationConfig, DestinationType};
use crate::destination::local::LocalDestination;
use std::io::Write;
// Minimal plan shared by the builder tests: a parquet/zstd snapshot export of
// `public.orders` to a local destination.
fn plan_snapshot() -> PlanSnapshot {
PlanSnapshot {
export_name: "public.orders".into(),
base_query: "SELECT * FROM orders".into(),
strategy: "snapshot".into(),
format: "parquet".into(),
compression: "zstd".into(),
destination_type: "local".into(),
tuning_profile: "balanced".into(),
batch_size: 1000,
validate: false,
reconcile: false,
resume: false,
}
}
// Builds a `LocalDestination` rooted at `base`; panics if construction fails.
fn local_dest(base: &Path) -> LocalDestination {
LocalDestination::new(&DestinationConfig {
destination_type: DestinationType::Local,
path: Some(base.to_string_lossy().into_owned()),
..Default::default()
})
.expect("build LocalDestination")
}
// A builder with no recorded parts finalizes to zero rows/parts and still passes the
// manifest's self-consistency check.
#[test]
fn builder_starts_empty() {
let b = ManifestBuilder::new(
&plan_snapshot(),
"run_001",
chrono::Utc::now(),
"xxh3:0000000000000000".into(),
"postgres",
Some("public".into()),
Some("orders".into()),
"file:///tmp/out/".into(),
);
let m = b.finalize(ManifestStatus::Success);
assert_eq!(m.part_count, 0);
assert_eq!(m.row_count, 0);
assert!(m.parts.is_empty());
assert_eq!(m.validate_self_consistency(), Ok(()));
}
// row_count and part_count aggregate over recorded parts: 50_000 + 25_000 rows across
// 2 parts.
#[test]
fn builder_aggregates_parts_into_self_consistent_manifest() {
let mut b = ManifestBuilder::new(
&plan_snapshot(),
"run_002",
chrono::Utc::now(),
"xxh3:0123456789abcdef".into(),
"postgres",
None,
None,
"file:///tmp/out/".into(),
);
b.record_part(
1,
"part-000001.parquet".into(),
50_000,
4096,
"xxh3:aaaaaaaaaaaaaaaa".into(),
String::new(),
);
b.record_part(
2,
"part-000002.parquet".into(),
25_000,
2048,
"xxh3:bbbbbbbbbbbbbbbb".into(),
String::new(),
);
let m = b.finalize(ManifestStatus::Success);
assert_eq!(m.part_count, 2);
assert_eq!(m.row_count, 75_000);
assert_eq!(m.parts.len(), 2);
assert_eq!(m.validate_self_consistency(), Ok(()));
}
// finished_at is stamped at finalize time, so it must not precede started_at. The short
// sleep separates the two instants.
#[test]
fn builder_records_started_and_finished_in_order() {
let b = ManifestBuilder::new(
&plan_snapshot(),
"run_003",
chrono::Utc::now(),
"xxh3:0".into(),
"postgres",
None,
None,
"file:///x".into(),
);
std::thread::sleep(std::time::Duration::from_millis(2));
let m = b.finalize(ManifestStatus::Success);
let started = chrono::DateTime::parse_from_rfc3339(&m.started_at).unwrap();
let finished = chrono::DateTime::parse_from_rfc3339(&m.finished_at).unwrap();
assert!(finished >= started, "{started:?} > {finished:?}");
}
// The status passed to finalize is copied verbatim into the manifest.
#[test]
fn builder_carries_status_through_finalize() {
let b = ManifestBuilder::new(
&plan_snapshot(),
"run_004",
chrono::Utc::now(),
"xxh3:0".into(),
"postgres",
None,
None,
"file:///x".into(),
);
let m = b.finalize(ManifestStatus::Failed);
assert_eq!(m.status, ManifestStatus::Failed);
}
// The streamed (xxh3, md5) pair must equal one-shot digests computed over the same bytes.
#[test]
fn single_pass_checksums_equal_independent_recompute() {
use base64::Engine as _;
use md5::{Digest, Md5};
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("part.bin");
let data = b"the quick brown fox jumps over the lazy dog";
std::fs::write(&p, data).unwrap();
let (fp, md5) = compute_part_checksums(&p).unwrap();
assert_eq!(
fp,
format!("xxh3:{:016x}", xxhash_rust::xxh3::xxh3_64(data))
);
let mut h = Md5::new();
h.update(data);
assert_eq!(
md5,
base64::engine::general_purpose::STANDARD.encode(h.finalize())
);
}
// Fingerprint shape: "xxh3:" followed by exactly 16 lowercase hex digits.
#[test]
fn fingerprint_format_matches_adr_0012() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("part.bin");
std::fs::write(&p, b"hello world").unwrap();
let (fp, _) = compute_part_checksums(&p).unwrap();
assert!(fp.starts_with("xxh3:"));
assert_eq!(fp.len(), "xxh3:".len() + 16);
let hex = &fp["xxh3:".len()..];
assert!(
hex.chars()
.all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
);
}
// Different file contents must produce different fingerprints.
#[test]
fn fingerprint_is_content_dependent() {
let dir = tempfile::tempdir().unwrap();
let a = dir.path().join("a.bin");
let b = dir.path().join("b.bin");
std::fs::write(&a, b"alpha").unwrap();
std::fs::write(&b, b"beta").unwrap();
assert_ne!(
compute_part_checksums(&a).unwrap().0,
compute_part_checksums(&b).unwrap().0
);
}
// Hashing the same file twice yields the same fingerprint.
#[test]
fn fingerprint_is_deterministic_for_same_content() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("part.bin");
std::fs::write(&p, b"deterministic").unwrap();
let fp1 = compute_part_checksums(&p).unwrap().0;
let fp2 = compute_part_checksums(&p).unwrap().0;
assert_eq!(fp1, fp2);
}
// A 128 KiB payload is larger than the hashing read buffer, so it is hashed over several
// reads. The streamed result must equal the one-shot hash.
#[test]
fn fingerprint_streams_files_larger_than_buffer() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("big.bin");
let payload: Vec<u8> = (0..(128 * 1024)).map(|i| (i % 251) as u8).collect();
{
let mut f = std::fs::File::create(&p).unwrap();
f.write_all(&payload).unwrap();
}
let (fp, _) = compute_part_checksums(&p).unwrap();
let one_shot = format!("xxh3:{:016x}", xxhash_rust::xxh3::xxh3_64(&payload));
assert_eq!(fp, one_shot);
}
// Fixture manifest with two committed parts (100 + 200 rows), placeholder fingerprints and
// empty MD5s, finalized with the given status.
fn build_manifest(status: ManifestStatus) -> RunManifest {
let mut b = ManifestBuilder::new(
&plan_snapshot(),
"run_001",
chrono::Utc::now(),
"xxh3:0123456789abcdef".into(),
"postgres",
Some("public".into()),
Some("orders".into()),
"file:///tmp/out/".into(),
);
b.record_part(
1,
"part-000001.parquet".into(),
100,
4096,
"xxh3:1111111111111111".into(),
String::new(),
);
b.record_part(
2,
"part-000002.parquet".into(),
200,
8192,
"xxh3:2222222222222222".into(),
String::new(),
);
b.finalize(status)
}
// A Success manifest on a local destination produces both the manifest file and the
// success marker.
#[test]
fn write_manifest_creates_manifest_json_on_local() {
let dir = tempfile::tempdir().unwrap();
let dest = local_dest(dir.path());
let m = build_manifest(ManifestStatus::Success);
let outcome = write_manifest(&dest, &m).unwrap();
assert!(matches!(
outcome,
WriteOutcome::Written {
success_marker: true
}
));
assert!(dir.path().join(MANIFEST_FILENAME).exists());
assert!(dir.path().join(SUCCESS_FILENAME).exists());
}
// Round trip: the written JSON deserializes back to a manifest equal to the input.
#[test]
fn manifest_json_is_parseable_and_matches_input() {
let dir = tempfile::tempdir().unwrap();
let dest = local_dest(dir.path());
let m = build_manifest(ManifestStatus::Success);
write_manifest(&dest, &m).unwrap();
let read = std::fs::read_to_string(dir.path().join(MANIFEST_FILENAME)).unwrap();
let parsed: RunManifest = serde_json::from_str(&read).unwrap();
assert_eq!(parsed, m);
assert_eq!(parsed.validate_self_consistency(), Ok(()));
}
// The marker body must be derived from the manifest bytes actually written to disk.
#[test]
fn success_marker_carries_correct_fingerprint_for_manifest_bytes() {
let dir = tempfile::tempdir().unwrap();
let dest = local_dest(dir.path());
let m = build_manifest(ManifestStatus::Success);
write_manifest(&dest, &m).unwrap();
let bytes = std::fs::read(dir.path().join(MANIFEST_FILENAME)).unwrap();
let marker = std::fs::read_to_string(dir.path().join(SUCCESS_FILENAME)).unwrap();
let expected = success_marker_body(&bytes);
assert_eq!(marker, expected);
}
// A Failed run still gets a manifest, but no success marker.
#[test]
fn failed_status_writes_manifest_but_not_success_marker() {
let dir = tempfile::tempdir().unwrap();
let dest = local_dest(dir.path());
let m = build_manifest(ManifestStatus::Failed);
let outcome = write_manifest(&dest, &m).unwrap();
assert!(matches!(
outcome,
WriteOutcome::Written {
success_marker: false
}
));
assert!(dir.path().join(MANIFEST_FILENAME).exists());
assert!(
!dir.path().join(SUCCESS_FILENAME).exists(),
"_SUCCESS must be absent for Failed status"
);
}
// An Interrupted run likewise produces no success marker.
#[test]
fn interrupted_status_writes_manifest_but_not_success_marker() {
let dir = tempfile::tempdir().unwrap();
let dest = local_dest(dir.path());
let m = build_manifest(ManifestStatus::Interrupted);
let outcome = write_manifest(&dest, &m).unwrap();
assert!(matches!(
outcome,
WriteOutcome::Written {
success_marker: false
}
));
assert!(!dir.path().join(SUCCESS_FILENAME).exists());
}
// The stdout destination is expected to report the streaming commit protocol, so
// write_manifest skips both artifacts.
#[test]
fn streaming_destination_skips_manifest() {
use crate::destination::stdout::StdoutDestination;
let dest = StdoutDestination::new().unwrap();
let m = build_manifest(ManifestStatus::Success);
let outcome = write_manifest(&dest, &m).unwrap();
assert!(matches!(outcome, WriteOutcome::SkippedStreaming));
}
// Fresh RunSummary from the crate's test stub.
// NOTE(review): the fingerprint tests assume its schema_fingerprint starts as None; confirm
// in `stub_for_testing`.
fn dummy_summary() -> crate::pipeline::summary::RunSummary {
crate::pipeline::summary::RunSummary::stub_for_testing("r", "orders")
}
// Builds an Arrow schema of non-nullable fields from (name, type) pairs.
fn schema_with(fields: &[(&str, arrow::datatypes::DataType)]) -> arrow::datatypes::Schema {
let f: Vec<arrow::datatypes::Field> = fields
.iter()
.map(|(n, t)| arrow::datatypes::Field::new(*n, t.clone(), false))
.collect();
arrow::datatypes::Schema::new(f)
}
// The first call stores a fingerprint shaped "xxh3:" + 16 characters.
#[test]
fn record_run_schema_fingerprint_sets_field_on_first_call() {
use arrow::datatypes::DataType;
let mut s = dummy_summary();
let schema = schema_with(&[("id", DataType::Int64), ("name", DataType::Utf8)]);
record_run_schema_fingerprint(&mut s, &schema);
let fp = s.schema_fingerprint.as_deref().expect("must be set");
assert!(fp.starts_with("xxh3:"));
assert_eq!(fp.len(), "xxh3:".len() + 16);
}
// A second call with a different schema must not replace the first fingerprint.
#[test]
fn record_run_schema_fingerprint_is_idempotent() {
use arrow::datatypes::DataType;
let mut s = dummy_summary();
let schema_a = schema_with(&[("id", DataType::Int64)]);
record_run_schema_fingerprint(&mut s, &schema_a);
let first = s.schema_fingerprint.clone();
let schema_b = schema_with(&[("id", DataType::Int64), ("extra", DataType::Utf8)]);
record_run_schema_fingerprint(&mut s, &schema_b);
assert_eq!(s.schema_fingerprint, first, "later call must not overwrite");
}
// The expected columns mirror what arrow_schema_to_columns is assumed to produce: the field
// name plus the Debug rendering of its DataType.
#[test]
fn record_run_schema_fingerprint_matches_state_helper_output() {
use arrow::datatypes::DataType;
let mut s = dummy_summary();
let schema = schema_with(&[("id", DataType::Int64), ("email", DataType::Utf8)]);
record_run_schema_fingerprint(&mut s, &schema);
let cols = vec![
crate::state::SchemaColumn {
name: "id".into(),
data_type: format!("{:?}", DataType::Int64),
},
crate::state::SchemaColumn {
name: "email".into(),
data_type: format!("{:?}", DataType::Utf8),
},
];
assert_eq!(
s.schema_fingerprint.unwrap(),
crate::state::schema_fingerprint(&cols)
);
}
// Field order within the schema must not affect the fingerprint.
#[test]
fn record_run_schema_fingerprint_is_order_insensitive() {
use arrow::datatypes::DataType;
let mut s1 = dummy_summary();
let mut s2 = dummy_summary();
record_run_schema_fingerprint(
&mut s1,
&schema_with(&[("id", DataType::Int64), ("name", DataType::Utf8)]),
);
record_run_schema_fingerprint(
&mut s2,
&schema_with(&[("name", DataType::Utf8), ("id", DataType::Int64)]),
);
assert_eq!(s1.schema_fingerprint, s2.schema_fingerprint);
}
}