use std::path::Path;
use crate::config::{Config, ExportConfig};
use crate::error::{DataIntegrityError, Result};
use crate::plan::{
DiagnosticLevel, ExtractionStrategy, ResolvedRunPlan, build_plan, validate_plan,
};
use crate::state::StateStore;
use super::RunOptions;
use super::chunked::{self, run_chunked_parallel_checkpoint};
use super::single::run_with_reconnect;
use super::summary::RunSummary;
use crate::journal::RunEvent;
/// Flatten a finished run into the row handed to `StateStore::record_metric_full`.
///
/// Almost every column is copied straight from `summary`. The plan contributes
/// the chunk geometry (`chunk_size` / `parallel`, only for chunked strategies),
/// the source and destination type labels, and the caller supplies the tuning
/// profile name. The crate version is stamped on every row.
fn build_metric_row(
    summary: &RunSummary,
    plan: &ResolvedRunPlan,
    tuning_class: &str,
) -> crate::state::MetricRow {
    // Chunk dimensions are meaningless for non-chunked strategies, so leave them NULL.
    let (chunk_size, parallel) =
        if let crate::plan::ExtractionStrategy::Chunked(chunked) = &plan.strategy {
            (Some(chunked.chunk_size as i64), Some(chunked.parallel as i64))
        } else {
            (None, None)
        };
    // Debug name of the enum variant, lowercased (e.g. `Postgres` -> "postgres").
    let source_label = format!("{:?}", plan.source.source_type).to_lowercase();
    let destination_label = plan.destination.destination_type.label().to_string();
    let version = env!("CARGO_PKG_VERSION").to_string();

    crate::state::MetricRow {
        // identity
        run_id: summary.run_id.clone(),
        export_name: summary.export_name.clone(),
        // outcome
        status: summary.status.clone(),
        error_message: summary.error_message.clone(),
        skip_reason: summary.skip_reason.clone(),
        // timing / resources
        duration_ms: summary.duration_ms,
        peak_rss_mb: Some(summary.peak_rss_mb),
        pg_temp_bytes_delta: summary.pg_temp_bytes_delta,
        longest_chunk_ms: summary.journal.longest_chunk_ms(),
        // volume
        total_rows: summary.total_rows,
        files_produced: summary.files_produced as i64,
        files_committed: summary.files_committed as i64,
        bytes_written: summary.bytes_written as i64,
        retries: summary.retries as i64,
        // verification
        validated: summary.validated,
        schema_changed: summary.schema_changed,
        schema_fingerprint: summary.schema_fingerprint.clone(),
        reconciled: summary.reconciled,
        source_count: summary.source_count,
        quality_passed: summary.quality_passed,
        // configuration dimensions
        tuning_profile: Some(tuning_class.to_string()),
        format: Some(summary.format.clone()),
        mode: Some(summary.mode.clone()),
        batch_size: summary.batch_size as i64,
        batch_size_memory_mb: summary.batch_size_memory_mb.map(|mb| mb as i64),
        chunk_size,
        parallel,
        source_type: Some(source_label),
        destination_type: Some(destination_label),
        rivet_version: Some(version),
    }
}
/// Enforce the quality checks that can still be evaluated after a chunked run.
///
/// Chunks are processed independently, so only the aggregate row-count bounds are
/// checked, via `quality::check_row_count` against `summary.total_rows`. Configured
/// `null_ratio_max` / `unique_columns` checks are skipped with a warning.
///
/// * An incoming `Err` is returned unchanged and `summary.quality_passed` is untouched.
/// * Non-chunked strategies and plans without a quality config are a no-op.
/// * Otherwise `summary.quality_passed` is set. A failure is returned as a
///   `DataIntegrityError` whose message lists every failing check.
fn run_chunked_quality_gate(
    result: Result<()>,
    plan: &ResolvedRunPlan,
    summary: &mut RunSummary,
) -> Result<()> {
    // Extraction errors take precedence over any quality verdict.
    result?;
    let (ExtractionStrategy::Chunked(_), Some(quality)) = (&plan.strategy, &plan.quality) else {
        return Ok(());
    };

    let issues = crate::quality::check_row_count(summary.total_rows as usize, quality);

    let has_per_row_checks =
        !(quality.null_ratio_max.is_empty() && quality.unique_columns.is_empty());
    if has_per_row_checks {
        log::warn!(
            "export '{}': quality checks null_ratio_max and unique_columns are not supported in chunked mode (each chunk processes independently); only row_count bounds are checked",
            plan.export_name
        );
    }

    if issues.is_empty() {
        summary.quality_passed = Some(true);
        return Ok(());
    }

    let failures: Vec<&str> = issues.iter().map(|issue| issue.message.as_str()).collect();
    for failure in &failures {
        log::warn!("quality FAIL: {}", failure);
    }
    summary.quality_passed = Some(false);
    let message =
        crate::quality::failure_message(&plan.export_name, Some("chunked aggregate"), &failures);
    Err(DataIntegrityError::new(message).into())
}
/// Sample PostgreSQL's temp-bytes figure via `postgres::sample_temp_bytes`.
///
/// The caller takes one sample before the run and one after, and uses the
/// difference as the spill delta.
///
/// Returns `None` for non-Postgres sources, when the source URL cannot be resolved,
/// or when the sampler itself yields nothing.
fn pg_temp_bytes_snapshot(plan: &ResolvedRunPlan) -> Option<i64> {
    match plan.source.source_type {
        crate::config::SourceType::Postgres => {
            let resolved = plan.source.resolve_url().ok()?;
            crate::source::postgres::sample_temp_bytes(&resolved, plan.source.tls.as_ref())
        }
        crate::config::SourceType::Mysql | crate::config::SourceType::Mssql => None,
    }
}
fn harm_snapshot(plan: &ResolvedRunPlan) -> Option<Vec<(String, i64)>> {
let url = plan.source.resolve_url().ok()?;
let tls = plan.source.tls.as_ref();
match plan.source.source_type {
crate::config::SourceType::Postgres => {
crate::source::postgres::sample_harm_counters(&url, tls)
}
crate::config::SourceType::Mysql => crate::source::mysql::sample_harm_counters(&url, tls),
crate::config::SourceType::Mssql => crate::source::mssql::sample_harm_counters(&url, tls),
}
}
/// Compute per-counter growth between two harm samples.
///
/// Only counters present in both samples are reported, in the order they appear
/// in `after`. A decrease (e.g. a counter reset between samples) is floored at 0.
/// If `before` repeats a name, its last value is the baseline.
fn harm_deltas(before: &[(String, i64)], after: &[(String, i64)]) -> Vec<(String, i64)> {
    use std::collections::HashMap;

    let mut baseline: HashMap<&str, i64> = HashMap::with_capacity(before.len());
    for (name, value) in before {
        baseline.insert(name.as_str(), *value);
    }

    let mut deltas = Vec::new();
    for (name, current) in after {
        if let Some(&prior) = baseline.get(name.as_str()) {
            let grown = current - prior;
            deltas.push((name.clone(), if grown > 0 { grown } else { 0 }));
        }
    }
    deltas
}
/// Reconcile the exported row count against a fresh `COUNT(*)` over the plan's
/// base query, recording the outcome on `summary`.
///
/// This is best-effort:
/// * Plans with a cursor column are skipped with an info log, because their
///   count may legitimately differ from a full count.
/// * Connection failures, query failures, NULL results and unparsable results are
///   logged as warnings. They leave `summary.source_count` / `summary.reconciled` unset.
///
/// The exported figure is the sum of committed manifest-part rows when that sum is
/// positive, and `summary.total_rows` otherwise.
fn reconcile_source_count(plan: &ResolvedRunPlan, summary: &mut RunSummary) {
    if let Some(col) = plan.strategy.cursor_column() {
        log::info!(
            "reconcile: skipping for incremental export '{}' (cursor column '{}', count may differ)",
            plan.export_name,
            col
        );
        return;
    }
    let count_sql = reconcile_count_sql(&plan.base_query);
    log::info!(
        "reconcile: running source count query for '{}'",
        plan.export_name
    );
    let mut src = match crate::source::create_source(&plan.source) {
        Ok(s) => s,
        Err(e) => {
            log::warn!("reconcile: could not connect to source: {:#}", e);
            return;
        }
    };
    let val = match src.query_scalar(&count_sql) {
        Ok(Some(val)) => val,
        Ok(None) => {
            log::warn!(
                "reconcile: COUNT(*) returned NULL for '{}'",
                plan.export_name
            );
            return;
        }
        Err(e) => {
            log::warn!(
                "reconcile: count query failed for '{}': {:#}",
                plan.export_name,
                e
            );
            return;
        }
    };
    let Ok(count) = val.parse::<i64>() else {
        log::warn!(
            "reconcile: could not parse count result '{}' as integer",
            val
        );
        return;
    };

    summary.source_count = Some(count);
    // Prefer what was actually committed to the destination. Fall back to the
    // run's row total when no manifest rows were recorded.
    let committed_rows: i64 = summary.manifest_parts.iter().map(|p| p.rows).sum();
    let exported_total = if committed_rows > 0 {
        committed_rows
    } else {
        summary.total_rows
    };
    let matched = exported_total == count;
    summary.reconciled = Some(matched);
    if matched {
        log::info!(
            "reconcile MATCH for '{}': {}/{}",
            plan.export_name,
            exported_total,
            count
        );
    } else {
        log::warn!(
            "reconcile MISMATCH for '{}': committed {} rows, source has {}",
            plan.export_name,
            exported_total,
            count
        );
    }
}

/// Wrap `base_query` in a `COUNT(*)` derived table.
///
/// Trailing whitespace and `;` terminators are stripped, because a statement
/// terminator is not valid inside a subquery. The inner query is placed on its
/// own line so that a trailing `-- line comment` in user SQL cannot comment out
/// the closing parenthesis.
fn reconcile_count_sql(base_query: &str) -> String {
    let inner = base_query.trim_end().trim_end_matches(';').trim_end();
    format!("SELECT COUNT(*) FROM (\n{}\n) AS _rivet_reconcile", inner)
}
/// Fabricate a `RunSummary` for a run that failed before a real summary existed,
/// e.g. when plan construction or plan validation failed.
///
/// The summary has:
/// * `status` set to "failed" and a redacted `error_message`;
/// * a fresh run id `<export_name>_<UTC timestamp with milliseconds>`;
/// * an empty journal for that id;
/// * the default tuning label.
///
/// Every counter is zero, every optional field is `None`, and every string field
/// is empty.
pub(crate) fn synthetic_failed_summary(export_name: &str, err: &anyhow::Error) -> RunSummary {
    let now = chrono::Utc::now();
    let run_id = format!("{}_{}", export_name, now.format("%Y%m%dT%H%M%S%3f"));
    let journal = crate::journal::RunJournal::new(&run_id, export_name);
    let error_message = Some(crate::redact::redact_error(err));

    RunSummary {
        // identity
        journal,
        run_id,
        export_name: export_name.to_string(),
        // outcome
        status: "failed".into(),
        error_message,
        tuning_profile: "balanced (default)".into(),
        // counters: nothing ran
        total_rows: 0,
        files_produced: 0,
        files_committed: 0,
        bytes_written: 0,
        duration_ms: 0,
        peak_rss_mb: 0,
        retries: 0,
        batch_size: 0,
        // descriptive strings unknown without a plan
        format: String::new(),
        mode: String::new(),
        compression: String::new(),
        manifest_parts: Vec::new(),
        // no verification / reconciliation happened
        validated: None,
        schema_changed: None,
        quality_passed: None,
        reconciled: None,
        source_count: None,
        schema_fingerprint: None,
        manifest_verification: None,
        // remaining optional context
        batch_size_memory_mb: None,
        destination_uri: None,
        pg_temp_bytes_delta: None,
        skip_reason: None,
        apply_context: None,
    }
}
/// Run a single configured export end to end.
///
/// CDC exports are delegated wholesale to `cdc_job::run_cdc_export`. Otherwise:
///
/// 1. Build and validate the plan. A build failure, or any `Rejected` diagnostic,
///    returns immediately with `synthetic_failed_summary`. All rejections are
///    reported together. Warnings and degradations are logged and later recorded
///    in the run journal.
/// 2. With `opts.resume` (and not `opts.force`), enforce the resume success-gate.
///    On a non-resume, non-forced run, warn if the output prefix already holds a
///    completed run.
/// 3. Execute the strategy: parallel chunked (checkpointed when resumable), or the
///    single-stream path with reconnect. Duration, peak RSS, PG temp-byte spill and
///    source harm counters are measured around it.
/// 4. Apply the chunked quality gate. On success with `plan.reconcile`, reconcile
///    the exported row count against the source.
/// 5. Persist the journal, print the summary, finalize manifests, record metrics,
///    write the run report and send notifications.
///
/// Returns the extraction/quality outcome together with the summary. Journal and
/// metric persistence failures in step 5 are logged and do not change the result.
pub(super) fn run_export_job(
    config_path: &str,
    config: &Config,
    export: &ExportConfig,
    state: &StateStore,
    config_dir: &Path,
    opts: &RunOptions<'_>,
) -> (Result<()>, RunSummary) {
    if export.mode == crate::config::ExportMode::Cdc {
        return super::cdc_job::run_cdc_export(config_path, config, export, state);
    }
    let plan = match build_plan(
        config,
        export,
        config_dir,
        opts.validate,
        opts.reconcile,
        opts.resume,
        opts.params,
    ) {
        Ok(p) => p,
        Err(e) => {
            let summary = synthetic_failed_summary(&export.name, &e);
            return (Err(e), summary);
        }
    };
    let diags = validate_plan(&plan);
    let mut rejected: Vec<String> = Vec::new();
    for d in &diags {
        match d.level {
            DiagnosticLevel::Rejected => {
                log::error!("[{}] plan validation rejected: {}", d.rule, d.message);
                rejected.push(d.message.clone());
            }
            DiagnosticLevel::Warning => {
                log::warn!("[{}] plan validation warning: {}", d.rule, d.message);
            }
            DiagnosticLevel::Degraded => {
                log::info!("[{}] plan validation degraded: {}", d.rule, d.message);
            }
        }
    }
    if !rejected.is_empty() {
        let err = anyhow::anyhow!(
            "export '{}': plan validation failed:\n {}",
            plan.export_name,
            rejected.join("\n ")
        );
        let summary = synthetic_failed_summary(&export.name, &err);
        return (Err(err), summary);
    }
    if opts.resume
        && !opts.force
        && let Err(e) = check_success_gate_for_resume(&plan)
    {
        let summary = synthetic_failed_summary(&export.name, &e);
        return (Err(e), summary);
    }
    if !opts.resume && !opts.force {
        warn_if_prefix_has_completed_run(&plan);
    }
    log::info!(
        "starting export '{}' (effective tuning: {})",
        plan.export_name,
        plan.tuning
    );
    let start = std::time::Instant::now();
    let rss_before = crate::resource::get_rss_mb();
    let rss_sampler = crate::resource::RssPeakSampler::start(rss_before, 100);
    let mut summary = RunSummary::new(&plan);
    // "Before" samples; each is None when unavailable for this source.
    let pg_temp_bytes_before = pg_temp_bytes_snapshot(&plan);
    let harm_before = harm_snapshot(&plan);
    for d in &diags {
        if matches!(
            d.level,
            DiagnosticLevel::Warning | DiagnosticLevel::Degraded
        ) {
            summary.journal.record(RunEvent::PlanWarning {
                rule: d.rule.to_string(),
                message: d.message.clone(),
            });
        }
    }
    let result = if plan.strategy.requires_parallel_execution() {
        if plan.strategy.is_resumable() {
            run_chunked_parallel_checkpoint(
                config_path,
                state,
                &plan,
                &mut summary,
                chunked::ChunkSource::Detect,
            )
        } else {
            chunked::run_chunked_parallel(state, &plan, &mut summary, chunked::ChunkSource::Detect)
        }
    } else {
        run_with_reconnect(state, &plan, &mut summary, config_path)
    };
    let rss_peak = rss_sampler.stop();
    let rss_after = crate::resource::get_rss_mb();
    summary.duration_ms = start.elapsed().as_millis() as i64;
    summary.peak_rss_mb = rss_peak.max(rss_after).max(rss_before) as i64;
    if let Some(before) = pg_temp_bytes_before
        && let Some(after) = pg_temp_bytes_snapshot(&plan)
    {
        let delta = (after - before).max(0);
        summary.pg_temp_bytes_delta = Some(delta);
        // Warn when temp-file spill exceeded 100 MiB during the run.
        if delta > 100 * 1024 * 1024 {
            log::warn!(
                "export '{}': PG temp_bytes spill +{:.1} MB during run — cursor / sort overflow. \
                 Consider lowering `tuning.batch_size` or setting `tuning.batch_size_memory_mb` \
                 below PG's `work_mem`.",
                plan.export_name,
                delta as f64 / (1024.0 * 1024.0),
            );
        }
    }
    if let Some(before) = &harm_before
        && let Some(after) = harm_snapshot(&plan)
    {
        let deltas = harm_deltas(before, &after);
        if let Err(e) = state.record_harm(&summary.run_id, &summary.export_name, &deltas) {
            log::debug!(
                "export '{}': harm metrics write failed (informational): {:#}",
                summary.export_name,
                e
            );
        }
    }
    let tuning_class = plan.tuning.profile_name().to_string();
    let result = run_chunked_quality_gate(result, &plan, &mut summary);
    let failed = result.is_err();
    match &result {
        Ok(()) => {
            if summary.status == "running" {
                summary.status = "success".into();
            }
        }
        Err(e) => {
            summary.status = "failed".into();
            let redacted = crate::redact::redact_error(e);
            summary.error_message = Some(redacted.clone());
            log::error!("export '{}' failed: {}", plan.export_name, redacted);
        }
    }
    if plan.reconcile && !failed {
        reconcile_source_count(&plan, &mut summary);
        if let (Some(source_count), Some(matched)) = (summary.source_count, summary.reconciled) {
            // Journal the same row figure the reconciliation compared against:
            // committed manifest rows when any exist, otherwise `total_rows`.
            // Reporting `total_rows` unconditionally could contradict `matched`.
            let committed_rows: i64 = summary.manifest_parts.iter().map(|p| p.rows).sum();
            let exported_rows = if committed_rows > 0 {
                committed_rows
            } else {
                summary.total_rows
            };
            summary.journal.record(RunEvent::ReconciliationResult {
                source_count,
                exported_rows,
                matched,
            });
        }
    }
    summary.journal.record(RunEvent::RunCompleted {
        status: summary.status.clone(),
        error_message: summary.error_message.clone(),
        duration_ms: summary.duration_ms,
    });
    if let Err(e) = state.store_journal(&summary.journal) {
        log::warn!(
            "export '{}': journal persist failed (run history not stored): {:#}",
            summary.export_name,
            e
        );
    }
    summary.print();
    finalize_manifest(&plan, state, &summary, "export");
    if plan.validate {
        finalize_validate_manifest(&plan, &mut summary, "export");
    }
    if let Err(e) = state.record_metric_full(&build_metric_row(&summary, &plan, &tuning_class)) {
        log::warn!(
            "export '{}': metrics write failed (run outcome not stored): {:#}",
            summary.export_name,
            e
        );
    }
    finalize_run_report(config_path, &summary, "export");
    crate::notify::maybe_send(config.notifications.as_ref(), &summary);
    (result, summary)
}
use super::finalize::{
check_success_gate_for_resume, finalize_manifest, finalize_run_report,
finalize_validate_manifest, warn_if_prefix_has_completed_run,
};
/// Execute an already-resolved plan on behalf of `apply`, using a caller-chosen
/// chunk source. Afterwards, finalize its manifest(s), metrics and run report.
///
/// The first `Rejected` plan diagnostic aborts before execution. Unlike
/// `run_export_job`, this path does not:
/// * sample PG temp bytes or harm counters;
/// * reconcile;
/// * record or persist a run journal;
/// * send notifications.
///
/// `config_path` is forwarded to the execution strategy, as `run_export_job` does,
/// and to the run report.
///
/// # Errors
/// Returns the plan-rejection error, or the extraction / chunked quality-gate error.
pub(crate) fn run_export_job_with_chunk_source(
    plan: &ResolvedRunPlan,
    state: &StateStore,
    chunk_source: chunked::ChunkSource,
    config_path: &str,
    apply_context: Option<crate::pipeline::summary::ApplyContext>,
) -> Result<()> {
    let diags = validate_plan(plan);
    for d in &diags {
        match d.level {
            DiagnosticLevel::Rejected => {
                anyhow::bail!(
                    "export '{}': plan validation rejected: {}",
                    plan.export_name,
                    d.message
                );
            }
            DiagnosticLevel::Warning => {
                log::warn!("[{}] plan validation warning: {}", d.rule, d.message);
            }
            DiagnosticLevel::Degraded => {
                log::info!("[{}] plan validation degraded: {}", d.rule, d.message);
            }
        }
    }
    log::info!(
        "apply: starting export '{}' (tuning: {})",
        plan.export_name,
        plan.tuning
    );
    let start = std::time::Instant::now();
    let rss_before = crate::resource::get_rss_mb();
    let rss_sampler = crate::resource::RssPeakSampler::start(rss_before, 100);
    let mut summary = RunSummary::new(plan);
    summary.apply_context = apply_context;
    // Pass the real config path to the execution paths. It was previously
    // hard-coded to "", which diverged from the path given to the run report
    // below and from what `run_export_job` passes.
    let result = if plan.strategy.requires_parallel_execution() {
        if plan.strategy.is_resumable() {
            run_chunked_parallel_checkpoint(config_path, state, plan, &mut summary, chunk_source)
        } else {
            chunked::run_chunked_parallel(state, plan, &mut summary, chunk_source)
        }
    } else {
        run_with_reconnect(state, plan, &mut summary, config_path)
    };
    let rss_peak = rss_sampler.stop();
    let rss_after = crate::resource::get_rss_mb();
    summary.duration_ms = start.elapsed().as_millis() as i64;
    summary.peak_rss_mb = rss_peak.max(rss_after).max(rss_before) as i64;
    let tuning_class = plan.tuning.profile_name().to_string();
    let result = run_chunked_quality_gate(result, plan, &mut summary);
    match &result {
        Ok(()) => {
            if summary.status == "running" {
                summary.status = "success".into();
            }
        }
        Err(e) => {
            summary.status = "failed".into();
            let redacted = crate::redact::redact_error(e);
            summary.error_message = Some(redacted.clone());
            log::error!("apply '{}' failed: {}", plan.export_name, redacted);
        }
    }
    summary.print();
    finalize_manifest(plan, state, &summary, "apply");
    if plan.validate {
        finalize_validate_manifest(plan, &mut summary, "apply");
    }
    if let Err(e) = state.record_metric_full(&build_metric_row(&summary, plan, &tuning_class)) {
        log::warn!(
            "apply '{}': metrics write failed: {:#}",
            summary.export_name,
            e
        );
    }
    finalize_run_report(config_path, &summary, "apply");
    result
}
// Unit tests for the helpers in this module. Plans and summaries are built in memory
// (`chunked_plan_with_quality`, `RunSummary::stub_for_testing`) and each helper is
// called directly rather than through `run_export_job`.
#[cfg(test)]
mod tests {
use super::*;
// --- synthetic_failed_summary: the summary fabricated when no plan could be built ---
#[test]
fn synthetic_failed_summary_fields() {
let err = anyhow::anyhow!("connection refused");
let summary = synthetic_failed_summary("my_export", &err);
assert_eq!(summary.export_name, "my_export");
assert_eq!(summary.status, "failed");
assert_eq!(summary.total_rows, 0);
assert_eq!(summary.files_produced, 0);
assert_eq!(summary.bytes_written, 0);
// The (redacted) message must still carry the original cause.
assert!(
summary
.error_message
.as_ref()
.unwrap()
.contains("connection refused")
);
}
#[test]
fn synthetic_failed_summary_run_id_contains_export_name() {
let err = anyhow::anyhow!("boom");
let summary = synthetic_failed_summary("orders", &err);
// run_id is `<export_name>_<timestamp>`.
assert!(
summary.run_id.starts_with("orders_"),
"run_id was: {}",
summary.run_id
);
}
#[test]
fn synthetic_failed_summary_journal_is_empty() {
let err = anyhow::anyhow!("boom");
let summary = synthetic_failed_summary("orders", &err);
assert!(summary.journal.entries.is_empty());
}
#[test]
fn synthetic_failed_summary_no_quality_or_reconcile_state() {
let err = anyhow::anyhow!("boom");
let summary = synthetic_failed_summary("orders", &err);
assert!(summary.quality_passed.is_none());
assert!(summary.reconciled.is_none());
assert!(summary.validated.is_none());
}
// --- fixtures for the chunked quality gate and metric-row tests ---
use crate::config::QualityConfig;
use crate::config::{
CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
SourceType,
};
use crate::plan::{ChunkedPlan, ExtractionStrategy, ResolvedRunPlan};
use crate::tuning::SourceTuning;
// Minimal chunked plan: chunk_size 100, parallel 1, a placeholder Postgres URL and a
// local `/tmp` destination. Only the quality config varies between callers.
fn chunked_plan_with_quality(quality: Option<QualityConfig>) -> ResolvedRunPlan {
ResolvedRunPlan {
export_name: "orders".into(),
base_query: "SELECT id FROM orders".into(),
strategy: ExtractionStrategy::Chunked(ChunkedPlan {
column: "id".into(),
chunk_size: 100,
chunk_count: None,
parallel: 1,
dense: false,
by_days: None,
checkpoint: false,
max_attempts: 3,
}),
format: FormatType::Parquet,
compression: CompressionType::None,
compression_level: None,
max_file_size_bytes: None,
skip_empty: false,
meta_columns: MetaColumns::default(),
destination: DestinationConfig {
destination_type: DestinationType::Local,
path: Some("/tmp".into()),
..Default::default()
},
quality,
tuning: SourceTuning::from_config(None),
tuning_profile_label: "balanced".into(),
validate: false,
reconcile: false,
resume: false,
source: SourceConfig {
source_type: SourceType::Postgres,
url: Some("postgresql://nobody@127.0.0.1:9999/x".into()),
url_env: None,
url_file: None,
host: None,
port: None,
user: None,
password: None,
password_env: None,
database: None,
environment: None,
tuning: None,
tls: None,
},
column_overrides: Default::default(),
verify: crate::config::VerifyMode::Size,
schema_drift_policy: Default::default(),
shape_drift_warn_factor: 0.0,
parquet: None,
}
}
// Stub summary whose `total_rows` is the figure the chunked quality gate evaluates.
fn fresh_summary(plan: &ResolvedRunPlan, total_rows: i64) -> RunSummary {
let mut s = RunSummary::stub_for_testing("r", plan.export_name.clone());
s.total_rows = total_rows;
s.batch_size = 10_000;
s.mode = "chunked".into();
s.compression = "none".into();
s
}
// --- run_chunked_quality_gate ---
#[test]
fn chunked_quality_gate_passes_through_existing_error() {
let plan = chunked_plan_with_quality(None);
let mut summary = fresh_summary(&plan, 0);
let result = run_chunked_quality_gate(
Err(anyhow::anyhow!("chunk 3 failed to write")),
&plan,
&mut summary,
);
let err = result.unwrap_err();
assert!(
err.to_string().contains("chunk 3 failed"),
"must propagate original error: {err}"
);
// An upstream error short-circuits before any quality verdict is recorded.
assert!(summary.quality_passed.is_none());
}
#[test]
fn chunked_quality_gate_no_quality_config_marks_no_decision() {
let plan = chunked_plan_with_quality(None);
let mut summary = fresh_summary(&plan, 5_000);
run_chunked_quality_gate(Ok(()), &plan, &mut summary).expect("must pass");
assert!(summary.quality_passed.is_none());
}
#[test]
fn chunked_quality_gate_row_count_within_bounds_passes() {
let plan = chunked_plan_with_quality(Some(QualityConfig {
row_count_min: Some(100),
row_count_max: Some(10_000),
null_ratio_max: Default::default(),
unique_columns: Vec::new(),
unique_max_entries: None,
}));
let mut summary = fresh_summary(&plan, 5_000);
run_chunked_quality_gate(Ok(()), &plan, &mut summary).expect("in bounds must pass");
assert_eq!(summary.quality_passed, Some(true));
}
#[test]
fn chunked_quality_gate_row_count_below_min_fails() {
let plan = chunked_plan_with_quality(Some(QualityConfig {
row_count_min: Some(100),
row_count_max: None,
null_ratio_max: Default::default(),
unique_columns: Vec::new(),
unique_max_entries: None,
}));
let mut summary = fresh_summary(&plan, 42);
let err =
run_chunked_quality_gate(Ok(()), &plan, &mut summary).expect_err("below min must fail");
let msg = err.to_string();
assert!(
msg.contains("quality check(s) failed") && msg.contains("chunked aggregate"),
"error must name the failed quality gate: {err}"
);
assert!(
msg.contains(" - "),
"error must surface the specific failing check(s), not just a generic message: {err}"
);
// Failure must be typed so exit-code classification can recognise it.
assert!(
err.downcast_ref::<DataIntegrityError>().is_some(),
"chunked quality-gate failure must be a typed data-integrity error"
);
assert_eq!(crate::error::classify_exit(&err), 3);
assert_eq!(summary.quality_passed, Some(false));
}
#[test]
fn chunked_quality_gate_row_count_above_max_fails() {
let plan = chunked_plan_with_quality(Some(QualityConfig {
row_count_min: None,
row_count_max: Some(1_000),
null_ratio_max: Default::default(),
unique_columns: Vec::new(),
unique_max_entries: None,
}));
let mut summary = fresh_summary(&plan, 50_000);
let err =
run_chunked_quality_gate(Ok(()), &plan, &mut summary).expect_err("above max must fail");
assert!(err.to_string().contains("quality"), "error: {err}");
assert_eq!(summary.quality_passed, Some(false));
}
#[test]
fn chunked_quality_gate_skips_unsupported_checks_with_warning() {
// null_ratio_max / unique_columns are configured but cannot be evaluated per chunk;
// they must be skipped (warned), not treated as failures.
let plan = chunked_plan_with_quality(Some(QualityConfig {
row_count_min: Some(10),
row_count_max: None,
null_ratio_max: [("name".into(), 0.1)].into_iter().collect(),
unique_columns: vec!["id".into()],
unique_max_entries: None,
}));
let mut summary = fresh_summary(&plan, 1_000);
run_chunked_quality_gate(Ok(()), &plan, &mut summary)
.expect("unsupported checks must not fail in chunked mode");
assert_eq!(summary.quality_passed, Some(true));
}
#[test]
fn chunked_quality_gate_inactive_on_non_chunked_strategy() {
// row_count_min is deliberately unsatisfiable (99_999 vs 10 rows): any evaluation
// would fail, so passing proves the gate did not run.
let mut plan = chunked_plan_with_quality(Some(QualityConfig {
row_count_min: Some(99_999), row_count_max: None,
null_ratio_max: Default::default(),
unique_columns: Vec::new(),
unique_max_entries: None,
}));
plan.strategy = ExtractionStrategy::Snapshot;
let mut summary = fresh_summary(&plan, 10);
run_chunked_quality_gate(Ok(()), &plan, &mut summary)
.expect("non-chunked strategy must skip the gate");
assert!(summary.quality_passed.is_none());
}
// --- build_metric_row ---
// The `MetricRow` destructure below lists every field without `..`, so adding a field
// to `MetricRow` makes this test fail to compile until the mapping is covered here.
#[test]
fn build_metric_row_maps_every_summary_and_plan_field() {
let mut summary = RunSummary::stub_for_testing("run-bmr", "orders");
summary.duration_ms = 1234;
summary.total_rows = 50_000;
summary.peak_rss_mb = 142;
summary.status = "success".into();
summary.error_message = Some("boom".into());
summary.format = "parquet".into();
summary.mode = "chunked".into();
summary.files_produced = 7;
summary.bytes_written = 4096;
summary.retries = 2;
summary.validated = Some(true);
summary.schema_changed = Some(false);
summary.files_committed = 6; summary.reconciled = Some(true);
summary.source_count = Some(49_999); summary.quality_passed = Some(true);
summary.pg_temp_bytes_delta = Some(1_048_576);
summary.batch_size = 32_000;
summary.batch_size_memory_mb = Some(256);
summary.skip_reason = Some("manual".into());
summary.schema_fingerprint = Some("fp-abc".into());
summary.journal.push_test_chunk_span(0, 640);
let mut plan = chunked_plan_with_quality(None);
plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
column: "id".into(),
chunk_size: 100_000,
chunk_count: None,
parallel: 4,
dense: false,
by_days: None,
checkpoint: false,
max_attempts: 3,
});
let crate::state::MetricRow {
export_name,
run_id,
duration_ms,
total_rows,
peak_rss_mb,
status,
error_message,
tuning_profile,
format,
mode,
files_produced,
bytes_written,
retries,
validated,
schema_changed,
files_committed,
reconciled,
source_count,
quality_passed,
pg_temp_bytes_delta,
batch_size,
batch_size_memory_mb,
skip_reason,
schema_fingerprint,
chunk_size,
parallel,
source_type,
destination_type,
rivet_version,
longest_chunk_ms,
} = build_metric_row(&summary, &plan, "safe");
assert_eq!(export_name, "orders");
assert_eq!(run_id, "run-bmr");
assert_eq!(duration_ms, 1234);
assert_eq!(total_rows, 50_000);
assert_eq!(peak_rss_mb, Some(142));
assert_eq!(status, "success");
assert_eq!(error_message.as_deref(), Some("boom"));
assert_eq!(tuning_profile.as_deref(), Some("safe")); assert_eq!(format.as_deref(), Some("parquet"));
assert_eq!(mode.as_deref(), Some("chunked"));
assert_eq!(files_produced, 7);
assert_eq!(bytes_written, 4096);
assert_eq!(retries, 2);
assert_eq!(validated, Some(true));
assert_eq!(schema_changed, Some(false));
assert_eq!(files_committed, 6);
assert_eq!(reconciled, Some(true));
assert_eq!(source_count, Some(49_999));
assert_eq!(quality_passed, Some(true));
assert_eq!(pg_temp_bytes_delta, Some(1_048_576));
assert_eq!(batch_size, 32_000);
assert_eq!(batch_size_memory_mb, Some(256));
assert_eq!(skip_reason.as_deref(), Some("manual"));
assert_eq!(schema_fingerprint.as_deref(), Some("fp-abc"));
// Chunk dimensions come from the plan, not the summary.
assert_eq!(chunk_size, Some(100_000));
assert_eq!(parallel, Some(4));
assert_eq!(source_type.as_deref(), Some("postgres"));
assert_eq!(destination_type.as_deref(), Some("local"));
assert_eq!(rivet_version.as_deref(), Some(env!("CARGO_PKG_VERSION")));
assert_eq!(longest_chunk_ms, Some(640));
assert_eq!(longest_chunk_ms, summary.journal.longest_chunk_ms());
}
#[test]
fn build_metric_row_non_chunked_has_no_chunk_dims() {
let mut plan = chunked_plan_with_quality(None);
plan.strategy = ExtractionStrategy::Snapshot;
let summary = RunSummary::stub_for_testing("run-snap", "orders");
let row = build_metric_row(&summary, &plan, "balanced");
assert!(row.chunk_size.is_none(), "snapshot has no chunk_size");
assert!(row.parallel.is_none(), "snapshot has no parallel");
assert_eq!(row.source_type.as_deref(), Some("postgres"));
assert_eq!(row.destination_type.as_deref(), Some("local"));
}
// --- harm_deltas ---
#[test]
fn harm_deltas_subtracts_matched_counters() {
let before = vec![
("pg_tup_returned".to_string(), 100),
("pg_blks_read".to_string(), 5),
];
let after = vec![
("pg_tup_returned".to_string(), 150),
("pg_blks_read".to_string(), 9),
];
let mut got = harm_deltas(&before, &after);
// Sort so the assertion does not depend on output order.
got.sort();
assert_eq!(
got,
vec![
("pg_blks_read".to_string(), 4),
("pg_tup_returned".to_string(), 50)
]
);
}
#[test]
fn harm_deltas_floors_counter_reset_at_zero() {
// after < before (e.g. a counter reset between samples) must not go negative.
let before = vec![("pg_tup_returned".to_string(), 1_000)];
let after = vec![("pg_tup_returned".to_string(), 40)];
assert_eq!(
harm_deltas(&before, &after),
vec![("pg_tup_returned".to_string(), 0)]
);
}
#[test]
fn harm_deltas_intersects_counter_names() {
// Counters present in only one of the two samples are dropped.
let before = vec![("shared".to_string(), 10), ("only_before".to_string(), 1)];
let after = vec![("shared".to_string(), 25), ("only_after".to_string(), 7)];
assert_eq!(
harm_deltas(&before, &after),
vec![("shared".to_string(), 15)]
);
}
}