use std::path::Path;
use super::manifest_writer;
use super::summary::RunSummary;
use crate::destination::Destination;
use crate::error::Result;
use crate::journal::RunEvent;
use crate::plan::ResolvedRunPlan;
use crate::state::StateStore;
/// Description of one part file after it has been handed to a destination.
///
/// Produced by `write_part_file` and consumed by `record_part`.
pub(crate) struct PartRecord {
/// Name the part was written under at the destination.
pub file_name: String,
/// Row count reported by the caller for this part.
pub rows: i64,
/// Size of the staged temp file in bytes (`write_part_file` records 0 when the
/// file's metadata cannot be read).
pub bytes: u64,
/// Content fingerprint string; `write_part_file` falls back to
/// `"xxh3:0000000000000000"` when checksum computation fails.
pub fingerprint: String,
/// MD5 string from the checksum helper; empty when checksum computation fails.
pub md5: String,
}
/// Selects which journal event `record_part` emits for a committed part.
pub(crate) enum PartKind {
/// Journaled as `RunEvent::FileWritten` carrying this `part_index`.
File { part_index: usize },
/// Journaled as `RunEvent::ChunkCompleted` carrying this `chunk_index`.
Chunk { chunk_index: i64 },
/// Journaled as `RunEvent::ChunkCompleted` with `page_index` used as the chunk index.
Page { page_index: i64 },
}
/// Writes the staged file at `tmp_path` to `dest` under `file_name` and returns
/// a [`PartRecord`] describing it.
///
/// The byte size and the checksums are best-effort: an unreadable temp file is
/// recorded as 0 bytes, and a checksum failure is logged and replaced by a
/// placeholder fingerprint with an empty MD5.
///
/// # Errors
///
/// Returns an error when the destination write fails, or when the destination
/// reports a content MD5 and both it and the local MD5 decode to digests that
/// differ.
pub(crate) fn write_part_file(
    dest: &dyn Destination,
    tmp_path: &Path,
    rows: i64,
    file_name: String,
) -> Result<PartRecord> {
    use crate::pipeline::manifest_reconcile::md5_digest_bytes;

    // Size is taken before the upload; failure to stat is not fatal.
    let bytes = match std::fs::metadata(tmp_path) {
        Ok(meta) => meta.len(),
        Err(_) => 0,
    };

    let outcome = dest.write(tmp_path, &file_name)?;

    let (fingerprint, md5) = match manifest_writer::compute_part_checksums(tmp_path) {
        Ok(sums) => sums,
        Err(e) => {
            log::warn!("part checksums failed for '{file_name}' (not fatal): {e:#}");
            ("xxh3:0000000000000000".to_string(), String::new())
        }
    };

    // Only compare when the store reported a checksum at all.
    if let Some(stored) = &outcome.content_md5 {
        let local = md5_digest_bytes(&md5);
        let remote = md5_digest_bytes(stored);
        // A side that does not decode to a digest skips the check rather than failing it.
        let differs = match (&local, &remote) {
            (Some(ours), Some(theirs)) => ours != theirs,
            _ => false,
        };
        if differs {
            anyhow::bail!(
                "upload integrity check failed for '{file_name}': local MD5 differs from \
                 the store-reported checksum — the part corrupted in transit"
            );
        }
    }

    Ok(PartRecord {
        file_name,
        rows,
        bytes,
        fingerprint,
        md5,
    })
}
/// Flushes `sink` and writes every completed part to `dest`.
///
/// Steps, in order:
/// 1. finish the sink's active writer, if there is one;
/// 2. if the current part holds rows, move its temp file into
///    `completed_parts` (a fresh temp file takes its place) and reset the
///    row counter;
/// 3. drain `completed_parts`, optionally validating each file against the
///    `validate` format, and upload it under `name_for(index, total)`.
///
/// # Errors
///
/// Propagates the first failure from finishing the writer, creating the
/// replacement temp file, validating a part, or writing a part.
pub(crate) fn write_sink_parts(
    dest: &dyn Destination,
    sink: &mut crate::pipeline::sink::ExportSink,
    validate: Option<crate::config::FormatType>,
    name_for: impl Fn(usize, usize) -> String,
) -> Result<Vec<PartRecord>> {
    if let Some(active_writer) = sink.writer.take() {
        active_writer.finish()?;
    }

    if sink.part_rows > 0 {
        // Create the replacement first so a failure leaves the sink untouched.
        let fresh_tmp = tempfile::NamedTempFile::new()?;
        let finished_tmp = std::mem::replace(&mut sink.tmp, fresh_tmp);
        let finished = crate::pipeline::sink::CompletedPart {
            tmp: finished_tmp,
            rows: sink.part_rows,
        };
        sink.completed_parts.push(finished);
        sink.part_rows = 0;
    }

    let total = sink.completed_parts.len();
    let mut written = Vec::with_capacity(total);
    for (part_no, done) in sink.completed_parts.drain(..).enumerate() {
        let staged = done.tmp.path();
        if let Some(fmt) = validate {
            crate::pipeline::validate::validate_output(staged, fmt, done.rows)?;
        }
        let record = write_part_file(dest, staged, done.rows as i64, name_for(part_no, total))?;
        written.push(record);
    }
    Ok(written)
}
/// Derives the name of part `idx` out of `count` sibling parts from `base`.
///
/// With at most one part, `base` is returned unchanged. Otherwise `_p{idx}` is
/// inserted before the last extension of the final `/`-separated component
/// (`a/b.parquet` → `a/b_p0.parquet`), or appended when that component has no
/// extension.
///
/// Only the final component is searched for the extension separator, so a dot
/// in a directory name (`exports.v2/orders`) is never mistaken for one.
pub(crate) fn part_indexed_name(base: &str, idx: usize, count: usize) -> String {
    if count <= 1 {
        return base.to_string();
    }
    // '/' is ASCII, so the byte after it is always a valid char boundary.
    let name_start = base.rfind('/').map_or(0, |slash| slash + 1);
    let (dir, name) = base.split_at(name_start);
    match name.rsplit_once('.') {
        Some((stem, ext)) => format!("{dir}{stem}_p{idx}.{ext}"),
        None => format!("{base}_p{idx}"),
    }
}
/// Books one written part into the run's bookkeeping.
///
/// In order: bumps the byte/file counters on `summary`, appends the part to
/// the manifest via `manifest_writer`, journals an event chosen by `kind`, and
/// — when a state store is supplied — writes a file-log row. A file-log
/// failure is only logged as a warning; it never fails the call.
///
/// The two `test_hook` calls bracket the bookkeeping so tests can inject a
/// panic before or after it.
pub(crate) fn record_part(
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
    state: Option<&StateStore>,
    part: &PartRecord,
    kind: PartKind,
) {
    crate::test_hook::maybe_panic_at("after_file_write");

    summary.files_produced += 1;
    summary.files_committed += 1;
    summary.bytes_written += part.bytes;
    manifest_writer::record_committed_part_with_fingerprint(
        summary,
        part.file_name.clone(),
        part.rows,
        part.bytes,
        part.fingerprint.clone(),
        part.md5.clone(),
    );

    // Chunk and Page parts journal the same event; a page index stands in as the chunk index.
    let event = match kind {
        PartKind::File { part_index } => RunEvent::FileWritten {
            file_name: part.file_name.clone(),
            rows: part.rows,
            bytes: part.bytes,
            part_index,
        },
        PartKind::Chunk { chunk_index }
        | PartKind::Page {
            page_index: chunk_index,
        } => RunEvent::ChunkCompleted {
            chunk_index,
            rows: part.rows,
            file_name: Some(part.file_name.clone()),
        },
    };
    summary.journal.record(event);

    // No state store means no file log; that is a bypass, not an error.
    let file_log = state.map(|store| {
        store.record_file(
            &summary.run_id,
            &plan.export_name,
            &part.file_name,
            part.rows,
            part.bytes as i64,
            plan.format.label(),
            Some(plan.compression.label()),
        )
    });
    if let Some(Err(e)) = file_log {
        log::warn!(
            "export '{}': file_log write failed for '{}' (file was produced): {:#}",
            plan.export_name,
            part.file_name,
            e
        );
    }

    crate::test_hook::maybe_panic_at("after_manifest_update");
}
// Unit tests for part naming, part upload (including the upload checksum
// comparison) and the bookkeeping done by `record_part`.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CompressionType, DestinationConfig, DestinationType, FormatType, SourceConfig, SourceType,
};
use crate::destination::local::LocalDestination;
use crate::journal::RunEvent;
use crate::pipeline::summary::RunSummary;
use crate::plan::{ExtractionStrategy, ResolvedRunPlan};
use crate::state::StateStore;
use crate::tuning::SourceTuning;
use std::io::Write;
/// Builds a snapshot plan for the export named "orders": Parquet output, no
/// compression, a local destination at `/tmp`. The source URL is only a
/// placeholder value in the config.
fn test_plan() -> ResolvedRunPlan {
ResolvedRunPlan {
export_name: "orders".into(),
base_query: "SELECT 1".into(),
strategy: ExtractionStrategy::Snapshot,
format: FormatType::Parquet,
compression: CompressionType::None,
compression_level: None,
max_file_size_bytes: None,
skip_empty: false,
meta_columns: Default::default(),
destination: DestinationConfig {
destination_type: DestinationType::Local,
path: Some("/tmp".into()),
..Default::default()
},
quality: None,
tuning: SourceTuning::from_config(None),
tuning_profile_label: "balanced".into(),
validate: false,
reconcile: false,
resume: false,
source: SourceConfig {
source_type: SourceType::Postgres,
url: Some("postgresql://nobody@127.0.0.1:9999/nonexistent".into()),
url_env: None,
url_file: None,
host: None,
port: None,
user: None,
password: None,
password_env: None,
database: None,
environment: None,
tuning: None,
tls: None,
},
column_overrides: Default::default(),
verify: crate::config::VerifyMode::Size,
schema_drift_policy: Default::default(),
shape_drift_warn_factor: 0.0,
parquet: None,
}
}
/// Creates a stub summary for run id "test_run" on the plan's export, with
/// snapshot mode and no compression filled in.
fn test_summary(plan: &ResolvedRunPlan) -> RunSummary {
let mut s = RunSummary::stub_for_testing("test_run", plan.export_name.clone());
s.batch_size = 10_000;
s.mode = "snapshot".into();
s.compression = "none".into();
s
}
/// Builds a fixed part record (42 rows, 1024 bytes, empty MD5) named `file_name`.
fn test_part(file_name: &str) -> PartRecord {
PartRecord {
file_name: file_name.into(),
rows: 42,
bytes: 1024,
fingerprint: "xxh3:1234567890abcdef".into(),
md5: String::new(),
}
}
// A single part keeps the base name; with several parts each gets a `_p{idx}`
// suffix before the extension, or at the end when there is no extension.
#[test]
fn part_indexed_name_keeps_legacy_name_for_single_part_and_suffixes_siblings() {
assert_eq!(
part_indexed_name("orders_ts_chunk3.parquet", 0, 1),
"orders_ts_chunk3.parquet"
);
assert_eq!(
part_indexed_name("orders_ts_chunk3.parquet", 0, 3),
"orders_ts_chunk3_p0.parquet"
);
assert_eq!(
part_indexed_name("orders_ts_chunk3.parquet", 2, 3),
"orders_ts_chunk3_p2.parquet"
);
assert_eq!(part_indexed_name("orders_chunk3", 1, 2), "orders_chunk3_p1");
}
// End-to-end against a real local destination: the record reflects the
// payload size and a well-formed fingerprint, and the bytes land under the
// destination directory.
#[test]
fn write_part_file_copies_to_destination_and_returns_real_bytes_and_fingerprint() {
let src_dir = tempfile::tempdir().unwrap();
let dst_dir = tempfile::tempdir().unwrap();
let src_path = src_dir.path().join("part.parquet");
let payload: &[u8] = b"hello rivet";
std::fs::File::create(&src_path)
.unwrap()
.write_all(payload)
.unwrap();
let dest = LocalDestination::new(&DestinationConfig {
destination_type: DestinationType::Local,
path: Some(dst_dir.path().to_string_lossy().into_owned()),
..Default::default()
})
.unwrap();
let rec =
write_part_file(&dest, &src_path, 7, "out/part.parquet".into()).expect("write ok");
assert_eq!(rec.file_name, "out/part.parquet");
assert_eq!(rec.rows, 7);
assert_eq!(rec.bytes, payload.len() as u64);
// "xxh3:" (5 chars) + 16 hex chars = 21.
assert!(
rec.fingerprint.starts_with("xxh3:") && rec.fingerprint.len() == 21,
"fingerprint should be xxh3:<16 hex chars>, got {:?}",
rec.fingerprint
);
let written = dst_dir.path().join("out").join("part.parquet");
assert_eq!(std::fs::read(&written).unwrap(), payload);
}
/// Destination stub: `write` performs no I/O and reports the wrapped value
/// as the store's `content_md5`.
struct ChecksumDest(Option<String>);
impl crate::destination::Destination for ChecksumDest {
fn write(&self, _p: &Path, _k: &str) -> Result<crate::destination::WriteOutcome> {
Ok(crate::destination::WriteOutcome {
content_md5: self.0.clone(),
})
}
fn capabilities(&self) -> crate::destination::DestinationCapabilities {
crate::destination::DestinationCapabilities {
commit_protocol: crate::destination::WriteCommitProtocol::FinalizeOnClose,
idempotent_overwrite: true,
retry_safe: true,
partial_write_risk: false,
}
}
}
/// Writes `payload` to `part.parquet` in a fresh temp dir. The `TempDir` is
/// returned alongside the path so the caller keeps the directory alive.
fn stage(payload: &[u8]) -> (tempfile::TempDir, std::path::PathBuf) {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("part.parquet");
std::fs::write(&p, payload).unwrap();
(dir, p)
}
// A store-reported checksum that does not match the payload must fail the
// write with an error mentioning "transit".
#[test]
fn transit_check_bails_when_store_checksum_differs() {
let (_d, src) = stage(b"hello rivet");
let dest = ChecksumDest(Some("AAAAAAAAAAAAAAAAAAAAAA==".into()));
match write_part_file(&dest, &src, 1, "part.parquet".into()) {
Ok(_) => panic!("mismatched checksum must fail"),
Err(e) => assert!(
e.to_string().contains("transit"),
"expected a transit-corruption error, got: {e}"
),
}
}
// The write succeeds when the store reports the payload's real base64 MD5,
// and also when the store reports no checksum at all.
#[test]
fn transit_check_passes_on_match_and_when_store_is_silent() {
use base64::Engine as _;
use md5::{Digest, Md5};
let payload = b"hello rivet";
let (_d, src) = stage(payload);
let mut h = Md5::new();
h.update(payload);
let real = base64::engine::general_purpose::STANDARD.encode(h.finalize());
write_part_file(&ChecksumDest(Some(real)), &src, 1, "p.parquet".into())
.expect("matching checksum passes");
write_part_file(&ChecksumDest(None), &src, 1, "p.parquet".into())
.expect("silent store passes");
}
// `PartKind::File` bumps the counters, adds one manifest part and journals
// exactly one FileWritten event and no chunk events.
#[test]
fn record_part_file_kind_bumps_counters_and_journals_file_written() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let part = test_part("orders_chunk0.parquet");
record_part(
&plan,
&mut summary,
None,
&part,
PartKind::File { part_index: 0 },
);
assert_eq!(summary.bytes_written, part.bytes);
assert_eq!(summary.files_produced, 1);
assert_eq!(summary.files_committed, 1);
assert_eq!(summary.manifest_parts.len(), 1);
assert_eq!(summary.manifest_parts[0].path, part.file_name);
assert_eq!(summary.manifest_parts[0].rows, part.rows);
let file_events = summary.journal.files();
assert_eq!(file_events.len(), 1, "must journal one FileWritten");
assert!(
matches!(
&file_events[0].event,
RunEvent::FileWritten { part_index: 0, .. }
),
"expected FileWritten{{part_index:0}}, got {:?}",
file_events[0].event
);
assert!(
summary.journal.chunk_events().is_empty(),
"File kind must not journal ChunkCompleted"
);
}
// `PartKind::Chunk` journals exactly one ChunkCompleted event carrying the
// chunk index, row count and file name, and no file events.
#[test]
fn record_part_chunk_kind_journals_chunk_completed_with_file_name() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let part = test_part("orders_chunk7.parquet");
record_part(
&plan,
&mut summary,
None,
&part,
PartKind::Chunk { chunk_index: 7 },
);
let events = summary.journal.chunk_events();
assert_eq!(events.len(), 1, "must journal one ChunkCompleted");
match &events[0].event {
RunEvent::ChunkCompleted {
chunk_index,
rows,
file_name,
} => {
assert_eq!(*chunk_index, 7);
assert_eq!(*rows, part.rows);
assert_eq!(file_name.as_deref(), Some(part.file_name.as_str()));
}
other => panic!("expected ChunkCompleted, got {other:?}"),
}
assert!(
summary.journal.files().is_empty(),
"Chunk kind must not journal FileWritten"
);
}
// With a state store supplied, exactly one file-log entry is written for the part.
#[test]
fn record_part_with_state_writes_file_log_entry() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let state = StateStore::open_in_memory().expect("in-memory state");
let part = test_part("orders_chunk0.parquet");
record_part(
&plan,
&mut summary,
Some(&state),
&part,
PartKind::Chunk { chunk_index: 0 },
);
let files = state.get_files(Some(&plan.export_name), 16).unwrap();
assert_eq!(files.len(), 1, "I7: file_log must carry exactly one entry");
assert_eq!(files[0].file_name, part.file_name);
assert_eq!(files[0].row_count, part.rows);
}
// Without a state store the summary, manifest and journal are still updated.
#[test]
fn record_part_with_none_state_is_a_bypass_not_a_failure() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let part = test_part("orders_chunk0.parquet");
record_part(
&plan,
&mut summary,
None,
&part,
PartKind::Chunk { chunk_index: 0 },
);
assert_eq!(summary.files_committed, 1);
assert_eq!(summary.manifest_parts.len(), 1);
assert_eq!(summary.journal.chunk_events().len(), 1);
}
/// Generates `n` distinct part records whose rows, bytes and fingerprint
/// all vary with the index.
fn synthetic_parts(n: usize) -> Vec<PartRecord> {
(0..n)
.map(|i| PartRecord {
file_name: format!("part_{i}.parquet"),
rows: 100 + (i as i64) * 10,
bytes: 1024 * ((i as u64) + 1),
fingerprint: format!("xxh3:{i:016x}"),
md5: String::new(),
})
.collect()
}
// After recording several parts, the summary totals must agree with the sums
// and length of `manifest_parts`.
#[test]
fn record_part_keeps_summary_aggregates_coherent_with_manifest_parts() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let parts = synthetic_parts(5);
for (i, p) in parts.iter().enumerate() {
// `record_part` does not touch `total_rows`, so the test accumulates it itself.
summary.total_rows += p.rows;
record_part(
&plan,
&mut summary,
None,
p,
PartKind::Chunk {
chunk_index: i as i64,
},
);
}
let parts_rows: i64 = summary.manifest_parts.iter().map(|p| p.rows).sum();
let parts_bytes: u64 = summary.manifest_parts.iter().map(|p| p.size_bytes).sum();
assert_eq!(
summary.total_rows, parts_rows,
"non-resume run: summary.total_rows must equal sum(manifest_parts.rows)"
);
assert_eq!(
summary.bytes_written, parts_bytes,
"non-resume run: summary.bytes_written must equal sum(manifest_parts.size_bytes)"
);
assert_eq!(
summary.files_produced,
summary.manifest_parts.len(),
"files_produced must equal manifest_parts.len() (record_part bumps both)"
);
assert_eq!(
summary.files_committed,
summary.manifest_parts.len(),
"files_committed must equal manifest_parts.len() (record_part bumps both)"
);
}
// A run that committed at least one file must expose it in `manifest_parts`,
// and the committed count must match the manifest length.
#[test]
fn nonempty_successful_run_must_have_nonempty_manifest_parts() {
let plan = test_plan();
let mut summary = test_summary(&plan);
let part = test_part("orders_chunk0.parquet");
summary.total_rows += part.rows;
record_part(
&plan,
&mut summary,
None,
&part,
PartKind::Chunk { chunk_index: 0 },
);
summary.status = "success".into();
assert!(summary.files_committed > 0, "test premise: work committed");
assert!(
!summary.manifest_parts.is_empty(),
"non-empty success run must surface files in manifest_parts"
);
assert_eq!(
summary.files_committed,
summary.manifest_parts.len(),
"files_committed and manifest_parts.len() locked together by record_part"
);
}
}