Skip to main content

rivet/pipeline/
repair_cmd.rs

//! `rivet repair` — targeted re-export of reconcile mismatches (Epic H).
//!
//! ## Flow
//!
//! 1. Build a `ReconcileReport` (either freshly, or load a previous JSON via
//!    `--report`).
//! 2. Derive a [`RepairPlan`](crate::plan::RepairPlan) — the set of chunk
//!    ranges that need re-export.
//! 3. Without `--execute` (default): emit the plan and exit.
//! 4. With `--execute`: re-run just those chunks via
//!    [`chunked::run_chunked_sequential`] using a `Precomputed` chunk source.
//!    Output files are written with the standard `<export>_<ts>_chunk<idx>.<ext>`
//!    naming — they are new files alongside the originals; Rivet does not
//!    delete or overwrite the old files.
//!
//! Progression semantics (ADR-0008): repair does **not** advance
//! `last_committed_*` — the committed boundary already covers the chunk index.
//! Operator runs `rivet reconcile` afterwards to advance `last_verified_*`.
19
20use std::collections::HashMap;
21use std::path::Path;
22
23use crate::config::Config;
24use crate::error::Result;
25use crate::plan::{
26    ExtractionStrategy, ReconcileReport, RepairAction, RepairOutcome, RepairPlan, RepairReport,
27    ResolvedRunPlan, build_plan,
28};
29use crate::source;
30use crate::state::StateStore;
31
32use super::RunSummary;
33use super::chunked::{ChunkSource, run_chunked_sequential};
34use super::reconcile_cmd;
35
/// Output format for the repair plan / report.
///
/// Derives the common traits so values can be logged with `{:?}`, duplicated,
/// and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairOutputFormat {
    /// Human-readable summary to stdout.
    Pretty,
    /// Pretty-printed JSON to the given path (or stdout if `None`).
    Json(Option<String>),
}
43
/// Source of the reconcile report used to derive the repair plan.
///
/// Derives the common traits so values can be logged with `{:?}`, duplicated,
/// and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairReportSource {
    /// Read a reconcile report JSON from disk.
    File(String),
    /// Run reconcile in-process against the latest chunk run.
    Auto,
}
51
52pub fn run_repair_command(
53    config_path: &str,
54    export_name: &str,
55    params: Option<&HashMap<String, String>>,
56    report_source: RepairReportSource,
57    execute: bool,
58    format: RepairOutputFormat,
59) -> Result<()> {
60    let config = Config::load_with_params(config_path, params)?;
61    let config_dir = Path::new(config_path)
62        .parent()
63        .unwrap_or_else(|| Path::new("."));
64
65    let export = config
66        .exports
67        .iter()
68        .find(|e| e.name == export_name)
69        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
70
71    let mut plan = build_plan(&config, export, config_dir, false, false, false, params)?;
72    if !matches!(plan.strategy, ExtractionStrategy::Chunked(_)) {
73        anyhow::bail!(
74            "repair: '{}' mode — only chunked exports are supported in v1 (Epic H)",
75            plan.strategy.mode_label()
76        );
77    }
78
79    let state_path = config_dir.join(".rivet_state.db");
80    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
81
82    let reconcile_report = load_or_build_reconcile(&plan, &state, report_source)?;
83    let repair_plan = RepairPlan::from_reconcile(&reconcile_report);
84
85    if !execute {
86        emit_plan(&repair_plan, &format)?;
87        return Ok(());
88    }
89
90    if repair_plan.is_empty() {
91        println!(
92            "repair: nothing to repair for '{}' (reconcile report is clean)",
93            export_name
94        );
95        return Ok(());
96    }
97
98    let report = execute_repair(&mut plan, &state, repair_plan)?;
99    emit_report(&report, &format)?;
100    Ok(())
101}
102
103fn load_or_build_reconcile(
104    plan: &ResolvedRunPlan,
105    state: &StateStore,
106    source: RepairReportSource,
107) -> Result<ReconcileReport> {
108    match source {
109        RepairReportSource::File(path) => {
110            let raw = std::fs::read_to_string(&path)
111                .map_err(|e| anyhow::anyhow!("cannot read reconcile report '{}': {}", path, e))?;
112            let r: ReconcileReport = serde_json::from_str(&raw)
113                .map_err(|e| anyhow::anyhow!("invalid reconcile report '{}': {}", path, e))?;
114            if r.export_name != plan.export_name {
115                anyhow::bail!(
116                    "repair: reconcile report is for export '{}' but config targets '{}'",
117                    r.export_name,
118                    plan.export_name
119                );
120            }
121            Ok(r)
122        }
123        RepairReportSource::Auto => reconcile_cmd::reconcile_chunked_fresh(plan, state),
124    }
125}
126
127fn execute_repair(
128    plan: &mut ResolvedRunPlan,
129    state: &StateStore,
130    repair_plan: RepairPlan,
131) -> Result<RepairReport> {
132    // Map start/end strings → i64. Skip ranges that don't parse (recorded as skipped outcomes).
133    let mut ranges: Vec<(i64, i64)> = Vec::with_capacity(repair_plan.actions.len());
134    let mut prebuilt_outcomes: Vec<(RepairAction, RepairOutcome)> = Vec::new();
135    for a in &repair_plan.actions {
136        match (a.start_key.parse::<i64>(), a.end_key.parse::<i64>()) {
137            (Ok(s), Ok(e)) => ranges.push((s, e)),
138            _ => prebuilt_outcomes.push((
139                a.clone(),
140                RepairOutcome::Skipped {
141                    reason: format!("unparseable chunk keys [{}..{}]", a.start_key, a.end_key),
142                },
143            )),
144        }
145    }
146
147    let mut results: Vec<(RepairAction, RepairOutcome)> =
148        Vec::with_capacity(repair_plan.actions.len());
149    results.extend(prebuilt_outcomes);
150
151    if !ranges.is_empty() {
152        let mut src = source::create_source(&plan.source)?;
153        let mut summary = RunSummary::new(plan);
154        let before = summary.total_rows;
155        let outcome = run_chunked_sequential(
156            &mut *src,
157            plan,
158            &mut summary,
159            Some(state),
160            ChunkSource::Precomputed(ranges.clone()),
161        );
162        let delta = summary.total_rows - before;
163        match outcome {
164            Ok(()) => {
165                // Sequential runs chunks in order; we do not track per-chunk row
166                // counts separately here, so attribute the delta to the set.
167                // If the set is a single chunk, the attribution is exact.
168                let executed_actions: Vec<_> = repair_plan
169                    .actions
170                    .iter()
171                    .filter(|a| {
172                        a.start_key.parse::<i64>().is_ok() && a.end_key.parse::<i64>().is_ok()
173                    })
174                    .cloned()
175                    .collect();
176                if executed_actions.len() == 1 {
177                    results.push((
178                        executed_actions[0].clone(),
179                        RepairOutcome::Executed {
180                            rows_written: delta,
181                        },
182                    ));
183                } else {
184                    // Even split is a lie — mark each as executed, attribute total to the first.
185                    let mut first = true;
186                    for a in executed_actions {
187                        let rows = if first { delta } else { 0 };
188                        first = false;
189                        results.push((a, RepairOutcome::Executed { rows_written: rows }));
190                    }
191                }
192            }
193            Err(e) => {
194                let msg = crate::redact::redact_error(&e);
195                for a in repair_plan.actions.iter().filter(|a| {
196                    a.start_key.parse::<i64>().is_ok() && a.end_key.parse::<i64>().is_ok()
197                }) {
198                    results.push((a.clone(), RepairOutcome::Failed { error: msg.clone() }));
199                }
200            }
201        }
202    }
203
204    Ok(RepairReport::new(
205        repair_plan,
206        format!("repair-{}", chrono::Utc::now().format("%Y%m%dT%H%M%S")),
207        results,
208    ))
209}
210
211fn emit_plan(plan: &RepairPlan, format: &RepairOutputFormat) -> Result<()> {
212    match format {
213        RepairOutputFormat::Pretty => print_plan_pretty(plan),
214        RepairOutputFormat::Json(None) => println!("{}", plan.to_json_pretty()?),
215        RepairOutputFormat::Json(Some(path)) => {
216            std::fs::write(path, plan.to_json_pretty()?)
217                .map_err(|e| anyhow::anyhow!("cannot write repair plan '{}': {}", path, e))?;
218            println!("Repair plan written to: {}", path);
219        }
220    }
221    Ok(())
222}
223
224fn emit_report(report: &RepairReport, format: &RepairOutputFormat) -> Result<()> {
225    match format {
226        RepairOutputFormat::Pretty => print_report_pretty(report),
227        RepairOutputFormat::Json(None) => println!("{}", report.to_json_pretty()?),
228        RepairOutputFormat::Json(Some(path)) => {
229            std::fs::write(path, report.to_json_pretty()?)
230                .map_err(|e| anyhow::anyhow!("cannot write repair report '{}': {}", path, e))?;
231            println!("Repair report written to: {}", path);
232        }
233    }
234    Ok(())
235}
236
237fn print_plan_pretty(plan: &RepairPlan) {
238    println!();
239    println!("  Export            : {}", plan.export_name);
240    println!("  Reconcile run     : {}", plan.reconcile_run_id);
241    println!("  Actions           : {}", plan.actions.len());
242    for a in &plan.actions {
243        println!(
244            "    • chunk {} [{}..{}] — {}",
245            a.chunk_index, a.start_key, a.end_key, a.reason
246        );
247    }
248    if !plan.skipped.is_empty() {
249        println!("  Skipped           :");
250        for s in &plan.skipped {
251            println!("    • {s}");
252        }
253    }
254    if plan.is_empty() && plan.skipped.is_empty() {
255        println!("  (nothing to repair)");
256    }
257    println!();
258}
259
260fn print_report_pretty(report: &RepairReport) {
261    println!();
262    println!("  Export       : {}", report.plan.export_name);
263    println!("  Repair run   : {}", report.repair_run_id);
264    println!(
265        "  Summary      : planned {} · executed {} · skipped {} · failed {} · rows {}",
266        report.summary.planned,
267        report.summary.executed,
268        report.summary.skipped,
269        report.summary.failed,
270        report.summary.rows_written,
271    );
272    for (a, out) in &report.results {
273        let tag = match out {
274            RepairOutcome::Executed { rows_written } => format!("executed ({rows_written} rows)"),
275            RepairOutcome::Skipped { reason } => format!("skipped ({reason})"),
276            RepairOutcome::Failed { error } => format!("failed ({error})"),
277        };
278        println!(
279            "    • chunk {} [{}..{}] — {tag}",
280            a.chunk_index, a.start_key, a.end_key
281        );
282    }
283    println!();
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{PartitionKind, PartitionResult, ReconcileReport};

    /// Derivation smoke test that never touches the database: of two chunk
    /// partitions, only the one whose two counts differ becomes an action.
    #[test]
    fn plan_from_auto_would_derive_actions_from_reconcile() {
        // Chunk 0: both counts are 100.
        let matching_chunk = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 0 [1..100]".into(),
            Some(100),
            Some(100),
        );
        // Chunk 1: counts are 100 and 90.
        let mismatched_chunk = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 1 [101..200]".into(),
            Some(100),
            Some(90),
        );

        let reconcile = ReconcileReport::new(
            "orders".into(),
            "rec-1".into(),
            "chunked".into(),
            vec![matching_chunk, mismatched_chunk],
        );

        let derived = RepairPlan::from_reconcile(&reconcile);
        assert_eq!(derived.actions.len(), 1);
        assert_eq!(derived.actions[0].chunk_index, 1);
    }
}