1use std::collections::HashMap;
21use std::path::Path;
22
23use crate::config::Config;
24use crate::error::Result;
25use crate::plan::{
26 ExtractionStrategy, ReconcileReport, RepairAction, RepairOutcome, RepairPlan, RepairReport,
27 ResolvedRunPlan, build_plan,
28};
29use crate::source;
30use crate::state::StateStore;
31
32use super::RunSummary;
33use super::chunked::{ChunkSource, run_chunked_sequential};
34use super::reconcile_cmd;
35
/// How the repair plan or repair report is rendered.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairOutputFormat {
    /// Human-readable listing printed to stdout.
    Pretty,
    /// Pretty-printed JSON. `None` prints it to stdout; `Some(path)` writes it
    /// to the file at `path` and prints a one-line confirmation instead.
    Json(Option<String>),
}
43
/// Where the reconcile report that drives a repair comes from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairReportSource {
    /// Path of a JSON-serialized reconcile report to read from disk.
    File(String),
    /// No saved report: a fresh chunked reconcile is run to produce one.
    Auto,
}
51
52pub fn run_repair_command(
53 config_path: &str,
54 export_name: &str,
55 params: Option<&HashMap<String, String>>,
56 report_source: RepairReportSource,
57 execute: bool,
58 format: RepairOutputFormat,
59) -> Result<()> {
60 let config = Config::load_with_params(config_path, params)?;
61 let config_dir = Path::new(config_path)
62 .parent()
63 .unwrap_or_else(|| Path::new("."));
64
65 let export = config
66 .exports
67 .iter()
68 .find(|e| e.name == export_name)
69 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
70
71 let mut plan = build_plan(&config, export, config_dir, false, false, false, params)?;
72 if !matches!(plan.strategy, ExtractionStrategy::Chunked(_)) {
73 anyhow::bail!(
74 "repair: '{}' mode — only chunked exports are supported in v1 (Epic H)",
75 plan.strategy.mode_label()
76 );
77 }
78
79 let state_path = config_dir.join(".rivet_state.db");
80 let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
81
82 let reconcile_report = load_or_build_reconcile(&plan, &state, report_source)?;
83 let repair_plan = RepairPlan::from_reconcile(&reconcile_report);
84
85 if !execute {
86 emit_plan(&repair_plan, &format)?;
87 return Ok(());
88 }
89
90 if repair_plan.is_empty() {
91 println!(
92 "repair: nothing to repair for '{}' (reconcile report is clean)",
93 export_name
94 );
95 return Ok(());
96 }
97
98 let report = execute_repair(&mut plan, &state, repair_plan)?;
99 emit_report(&report, &format)?;
100 Ok(())
101}
102
103fn load_or_build_reconcile(
104 plan: &ResolvedRunPlan,
105 state: &StateStore,
106 source: RepairReportSource,
107) -> Result<ReconcileReport> {
108 match source {
109 RepairReportSource::File(path) => {
110 let raw = std::fs::read_to_string(&path)
111 .map_err(|e| anyhow::anyhow!("cannot read reconcile report '{}': {}", path, e))?;
112 let r: ReconcileReport = serde_json::from_str(&raw)
113 .map_err(|e| anyhow::anyhow!("invalid reconcile report '{}': {}", path, e))?;
114 if r.export_name != plan.export_name {
115 anyhow::bail!(
116 "repair: reconcile report is for export '{}' but config targets '{}'",
117 r.export_name,
118 plan.export_name
119 );
120 }
121 Ok(r)
122 }
123 RepairReportSource::Auto => reconcile_cmd::reconcile_chunked_fresh(plan, state),
124 }
125}
126
127fn execute_repair(
128 plan: &mut ResolvedRunPlan,
129 state: &StateStore,
130 repair_plan: RepairPlan,
131) -> Result<RepairReport> {
132 let mut ranges: Vec<(i64, i64)> = Vec::with_capacity(repair_plan.actions.len());
134 let mut prebuilt_outcomes: Vec<(RepairAction, RepairOutcome)> = Vec::new();
135 for a in &repair_plan.actions {
136 match (a.start_key.parse::<i64>(), a.end_key.parse::<i64>()) {
137 (Ok(s), Ok(e)) => ranges.push((s, e)),
138 _ => prebuilt_outcomes.push((
139 a.clone(),
140 RepairOutcome::Skipped {
141 reason: format!("unparseable chunk keys [{}..{}]", a.start_key, a.end_key),
142 },
143 )),
144 }
145 }
146
147 let mut results: Vec<(RepairAction, RepairOutcome)> =
148 Vec::with_capacity(repair_plan.actions.len());
149 results.extend(prebuilt_outcomes);
150
151 if !ranges.is_empty() {
152 let mut src = source::create_source(&plan.source)?;
153 let mut summary = RunSummary::new(plan);
154 let before = summary.total_rows;
155 let outcome = run_chunked_sequential(
156 &mut *src,
157 plan,
158 &mut summary,
159 Some(state),
160 ChunkSource::Precomputed(ranges.clone()),
161 );
162 let delta = summary.total_rows - before;
163 match outcome {
164 Ok(()) => {
165 let executed_actions: Vec<_> = repair_plan
169 .actions
170 .iter()
171 .filter(|a| {
172 a.start_key.parse::<i64>().is_ok() && a.end_key.parse::<i64>().is_ok()
173 })
174 .cloned()
175 .collect();
176 if executed_actions.len() == 1 {
177 results.push((
178 executed_actions[0].clone(),
179 RepairOutcome::Executed {
180 rows_written: delta,
181 },
182 ));
183 } else {
184 let mut first = true;
186 for a in executed_actions {
187 let rows = if first { delta } else { 0 };
188 first = false;
189 results.push((a, RepairOutcome::Executed { rows_written: rows }));
190 }
191 }
192 }
193 Err(e) => {
194 let msg = crate::redact::redact_error(&e);
195 for a in repair_plan.actions.iter().filter(|a| {
196 a.start_key.parse::<i64>().is_ok() && a.end_key.parse::<i64>().is_ok()
197 }) {
198 results.push((a.clone(), RepairOutcome::Failed { error: msg.clone() }));
199 }
200 }
201 }
202 }
203
204 Ok(RepairReport::new(
205 repair_plan,
206 format!("repair-{}", chrono::Utc::now().format("%Y%m%dT%H%M%S")),
207 results,
208 ))
209}
210
211fn emit_plan(plan: &RepairPlan, format: &RepairOutputFormat) -> Result<()> {
212 match format {
213 RepairOutputFormat::Pretty => print_plan_pretty(plan),
214 RepairOutputFormat::Json(None) => println!("{}", plan.to_json_pretty()?),
215 RepairOutputFormat::Json(Some(path)) => {
216 std::fs::write(path, plan.to_json_pretty()?)
217 .map_err(|e| anyhow::anyhow!("cannot write repair plan '{}': {}", path, e))?;
218 println!("Repair plan written to: {}", path);
219 }
220 }
221 Ok(())
222}
223
224fn emit_report(report: &RepairReport, format: &RepairOutputFormat) -> Result<()> {
225 match format {
226 RepairOutputFormat::Pretty => print_report_pretty(report),
227 RepairOutputFormat::Json(None) => println!("{}", report.to_json_pretty()?),
228 RepairOutputFormat::Json(Some(path)) => {
229 std::fs::write(path, report.to_json_pretty()?)
230 .map_err(|e| anyhow::anyhow!("cannot write repair report '{}': {}", path, e))?;
231 println!("Repair report written to: {}", path);
232 }
233 }
234 Ok(())
235}
236
237fn print_plan_pretty(plan: &RepairPlan) {
238 println!();
239 println!(" Export : {}", plan.export_name);
240 println!(" Reconcile run : {}", plan.reconcile_run_id);
241 println!(" Actions : {}", plan.actions.len());
242 for a in &plan.actions {
243 println!(
244 " • chunk {} [{}..{}] — {}",
245 a.chunk_index, a.start_key, a.end_key, a.reason
246 );
247 }
248 if !plan.skipped.is_empty() {
249 println!(" Skipped :");
250 for s in &plan.skipped {
251 println!(" • {s}");
252 }
253 }
254 if plan.is_empty() && plan.skipped.is_empty() {
255 println!(" (nothing to repair)");
256 }
257 println!();
258}
259
260fn print_report_pretty(report: &RepairReport) {
261 println!();
262 println!(" Export : {}", report.plan.export_name);
263 println!(" Repair run : {}", report.repair_run_id);
264 println!(
265 " Summary : planned {} · executed {} · skipped {} · failed {} · rows {}",
266 report.summary.planned,
267 report.summary.executed,
268 report.summary.skipped,
269 report.summary.failed,
270 report.summary.rows_written,
271 );
272 for (a, out) in &report.results {
273 let tag = match out {
274 RepairOutcome::Executed { rows_written } => format!("executed ({rows_written} rows)"),
275 RepairOutcome::Skipped { reason } => format!("skipped ({reason})"),
276 RepairOutcome::Failed { error } => format!("failed ({error})"),
277 };
278 println!(
279 " • chunk {} [{}..{}] — {tag}",
280 a.chunk_index, a.start_key, a.end_key
281 );
282 }
283 println!();
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{PartitionKind, PartitionResult, ReconcileReport};

    /// A reconcile report with one partition whose two counts agree and one
    /// whose counts differ must yield exactly one repair action, targeting
    /// the differing chunk (index 1).
    #[test]
    fn plan_from_auto_would_derive_actions_from_reconcile() {
        let matching_chunk = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 0 [1..100]".into(),
            Some(100),
            Some(100),
        );
        let mismatched_chunk = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 1 [101..200]".into(),
            Some(100),
            Some(90),
        );

        let report = ReconcileReport::new(
            "orders".into(),
            "rec-1".into(),
            "chunked".into(),
            vec![matching_chunk, mismatched_chunk],
        );

        let repair_plan = RepairPlan::from_reconcile(&report);
        assert_eq!(repair_plan.actions.len(), 1);
        assert_eq!(repair_plan.actions[0].chunk_index, 1);
    }
}
318}