use std::collections::HashMap;
use std::path::Path;
use crate::config::Config;
use crate::error::Result;
use crate::manifest::{MANIFEST_FILENAME, ManifestPart, ManifestStatus, PartStatus, RunManifest};
use crate::plan::{
ExtractionStrategy, ReconcileReport, RepairAction, RepairOutcome, RepairPlan, RepairReport,
ResolvedRunPlan, build_plan,
};
use crate::source;
use crate::state::StateStore;
use super::RunSummary;
use super::chunked::{ChunkSource, run_chunked_sequential};
use super::reconcile_cmd;
/// How the repair command renders its plan or report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairOutputFormat {
    /// Human-readable listing printed to stdout.
    Pretty,
    /// JSON output: written to the given file path when `Some`, printed to stdout
    /// when `None`.
    Json(Option<String>),
}
/// Where the reconcile report that drives a repair comes from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairReportSource {
    /// Read a previously saved reconcile report (JSON) from this file path.
    File(String),
    /// Build a fresh reconcile report for the export instead of reading one.
    Auto,
}
pub fn run_repair_command(
config_path: &str,
export_name: &str,
params: Option<&HashMap<String, String>>,
report_source: RepairReportSource,
execute: bool,
format: RepairOutputFormat,
) -> Result<()> {
let config = Config::load_with_params(config_path, params)?;
let config_dir = Path::new(config_path)
.parent()
.unwrap_or_else(|| Path::new("."));
let export = config
.exports
.iter()
.find(|e| e.name == export_name)
.ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
let mut plan = build_plan(&config, export, config_dir, false, false, false, params)?;
if !matches!(plan.strategy, ExtractionStrategy::Chunked(_)) {
anyhow::bail!(
"repair: '{}' mode — only chunked exports are supported in v1 (Epic H)",
plan.strategy.mode_label()
);
}
let state_path = config_dir.join(".rivet_state.db");
let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
let reconcile_report = load_or_build_reconcile(&plan, &state, report_source)?;
let repair_plan = RepairPlan::from_reconcile(&reconcile_report);
if !execute {
emit_plan(&repair_plan, &format)?;
return Ok(());
}
if repair_plan.is_empty() {
println!(
"repair: nothing to repair for '{}' (reconcile report is clean)",
export_name
);
return Ok(());
}
let report = execute_repair(&mut plan, &state, repair_plan)?;
emit_report(&report, &format)?;
Ok(())
}
/// Obtains the reconcile report a repair is planned from.
///
/// With `RepairReportSource::Auto` the report is produced by
/// `reconcile_cmd::reconcile_chunked_fresh`. With `RepairReportSource::File` the file
/// is read and parsed as JSON, and rejected when its export name differs from the
/// export the plan targets.
///
/// # Errors
/// Fails when the file cannot be read or parsed, when the report belongs to another
/// export, or when building a fresh report fails.
fn load_or_build_reconcile(
    plan: &ResolvedRunPlan,
    state: &StateStore,
    source: RepairReportSource,
) -> Result<ReconcileReport> {
    let path = match source {
        RepairReportSource::Auto => return reconcile_cmd::reconcile_chunked_fresh(plan, state),
        RepairReportSource::File(path) => path,
    };

    let raw = std::fs::read_to_string(&path)
        .map_err(|e| anyhow::anyhow!("cannot read reconcile report '{}': {}", path, e))?;
    let report: ReconcileReport = serde_json::from_str(&raw)
        .map_err(|e| anyhow::anyhow!("invalid reconcile report '{}': {}", path, e))?;

    // A report for a different export must never drive this repair.
    if report.export_name != plan.export_name {
        anyhow::bail!(
            "repair: reconcile report is for export '{}' but config targets '{}'",
            report.export_name,
            plan.export_name
        );
    }
    Ok(report)
}
/// Executes every action in `repair_plan` and collects one outcome per action.
///
/// For each action the `[start_key..end_key]` range is re-exported as a single
/// precomputed chunk through `run_chunked_sequential`. On success, the parts that call
/// appended to the run summary are renamed (best effort) to carry the action's chunk
/// index, the chunk task of the latest recorded chunk run is marked complete, and the
/// parts are queued for the destination manifest update performed after the loop.
///
/// Problems inside the loop never abort the repair: keys that do not parse as `i64`
/// produce `RepairOutcome::Skipped`, a failed re-export produces
/// `RepairOutcome::Failed` carrying the text returned by `redact_error`, and rename,
/// chunk-task and manifest failures are only logged as warnings.
///
/// # Errors
/// Returns an error only when the latest chunk run cannot be looked up, or when the
/// source or the destination cannot be created.
fn execute_repair(
plan: &mut ResolvedRunPlan,
state: &StateStore,
repair_plan: RepairPlan,
) -> Result<RepairReport> {
let mut results: Vec<(RepairAction, RepairOutcome)> =
Vec::with_capacity(repair_plan.actions.len());
// First element of the latest chunk run recorded for this export, if any; it is
// used below as the run id when updating chunk tasks.
let run_id = state
.get_latest_chunk_run(&plan.export_name)?
.map(|(rid, _, _, _)| rid);
let mut src = source::create_source(&plan.source)?;
let mut summary = RunSummary::new(plan);
let dest = crate::destination::create_destination(&plan.destination)?;
// Parts produced by successful actions, recorded in the manifest after the loop.
let mut new_parts: Vec<ManifestPart> = Vec::new();
for a in &repair_plan.actions {
// Only integer key ranges can be replayed; anything else is reported as skipped.
let (start, end) = match (a.start_key.parse::<i64>(), a.end_key.parse::<i64>()) {
(Ok(s), Ok(e)) => (s, e),
_ => {
results.push((
a.clone(),
RepairOutcome::Skipped {
reason: format!("unparseable chunk keys [{}..{}]", a.start_key, a.end_key),
},
));
continue;
}
};
// Snapshot the shared summary so this action's rows and parts can be isolated.
let rows_before = summary.total_rows;
let parts_before = summary.manifest_parts.len();
// Re-export exactly this one key range.
let outcome = run_chunked_sequential(
&mut *src,
plan,
&mut summary,
Some(state),
ChunkSource::Precomputed(vec![(start, end)]),
);
match outcome {
Ok(()) => {
let rows = summary.total_rows - rows_before;
// Everything appended to the summary since the snapshot belongs to this action.
let mut chunk_parts: Vec<ManifestPart> =
summary.manifest_parts[parts_before..].to_vec();
// Per the warning below, the re-exported file carries a `chunk0` name; try to
// rename it to the original chunk index. A failed rename keeps the old path.
for p in &mut chunk_parts {
if let Some(renamed) = relabel_repair_chunk_index(&p.path, a.chunk_index) {
match dest.r#move(&p.path, &renamed) {
Ok(()) => p.path = renamed,
Err(e) => log::warn!(
"repair: chunk {} re-exported but could not rename \
'{}' → '{}' to carry the original chunk index \
(the file is durable under its chunk0 name): {:#}",
a.chunk_index,
p.path,
renamed,
e
),
}
}
}
if let Some(rid) = &run_id {
// The path of the last part (after any rename) is passed as the task's file name.
let file_name = chunk_parts.last().map(|p| p.path.as_str());
if let Err(e) = state.complete_chunk_task(rid, a.chunk_index, rows, file_name) {
log::warn!(
"repair: chunk {} re-exported but chunk_task update failed — \
reconcile will still report the old mismatch: {:#}",
a.chunk_index,
e
);
}
} else {
log::warn!(
"repair: chunk {} re-exported but no chunk run is recorded for export \
'{}' — chunk_task could not be updated; reconcile will not converge",
a.chunk_index,
plan.export_name
);
}
new_parts.extend(chunk_parts);
// The action counts as executed even if the rename or chunk_task update failed.
results.push((a.clone(), RepairOutcome::Executed { rows_written: rows }));
}
Err(e) => {
// Record the failure and move on to the next action.
let msg = crate::redact::redact_error(&e);
results.push((a.clone(), RepairOutcome::Failed { error: msg }));
}
}
}
// Manifest bookkeeping is best effort: a failure is logged, not returned.
if !new_parts.is_empty()
&& let Err(e) = record_repair_parts_in_manifest(plan, &new_parts)
{
log::warn!(
"repair: re-exported parts were written but the destination manifest could not be \
updated (the files are durable; `rivet validate` may flag them as untracked): {:#}",
e
);
}
// Second argument is "repair-" plus the current UTC timestamp — presumably the
// repair run id (TODO confirm against `RepairReport::new`).
Ok(RepairReport::new(
repair_plan,
format!("repair-{}", chrono::Utc::now().format("%Y%m%dT%H%M%S")),
results,
))
}
/// Appends the re-exported parts to the destination's manifest and rewrites it.
///
/// The existing manifest is read from the destination, each entry of `new_parts` is
/// appended as a committed part with a fresh `part_id` (continuing after the current
/// maximum), the row/part totals and `finished_at` are refreshed, and the manifest is
/// written back through a temporary file.
///
/// # Errors
/// Fails when the destination cannot be created, no manifest exists there, the
/// manifest cannot be read or parsed, or the updated manifest cannot be written.
fn record_repair_parts_in_manifest(
    plan: &ResolvedRunPlan,
    new_parts: &[ManifestPart],
) -> Result<()> {
    use crate::pipeline::validate_manifest::{MANIFEST_MAX_BYTES, read_capped};

    let dest = crate::destination::create_destination(&plan.destination)?;

    // Without an existing manifest there is nothing to append to.
    if dest.head(MANIFEST_FILENAME)?.is_none() {
        anyhow::bail!(
            "no manifest.json at the destination prefix — cannot record repair parts \
             (was the original export finalized?)"
        );
    }
    let raw = read_capped(&*dest, MANIFEST_FILENAME, MANIFEST_MAX_BYTES)?;
    let mut manifest: RunManifest = serde_json::from_slice(&raw)
        .map_err(|e| anyhow::anyhow!("destination manifest.json is unparseable: {e}"))?;

    // New parts continue the id sequence after the highest id already present.
    let first_id = manifest.parts.iter().map(|p| p.part_id).max().unwrap_or(0) + 1;
    manifest
        .parts
        .extend(new_parts.iter().zip(first_id..).map(|(part, part_id)| ManifestPart {
            part_id,
            path: part.path.clone(),
            rows: part.rows,
            size_bytes: part.size_bytes,
            content_fingerprint: part.content_fingerprint.clone(),
            content_md5: part.content_md5.clone(),
            status: PartStatus::Committed,
        }));

    manifest.row_count = manifest.committed_rows();
    manifest.part_count = manifest.committed_part_count() as u32;
    manifest.finished_at = chrono::Utc::now().to_rfc3339();

    let bytes = serde_json::to_vec_pretty(&manifest)?;
    // No-op reference to `ManifestStatus`; nothing in this function assigns a status
    // on the manifest. NOTE(review): presumably kept so the import stays referenced.
    let _ = ManifestStatus::Success;

    // Stage the serialized manifest in a temp file, then hand it to the destination.
    let tmp = tempfile::NamedTempFile::new()?;
    std::fs::write(tmp.path(), &bytes)?;
    dest.write(tmp.path(), MANIFEST_FILENAME)?;
    Ok(())
}
/// Rewrites the last `_chunk0_` token in `path` to `_chunk<original_chunk_index>_`.
///
/// Returns `None` when no rename is needed or possible: the original index is
/// already `0`, or `path` contains no `_chunk0_` token. Everything before and after
/// the token is preserved unchanged.
fn relabel_repair_chunk_index(path: &str, original_chunk_index: i64) -> Option<String> {
    const PLACEHOLDER: &str = "_chunk0_";

    if original_chunk_index == 0 {
        return None;
    }
    // Split around the right-most token so earlier occurrences are left alone.
    let (stem, tail) = path.rsplit_once(PLACEHOLDER)?;
    Some(format!("{stem}_chunk{original_chunk_index}_{tail}"))
}
/// Renders a repair plan in the requested output format.
///
/// * `Pretty` — human-readable listing on stdout.
/// * `Json(None)` — the output of `to_json_pretty` on stdout.
/// * `Json(Some(path))` — the same JSON written to `path`, followed by a
///   confirmation line on stdout.
///
/// # Errors
/// Fails when the plan cannot be serialized or the output file cannot be written.
fn emit_plan(plan: &RepairPlan, format: &RepairOutputFormat) -> Result<()> {
    let target = match format {
        RepairOutputFormat::Pretty => {
            print_plan_pretty(plan);
            return Ok(());
        }
        RepairOutputFormat::Json(target) => target,
    };

    // Both JSON variants need the serialized plan; they differ only in the sink.
    let json = plan.to_json_pretty()?;
    match target {
        None => println!("{}", json),
        Some(path) => {
            std::fs::write(path, json)
                .map_err(|e| anyhow::anyhow!("cannot write repair plan '{}': {}", path, e))?;
            println!("Repair plan written to: {}", path);
        }
    }
    Ok(())
}
/// Renders a repair report in the requested output format.
///
/// * `Pretty` — human-readable summary on stdout.
/// * `Json(None)` — the output of `to_json_pretty` on stdout.
/// * `Json(Some(path))` — the same JSON written to `path`, followed by a
///   confirmation line on stdout.
///
/// # Errors
/// Fails when the report cannot be serialized or the output file cannot be written.
fn emit_report(report: &RepairReport, format: &RepairOutputFormat) -> Result<()> {
    let target = match format {
        RepairOutputFormat::Pretty => {
            print_report_pretty(report);
            return Ok(());
        }
        RepairOutputFormat::Json(target) => target,
    };

    // Both JSON variants need the serialized report; they differ only in the sink.
    let json = report.to_json_pretty()?;
    match target {
        None => println!("{}", json),
        Some(path) => {
            std::fs::write(path, json)
                .map_err(|e| anyhow::anyhow!("cannot write repair report '{}': {}", path, e))?;
            println!("Repair report written to: {}", path);
        }
    }
    Ok(())
}
fn print_plan_pretty(plan: &RepairPlan) {
println!();
println!(" Export : {}", plan.export_name);
println!(" Reconcile run : {}", plan.reconcile_run_id);
println!(" Actions : {}", plan.actions.len());
for a in &plan.actions {
println!(
" • chunk {} [{}..{}] — {}",
a.chunk_index, a.start_key, a.end_key, a.reason
);
}
if !plan.skipped.is_empty() {
println!(" Skipped :");
for s in &plan.skipped {
println!(" • {s}");
}
}
if plan.is_empty() && plan.skipped.is_empty() {
println!(" (nothing to repair)");
}
println!();
}
/// Prints a human-readable view of a repair report to stdout.
///
/// Shows the export name, the repair run id, the summary counters, and one line per
/// action tagged with its outcome (executed with row count, skipped with reason, or
/// failed with error text).
fn print_report_pretty(report: &RepairReport) {
    let totals = &report.summary;

    println!();
    println!(" Export : {}", report.plan.export_name);
    println!(" Repair run : {}", report.repair_run_id);
    println!(
        " Summary : planned {} · executed {} · skipped {} · failed {} · rows {}",
        totals.planned, totals.executed, totals.skipped, totals.failed, totals.rows_written,
    );
    for (action, outcome) in report.results.iter() {
        let tag = match outcome {
            RepairOutcome::Failed { error } => format!("failed ({error})"),
            RepairOutcome::Skipped { reason } => format!("skipped ({reason})"),
            RepairOutcome::Executed { rows_written } => format!("executed ({rows_written} rows)"),
        };
        println!(
            " • chunk {} [{}..{}] — {tag}",
            action.chunk_index, action.start_key, action.end_key
        );
    }
    println!();
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{PartitionKind, PartitionResult, ReconcileReport};

    /// File name shape used by the relabel tests: a `chunk0` part without a rotation
    /// suffix.
    const CHUNK0_FILE: &str = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8.parquet";

    /// A reconcile report with one matching and one mismatching chunk yields exactly
    /// one repair action, targeting the mismatching chunk.
    #[test]
    fn plan_from_auto_would_derive_actions_from_reconcile() {
        let matching = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 0 [1..100]".into(),
            Some(100),
            Some(100),
        );
        let mismatching = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 1 [101..200]".into(),
            Some(100),
            Some(90),
        );
        let report = ReconcileReport::new(
            "orders".into(),
            "rec-1".into(),
            "chunked".into(),
            vec![matching, mismatching],
        );

        let repair = RepairPlan::from_reconcile(&report);

        assert_eq!(repair.actions.len(), 1);
        assert_eq!(repair.actions[0].chunk_index, 1);
    }

    /// A non-zero chunk index replaces the `chunk0` token.
    #[test]
    fn relabel_repair_chunk_index_rewrites_chunk0_to_original() {
        let renamed = relabel_repair_chunk_index(CHUNK0_FILE, 2)
            .expect("a non-zero chunk index must produce a renamed path");
        assert_eq!(
            renamed,
            "orders_20260611_120000_chunk2_a1b2c3d4e5f6a7b8.parquet"
        );
        assert!(!renamed.contains("_chunk0_"), "no chunk0 token survives");
    }

    /// The `_p1` rotation suffix after the token is carried over unchanged.
    #[test]
    fn relabel_repair_chunk_index_handles_rotated_part_suffix() {
        let rotated = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8_p1.parquet";
        let renamed = relabel_repair_chunk_index(rotated, 3).unwrap();
        assert_eq!(
            renamed,
            "orders_20260611_120000_chunk3_a1b2c3d4e5f6a7b8_p1.parquet"
        );
    }

    /// Chunk index zero needs no rename.
    #[test]
    fn relabel_repair_chunk_index_is_noop_for_chunk_zero() {
        assert!(relabel_repair_chunk_index(CHUNK0_FILE, 0).is_none());
    }

    /// Paths without the `_chunk0_` token are not rewritten.
    #[test]
    fn relabel_repair_chunk_index_leaves_unexpected_shapes_untouched() {
        let no_token = "orders_no_chunk_token.parquet";
        assert!(relabel_repair_chunk_index(no_token, 5).is_none());
    }
}