1use std::collections::HashMap;
40use std::path::Path;
41
42use crate::config::Config;
43use crate::error::Result;
44use crate::manifest::{MANIFEST_FILENAME, ManifestPart, ManifestStatus, PartStatus, RunManifest};
45use crate::plan::{
46 ExtractionStrategy, ReconcileReport, RepairAction, RepairOutcome, RepairPlan, RepairReport,
47 ResolvedRunPlan, build_plan,
48};
49use crate::source;
50use crate::state::StateStore;
51
52use super::RunSummary;
53use super::chunked::{ChunkSource, run_chunked_sequential};
54use super::reconcile_cmd;
55
/// How `rivet repair` renders its output: the repair plan on a dry run, or the repair
/// report after `execute`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairOutputFormat {
    /// Human-readable text printed to stdout.
    Pretty,
    /// Pretty-printed JSON: printed to stdout when `None`, otherwise written to the given
    /// file path (a short confirmation line is then printed to stdout).
    Json(Option<String>),
}
63
/// Where `rivet repair` obtains the reconcile report that its repair plan is derived from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairReportSource {
    /// Read a previously saved reconcile report (JSON) from this file path.
    File(String),
    /// Build a fresh report via `reconcile_cmd::reconcile_chunked_fresh`.
    Auto,
}
71
72pub fn run_repair_command(
73 config_path: &str,
74 export_name: &str,
75 params: Option<&HashMap<String, String>>,
76 report_source: RepairReportSource,
77 execute: bool,
78 format: RepairOutputFormat,
79) -> Result<()> {
80 let config = Config::load_with_params(config_path, params)?;
81 let config_dir = Path::new(config_path)
82 .parent()
83 .unwrap_or_else(|| Path::new("."));
84
85 let export = config
86 .exports
87 .iter()
88 .find(|e| e.name == export_name)
89 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
90
91 let mut plan = build_plan(&config, export, config_dir, false, false, false, params)?;
92 if !matches!(plan.strategy, ExtractionStrategy::Chunked(_)) {
93 anyhow::bail!(
94 "repair: '{}' mode — only chunked exports are supported in v1 (Epic H)",
95 plan.strategy.mode_label()
96 );
97 }
98
99 let state_path = config_dir.join(".rivet_state.db");
100 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
101
102 let reconcile_report = load_or_build_reconcile(&plan, &state, report_source)?;
103 let repair_plan = RepairPlan::from_reconcile(&reconcile_report);
104
105 if !execute {
106 emit_plan(&repair_plan, &format)?;
107 return Ok(());
108 }
109
110 if repair_plan.is_empty() {
111 println!(
112 "repair: nothing to repair for '{}' (reconcile report is clean)",
113 export_name
114 );
115 return Ok(());
116 }
117
118 let report = execute_repair(&mut plan, &state, repair_plan)?;
119 emit_report(&report, &format)?;
120 Ok(())
121}
122
123fn load_or_build_reconcile(
124 plan: &ResolvedRunPlan,
125 state: &StateStore,
126 source: RepairReportSource,
127) -> Result<ReconcileReport> {
128 match source {
129 RepairReportSource::File(path) => {
130 let raw = std::fs::read_to_string(&path)
131 .map_err(|e| anyhow::anyhow!("cannot read reconcile report '{}': {}", path, e))?;
132 let r: ReconcileReport = serde_json::from_str(&raw)
133 .map_err(|e| anyhow::anyhow!("invalid reconcile report '{}': {}", path, e))?;
134 if r.export_name != plan.export_name {
135 anyhow::bail!(
136 "repair: reconcile report is for export '{}' but config targets '{}'",
137 r.export_name,
138 plan.export_name
139 );
140 }
141 Ok(r)
142 }
143 RepairReportSource::Auto => reconcile_cmd::reconcile_chunked_fresh(plan, state),
144 }
145}
146
147fn execute_repair(
148 plan: &mut ResolvedRunPlan,
149 state: &StateStore,
150 repair_plan: RepairPlan,
151) -> Result<RepairReport> {
152 let mut results: Vec<(RepairAction, RepairOutcome)> =
153 Vec::with_capacity(repair_plan.actions.len());
154
155 let run_id = state
161 .get_latest_chunk_run(&plan.export_name)?
162 .map(|(rid, _, _, _)| rid);
163
164 let mut src = source::create_source(&plan.source)?;
170 let mut summary = RunSummary::new(plan);
171
172 let dest = crate::destination::create_destination(&plan.destination)?;
180
181 let mut new_parts: Vec<ManifestPart> = Vec::new();
183
184 for a in &repair_plan.actions {
185 let (start, end) = match (a.start_key.parse::<i64>(), a.end_key.parse::<i64>()) {
186 (Ok(s), Ok(e)) => (s, e),
187 _ => {
188 results.push((
189 a.clone(),
190 RepairOutcome::Skipped {
191 reason: format!("unparseable chunk keys [{}..{}]", a.start_key, a.end_key),
192 },
193 ));
194 continue;
195 }
196 };
197
198 let rows_before = summary.total_rows;
199 let parts_before = summary.manifest_parts.len();
200 let outcome = run_chunked_sequential(
201 &mut *src,
202 plan,
203 &mut summary,
204 Some(state),
205 ChunkSource::Precomputed(vec![(start, end)]),
206 );
207 match outcome {
208 Ok(()) => {
209 let rows = summary.total_rows - rows_before;
210 let mut chunk_parts: Vec<ManifestPart> =
215 summary.manifest_parts[parts_before..].to_vec();
216
217 for p in &mut chunk_parts {
227 if let Some(renamed) = relabel_repair_chunk_index(&p.path, a.chunk_index) {
228 match dest.r#move(&p.path, &renamed) {
229 Ok(()) => p.path = renamed,
230 Err(e) => log::warn!(
231 "repair: chunk {} re-exported but could not rename \
232 '{}' → '{}' to carry the original chunk index \
233 (the file is durable under its chunk0 name): {:#}",
234 a.chunk_index,
235 p.path,
236 renamed,
237 e
238 ),
239 }
240 }
241 }
242
243 if let Some(rid) = &run_id {
251 let file_name = chunk_parts.last().map(|p| p.path.as_str());
252 if let Err(e) = state.complete_chunk_task(rid, a.chunk_index, rows, file_name) {
253 log::warn!(
257 "repair: chunk {} re-exported but chunk_task update failed — \
258 reconcile will still report the old mismatch: {:#}",
259 a.chunk_index,
260 e
261 );
262 }
263 } else {
264 log::warn!(
265 "repair: chunk {} re-exported but no chunk run is recorded for export \
266 '{}' — chunk_task could not be updated; reconcile will not converge",
267 a.chunk_index,
268 plan.export_name
269 );
270 }
271
272 new_parts.extend(chunk_parts);
273 results.push((a.clone(), RepairOutcome::Executed { rows_written: rows }));
274 }
275 Err(e) => {
276 let msg = crate::redact::redact_error(&e);
277 results.push((a.clone(), RepairOutcome::Failed { error: msg }));
278 }
279 }
280 }
281
282 if !new_parts.is_empty()
289 && let Err(e) = record_repair_parts_in_manifest(plan, &new_parts)
290 {
291 log::warn!(
292 "repair: re-exported parts were written but the destination manifest could not be \
293 updated (the files are durable; `rivet validate` may flag them as untracked): {:#}",
294 e
295 );
296 }
297
298 Ok(RepairReport::new(
299 repair_plan,
300 format!("repair-{}", chrono::Utc::now().format("%Y%m%dT%H%M%S")),
301 results,
302 ))
303}
304
305fn record_repair_parts_in_manifest(
315 plan: &ResolvedRunPlan,
316 new_parts: &[ManifestPart],
317) -> Result<()> {
318 let dest = crate::destination::create_destination(&plan.destination)?;
319
320 let raw = match dest.head(MANIFEST_FILENAME)? {
324 Some(_) => crate::pipeline::validate_manifest::read_capped(
325 &*dest,
326 MANIFEST_FILENAME,
327 crate::pipeline::validate_manifest::MANIFEST_MAX_BYTES,
328 )?,
329 None => anyhow::bail!(
330 "no manifest.json at the destination prefix — cannot record repair parts \
331 (was the original export finalized?)"
332 ),
333 };
334 let mut manifest: RunManifest = serde_json::from_slice(&raw)
335 .map_err(|e| anyhow::anyhow!("destination manifest.json is unparseable: {e}"))?;
336
337 let mut next_id = manifest.parts.iter().map(|p| p.part_id).max().unwrap_or(0) + 1;
339 for p in new_parts {
340 manifest.parts.push(ManifestPart {
341 part_id: next_id,
342 path: p.path.clone(),
343 rows: p.rows,
344 size_bytes: p.size_bytes,
345 content_fingerprint: p.content_fingerprint.clone(),
346 content_md5: p.content_md5.clone(),
347 status: PartStatus::Committed,
348 });
349 next_id += 1;
350 }
351
352 manifest.row_count = manifest.committed_rows();
355 manifest.part_count = manifest.committed_part_count() as u32;
356 manifest.finished_at = chrono::Utc::now().to_rfc3339();
357
358 let bytes = serde_json::to_vec_pretty(&manifest)?;
364 let _ = ManifestStatus::Success; let tmp = tempfile::NamedTempFile::new()?;
366 std::fs::write(tmp.path(), &bytes)?;
367 dest.write(tmp.path(), MANIFEST_FILENAME)?;
368 Ok(())
369}
370
/// Compute the path a repaired part should be renamed to so that it carries its original
/// chunk index.
///
/// The last occurrence of `_chunk0_` in `path` is replaced with
/// `_chunk{original_chunk_index}_`; everything before and after it is kept verbatim.
/// Returns `None` in two cases: `original_chunk_index` is 0 (the label is already right),
/// or `path` contains no `_chunk0_` token.
fn relabel_repair_chunk_index(path: &str, original_chunk_index: i64) -> Option<String> {
    const CHUNK_ZERO: &str = "_chunk0_";

    if original_chunk_index == 0 {
        return None;
    }
    path.rsplit_once(CHUNK_ZERO)
        .map(|(head, tail)| format!("{head}_chunk{original_chunk_index}_{tail}"))
}
398
399fn emit_plan(plan: &RepairPlan, format: &RepairOutputFormat) -> Result<()> {
400 match format {
401 RepairOutputFormat::Pretty => print_plan_pretty(plan),
402 RepairOutputFormat::Json(None) => println!("{}", plan.to_json_pretty()?),
403 RepairOutputFormat::Json(Some(path)) => {
404 std::fs::write(path, plan.to_json_pretty()?)
405 .map_err(|e| anyhow::anyhow!("cannot write repair plan '{}': {}", path, e))?;
406 println!("Repair plan written to: {}", path);
407 }
408 }
409 Ok(())
410}
411
412fn emit_report(report: &RepairReport, format: &RepairOutputFormat) -> Result<()> {
413 match format {
414 RepairOutputFormat::Pretty => print_report_pretty(report),
415 RepairOutputFormat::Json(None) => println!("{}", report.to_json_pretty()?),
416 RepairOutputFormat::Json(Some(path)) => {
417 std::fs::write(path, report.to_json_pretty()?)
418 .map_err(|e| anyhow::anyhow!("cannot write repair report '{}': {}", path, e))?;
419 println!("Repair report written to: {}", path);
420 }
421 }
422 Ok(())
423}
424
425fn print_plan_pretty(plan: &RepairPlan) {
426 println!();
427 println!(" Export : {}", plan.export_name);
428 println!(" Reconcile run : {}", plan.reconcile_run_id);
429 println!(" Actions : {}", plan.actions.len());
430 for a in &plan.actions {
431 println!(
432 " • chunk {} [{}..{}] — {}",
433 a.chunk_index, a.start_key, a.end_key, a.reason
434 );
435 }
436 if !plan.skipped.is_empty() {
437 println!(" Skipped :");
438 for s in &plan.skipped {
439 println!(" • {s}");
440 }
441 }
442 if plan.is_empty() && plan.skipped.is_empty() {
443 println!(" (nothing to repair)");
444 }
445 println!();
446}
447
448fn print_report_pretty(report: &RepairReport) {
449 println!();
450 println!(" Export : {}", report.plan.export_name);
451 println!(" Repair run : {}", report.repair_run_id);
452 println!(
453 " Summary : planned {} · executed {} · skipped {} · failed {} · rows {}",
454 report.summary.planned,
455 report.summary.executed,
456 report.summary.skipped,
457 report.summary.failed,
458 report.summary.rows_written,
459 );
460 for (a, out) in &report.results {
461 let tag = match out {
462 RepairOutcome::Executed { rows_written } => format!("executed ({rows_written} rows)"),
463 RepairOutcome::Skipped { reason } => format!("skipped ({reason})"),
464 RepairOutcome::Failed { error } => format!("failed ({error})"),
465 };
466 println!(
467 " • chunk {} [{}..{}] — {tag}",
468 a.chunk_index, a.start_key, a.end_key
469 );
470 }
471 println!();
472}
473
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{PartitionKind, PartitionResult, ReconcileReport};

    #[test]
    fn plan_from_auto_would_derive_actions_from_reconcile() {
        // Chunk 0's two counts agree (100 / 100); chunk 1's disagree (100 / 90), so only
        // chunk 1 should turn into a repair action.
        let partitions = vec![
            PartitionResult::classify(
                PartitionKind::Chunk,
                "chunk 0 [1..100]".into(),
                Some(100),
                Some(100),
            ),
            PartitionResult::classify(
                PartitionKind::Chunk,
                "chunk 1 [101..200]".into(),
                Some(100),
                Some(90),
            ),
        ];
        let r = ReconcileReport::new(
            "orders".into(),
            "rec-1".into(),
            "chunked".into(),
            partitions,
        );
        let plan = RepairPlan::from_reconcile(&r);
        assert_eq!(plan.actions.len(), 1);
        assert_eq!(plan.actions[0].chunk_index, 1);
    }

    #[test]
    fn relabel_repair_chunk_index_rewrites_chunk0_to_original() {
        let written = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8.parquet";
        let renamed = relabel_repair_chunk_index(written, 2)
            .expect("a non-zero chunk index must produce a renamed path");
        assert_eq!(
            renamed,
            "orders_20260611_120000_chunk2_a1b2c3d4e5f6a7b8.parquet"
        );
        assert!(!renamed.contains("_chunk0_"), "no chunk0 token survives");
    }

    #[test]
    fn relabel_repair_chunk_index_handles_rotated_part_suffix() {
        let written = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8_p1.parquet";
        let renamed = relabel_repair_chunk_index(written, 3).unwrap();
        assert_eq!(
            renamed,
            "orders_20260611_120000_chunk3_a1b2c3d4e5f6a7b8_p1.parquet"
        );
    }

    #[test]
    fn relabel_repair_chunk_index_is_noop_for_chunk_zero() {
        let written = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8.parquet";
        assert!(relabel_repair_chunk_index(written, 0).is_none());
    }

    #[test]
    fn relabel_repair_chunk_index_leaves_unexpected_shapes_untouched() {
        assert!(relabel_repair_chunk_index("orders_no_chunk_token.parquet", 5).is_none());
    }

    #[test]
    fn relabel_repair_chunk_index_preserves_directory_prefix() {
        let written = "exports/orders/orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8.parquet";
        assert_eq!(
            relabel_repair_chunk_index(written, 7).as_deref(),
            Some("exports/orders/orders_20260611_120000_chunk7_a1b2c3d4e5f6a7b8.parquet")
        );
    }

    #[test]
    fn relabel_repair_chunk_index_rewrites_only_the_last_token() {
        // A `_chunk0_` earlier in the path (e.g. in a directory name) must be left alone.
        let written = "run_chunk0_/orders_20260611_120000_chunk0_a1b2.parquet";
        assert_eq!(
            relabel_repair_chunk_index(written, 4).as_deref(),
            Some("run_chunk0_/orders_20260611_120000_chunk4_a1b2.parquet")
        );
    }

    #[test]
    fn relabel_repair_chunk_index_ignores_other_chunk_numbers() {
        // `_chunk10_` must not be mistaken for the `_chunk0_` token.
        let written = "orders_20260611_120000_chunk10_a1b2.parquet";
        assert!(relabel_repair_chunk_index(written, 5).is_none());
    }
}