Skip to main content

rivet/pipeline/
repair_cmd.rs

//! `rivet repair` — targeted re-export of reconcile mismatches (Epic H).
//!
//! ## Flow
//!
//! 1. Build a `ReconcileReport`, either freshly or by loading a previous JSON
//!    report via `--report`.
//! 2. Derive a [`RepairPlan`](crate::plan::RepairPlan) — the set of chunk
//!    ranges that need re-export.
//! 3. Without `--execute` (default): emit the plan and exit.
//! 4. With `--execute`: re-run just those chunks via
//!    [`run_chunked_sequential`](super::chunked::run_chunked_sequential) using a
//!    `Precomputed` chunk source. Output files are written with collision-proof
//!    naming (and renamed so the file name carries the original chunk index) —
//!    they are new files alongside the originals; Rivet does not delete or
//!    overwrite the old files.
//!
//! ## Closing the trust loop
//!
//! A re-export that lands a fresh file but leaves the recorded state stale
//! breaks the operator's trust loop: `reconcile` still recounts the source
//! against the *old* `chunk_task.rows_written` and reports the same mismatch
//! (the loop never converges), and `rivet validate` lists the prefix and flags
//! the un-recorded repair file as an `untracked_object`. So after a successful
//! per-chunk re-export this path:
//!
//! 1. updates that chunk's `chunk_task` row — `rows_written` is set to the
//!    freshly-exported count and the task is re-marked `completed` — so the
//!    next `reconcile` compares the live source count against the repaired
//!    count and converges to a match; and
//! 2. appends the repair-written part(s) to the destination `manifest.json`
//!    (read → append → rewrite) so the new file is tracked and `validate` no
//!    longer reports it as untracked. The originals are left in place and in
//!    the manifest (repair is additive, ADR-0009 RR5 / ADR-0012).
//!
//! Progression semantics (ADR-0008): repair does **not** advance
//! `last_committed_*` — the committed boundary already covers the chunk index.
//! `last_verified_*` is advanced by the next `reconcile` the operator runs,
//! which the repaired state should now pass.
38
39use std::collections::HashMap;
40use std::path::Path;
41
42use crate::config::Config;
43use crate::error::Result;
44use crate::manifest::{MANIFEST_FILENAME, ManifestPart, ManifestStatus, PartStatus, RunManifest};
45use crate::plan::{
46    ExtractionStrategy, ReconcileReport, RepairAction, RepairOutcome, RepairPlan, RepairReport,
47    ResolvedRunPlan, build_plan,
48};
49use crate::source;
50use crate::state::StateStore;
51
52use super::RunSummary;
53use super::chunked::{ChunkSource, run_chunked_sequential};
54use super::reconcile_cmd;
55
/// Output format for the repair plan (dry run) or the repair report
/// (`--execute`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairOutputFormat {
    /// Human-readable summary to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given path, or to stdout if `None`.
    Json(Option<String>),
}
63
/// Source of the reconcile report used to derive the repair plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairReportSource {
    /// Read a reconcile report JSON from the given path. The report's
    /// `export_name` must match the export being repaired.
    File(String),
    /// Run reconcile in-process against the latest chunk run.
    Auto,
}
71
72pub fn run_repair_command(
73    config_path: &str,
74    export_name: &str,
75    params: Option<&HashMap<String, String>>,
76    report_source: RepairReportSource,
77    execute: bool,
78    format: RepairOutputFormat,
79) -> Result<()> {
80    let config = Config::load_with_params(config_path, params)?;
81    let config_dir = Path::new(config_path)
82        .parent()
83        .unwrap_or_else(|| Path::new("."));
84
85    let export = config
86        .exports
87        .iter()
88        .find(|e| e.name == export_name)
89        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
90
91    let mut plan = build_plan(&config, export, config_dir, false, false, false, params)?;
92    if !matches!(plan.strategy, ExtractionStrategy::Chunked(_)) {
93        anyhow::bail!(
94            "repair: '{}' mode — only chunked exports are supported in v1 (Epic H)",
95            plan.strategy.mode_label()
96        );
97    }
98
99    let state_path = config_dir.join(".rivet_state.db");
100    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
101
102    let reconcile_report = load_or_build_reconcile(&plan, &state, report_source)?;
103    let repair_plan = RepairPlan::from_reconcile(&reconcile_report);
104
105    if !execute {
106        emit_plan(&repair_plan, &format)?;
107        return Ok(());
108    }
109
110    if repair_plan.is_empty() {
111        println!(
112            "repair: nothing to repair for '{}' (reconcile report is clean)",
113            export_name
114        );
115        return Ok(());
116    }
117
118    let report = execute_repair(&mut plan, &state, repair_plan)?;
119    emit_report(&report, &format)?;
120    Ok(())
121}
122
123fn load_or_build_reconcile(
124    plan: &ResolvedRunPlan,
125    state: &StateStore,
126    source: RepairReportSource,
127) -> Result<ReconcileReport> {
128    match source {
129        RepairReportSource::File(path) => {
130            let raw = std::fs::read_to_string(&path)
131                .map_err(|e| anyhow::anyhow!("cannot read reconcile report '{}': {}", path, e))?;
132            let r: ReconcileReport = serde_json::from_str(&raw)
133                .map_err(|e| anyhow::anyhow!("invalid reconcile report '{}': {}", path, e))?;
134            if r.export_name != plan.export_name {
135                anyhow::bail!(
136                    "repair: reconcile report is for export '{}' but config targets '{}'",
137                    r.export_name,
138                    plan.export_name
139                );
140            }
141            Ok(r)
142        }
143        RepairReportSource::Auto => reconcile_cmd::reconcile_chunked_fresh(plan, state),
144    }
145}
146
/// Re-export every chunk in `repair_plan` and return a per-action report.
///
/// For each action whose `start_key` / `end_key` both parse as `i64`, this:
/// 1. re-runs that single chunk through [`run_chunked_sequential`] with a
///    `Precomputed` chunk source;
/// 2. renames the written part(s) so their file name carries the original
///    chunk index;
/// 3. points the latest chunk run's `chunk_task` row at the fresh row count.
///
/// Finally it appends all repair-written parts to the destination
/// `manifest.json`.
///
/// Outcomes per action:
/// * unparseable keys → `Skipped`;
/// * failed re-export → `Failed`, carrying a redacted error message.
///
/// Returns `Err` only when setup fails: looking up the latest chunk run,
/// creating the source, or creating the destination. Per-chunk export
/// failures, rename failures, `chunk_task` update failures and
/// manifest-rewrite failures are recorded in the report or logged instead.
fn execute_repair(
    plan: &mut ResolvedRunPlan,
    state: &StateStore,
    repair_plan: RepairPlan,
) -> Result<RepairReport> {
    let mut results: Vec<(RepairAction, RepairOutcome)> =
        Vec::with_capacity(repair_plan.actions.len());

    // The chunk run whose `chunk_task` rows reconcile reads. Repair re-exports
    // against the latest run for this export — the same run reconcile counted.
    // Without it we can re-export the data but cannot point the recorded state
    // at the fresh count, so `reconcile → repair → reconcile` could never
    // converge (audit finding #7).
    let run_id = state
        .get_latest_chunk_run(&plan.export_name)?
        .map(|(rid, _, _, _)| rid);

    // One summary across the whole repair (matches the original single
    // `RunSummary::new`): `record_part` appends every freshly-written part to
    // `summary.manifest_parts`. We snapshot its length and `total_rows` around
    // each single-chunk re-export to attribute the exact rows and the exact
    // new file(s) to that chunk — no even-split lie.
    let mut src = source::create_source(&plan.source)?;
    let mut summary = RunSummary::new(plan);

    // One destination handle for the whole repair: used to rename each
    // repair-written part so its filename carries the ORIGINAL chunk index
    // (audit L15). The single-chunk `Precomputed` source restarts enumeration
    // at 0, so the writer always names the file `..._chunk0_...`; without this
    // the file repairing chunk 2 would land as `chunk0`, no longer reflecting
    // the logical chunk it repairs. Built once here (re-`create_destination`d
    // again only in the manifest-rewrite step, which runs at most once).
    let dest = crate::destination::create_destination(&plan.destination)?;

    // Repair-written parts to record in the destination manifest, in order.
    let mut new_parts: Vec<ManifestPart> = Vec::new();

    for a in &repair_plan.actions {
        // Only integer chunk bounds can be replayed through a `Precomputed`
        // source; any other key shape is reported as Skipped, not attempted.
        let (start, end) = match (a.start_key.parse::<i64>(), a.end_key.parse::<i64>()) {
            (Ok(s), Ok(e)) => (s, e),
            _ => {
                results.push((
                    a.clone(),
                    RepairOutcome::Skipped {
                        reason: format!("unparseable chunk keys [{}..{}]", a.start_key, a.end_key),
                    },
                ));
                continue;
            }
        };

        let rows_before = summary.total_rows;
        let parts_before = summary.manifest_parts.len();
        let outcome = run_chunked_sequential(
            &mut *src,
            plan,
            &mut summary,
            Some(state),
            ChunkSource::Precomputed(vec![(start, end)]),
        );
        match outcome {
            Ok(()) => {
                // `summary` accumulates across all actions, so the delta is
                // this chunk's contribution.
                let rows = summary.total_rows - rows_before;
                // Every part `record_part` appended for this single chunk — its
                // path, rows, bytes, fingerprint, md5. One chunk yields one part
                // unless max_file_size rotated it; either way these are exactly
                // the new files the manifest must learn about.
                let mut chunk_parts: Vec<ManifestPart> =
                    summary.manifest_parts[parts_before..].to_vec();

                // L15: the writer named each part `..._chunk0_...` (the
                // single-chunk `Precomputed` source enumerates from 0), but this
                // part repairs the logical chunk `a.chunk_index`. Rename the file
                // and rewrite the recorded `path` so the name carries the real
                // index — both `complete_chunk_task` (file_name) and the manifest
                // append below then reference the corrected name. Best-effort
                // (ADR-0012 M9 `move` semantics): the bytes are already durable,
                // so a failed rename keeps the original name and warns rather than
                // failing the repair. A no-op when the chunk index is already 0.
                for p in &mut chunk_parts {
                    if let Some(renamed) = relabel_repair_chunk_index(&p.path, a.chunk_index) {
                        match dest.r#move(&p.path, &renamed) {
                            Ok(()) => p.path = renamed,
                            Err(e) => log::warn!(
                                "repair: chunk {} re-exported but could not rename \
                                 '{}' → '{}' to carry the original chunk index \
                                 (the file is durable under its chunk0 name): {:#}",
                                a.chunk_index,
                                p.path,
                                renamed,
                                e
                            ),
                        }
                    }
                }

                // (1) Close finding #7: point `chunk_task.rows_written` at the
                //     freshly-exported count (and re-mark the task completed,
                //     clearing any stale error) so the next reconcile compares
                //     the live source count against the repaired count. The
                //     `file_name` records the newest part for this chunk; if the
                //     chunk rotated into several parts the latest is recorded
                //     (reconcile keys on rows_written, not file_name).
                if let Some(rid) = &run_id {
                    let file_name = chunk_parts.last().map(|p| p.path.as_str());
                    if let Err(e) = state.complete_chunk_task(rid, a.chunk_index, rows, file_name) {
                        // Non-fatal to the data (the file is durable) but fatal
                        // to trust — surface it loudly rather than report a
                        // false "executed" that leaves reconcile stuck.
                        log::warn!(
                            "repair: chunk {} re-exported but chunk_task update failed — \
                             reconcile will still report the old mismatch: {:#}",
                            a.chunk_index,
                            e
                        );
                    }
                } else {
                    log::warn!(
                        "repair: chunk {} re-exported but no chunk run is recorded for export \
                         '{}' — chunk_task could not be updated; reconcile will not converge",
                        a.chunk_index,
                        plan.export_name
                    );
                }

                new_parts.extend(chunk_parts);
                results.push((a.clone(), RepairOutcome::Executed { rows_written: rows }));
            }
            Err(e) => {
                // The message ends up in the emitted report; it is passed
                // through `redact_error` first (presumably to strip secrets such
                // as connection strings — confirm in `crate::redact`).
                // NOTE(review): any parts this chunk wrote before failing are
                // not added to `new_parts`, so they would stay out of the
                // manifest — confirm whether `run_chunked_sequential` can leave
                // such parts behind.
                let msg = crate::redact::redact_error(&e);
                results.push((a.clone(), RepairOutcome::Failed { error: msg }));
            }
        }
    }

    // (2) Close finding #8: record the repair-written parts in the destination
    //     manifest so `rivet validate` no longer flags them as untracked. Best
    //     effort and warn-on-fail (ADR-0001 I7 / ADR-0012): the parts are
    //     already durable at the destination, so a manifest-rewrite failure
    //     must not change the repair's exit code — but it is logged loudly so
    //     the operator knows validate may still flag the files.
    if !new_parts.is_empty()
        && let Err(e) = record_repair_parts_in_manifest(plan, &new_parts)
    {
        log::warn!(
            "repair: re-exported parts were written but the destination manifest could not be \
             updated (the files are durable; `rivet validate` may flag them as untracked): {:#}",
            e
        );
    }

    // The repair run id is a second-resolution UTC timestamp.
    // NOTE(review): two repairs started within the same second share an id.
    Ok(RepairReport::new(
        repair_plan,
        format!("repair-{}", chrono::Utc::now().format("%Y%m%dT%H%M%S")),
        results,
    ))
}
304
305/// Read the destination `manifest.json`, append the repair-written parts as new
306/// committed entries (fresh unique `part_id`s, recomputed `row_count` /
307/// `part_count`), and rewrite it. The originals stay recorded — repair is
308/// additive — so this closes the "untracked repair file" gap (finding #8)
309/// without dropping the manifest's history of the prior parts.
310///
311/// Returns `Err` if no manifest exists at the prefix (a repair against a prefix
312/// that was never finalized has nothing to amend) or if the read/write fails;
313/// the caller logs and continues since the data itself is already durable.
314fn record_repair_parts_in_manifest(
315    plan: &ResolvedRunPlan,
316    new_parts: &[ManifestPart],
317) -> Result<()> {
318    let dest = crate::destination::create_destination(&plan.destination)?;
319
320    // Manifests live at the prefix root (manifest_dir == "" for the local/path
321    // and bucket-prefix destinations repair supports); parts are recorded with
322    // prefix-relative paths, which is exactly what `record_part` stored.
323    let raw = match dest.head(MANIFEST_FILENAME)? {
324        Some(_) => crate::pipeline::validate_manifest::read_capped(
325            &*dest,
326            MANIFEST_FILENAME,
327            crate::pipeline::validate_manifest::MANIFEST_MAX_BYTES,
328        )?,
329        None => anyhow::bail!(
330            "no manifest.json at the destination prefix — cannot record repair parts \
331             (was the original export finalized?)"
332        ),
333    };
334    let mut manifest: RunManifest = serde_json::from_slice(&raw)
335        .map_err(|e| anyhow::anyhow!("destination manifest.json is unparseable: {e}"))?;
336
337    // Unique, monotonic part_ids (ADR-0012 M4): max existing + 1, incrementing.
338    let mut next_id = manifest.parts.iter().map(|p| p.part_id).max().unwrap_or(0) + 1;
339    for p in new_parts {
340        manifest.parts.push(ManifestPart {
341            part_id: next_id,
342            path: p.path.clone(),
343            rows: p.rows,
344            size_bytes: p.size_bytes,
345            content_fingerprint: p.content_fingerprint.clone(),
346            content_md5: p.content_md5.clone(),
347            status: PartStatus::Committed,
348        });
349        next_id += 1;
350    }
351
352    // Keep the manifest self-consistent (validate's step 2 checks this): the
353    // declared aggregates must match the committed parts after the append.
354    manifest.row_count = manifest.committed_rows();
355    manifest.part_count = manifest.committed_part_count() as u32;
356    manifest.finished_at = chrono::Utc::now().to_rfc3339();
357
358    // Reuse the standard writer so atomicity / streaming-skip rules stay in one
359    // place. A repaired dataset is not a fresh clean run, so do NOT re-stamp
360    // _SUCCESS here — preserve whatever terminal status the manifest carried
361    // (the writer emits _SUCCESS only for `Success`, which the original clean
362    // run already established).
363    let bytes = serde_json::to_vec_pretty(&manifest)?;
364    let _ = ManifestStatus::Success; // (status unchanged; documented above)
365    let tmp = tempfile::NamedTempFile::new()?;
366    std::fs::write(tmp.path(), &bytes)?;
367    dest.write(tmp.path(), MANIFEST_FILENAME)?;
368    Ok(())
369}
370
/// Rewrite a repair-written part path so its file name carries the ORIGINAL
/// chunk index (L15).
///
/// The single-chunk `Precomputed` source the repair runner uses enumerates
/// from 0, so the writer always emits `..._chunk0_<nonce>.<ext>` (or a rotated
/// `..._chunk0_<nonce>_p<n>.<ext>`). This replaces that `_chunk0_` token with
/// `_chunk{original_chunk_index}_` so the name reflects the logical chunk it
/// repairs.
///
/// Only the final path component (after the last `/` or `\`) is searched, so a
/// directory/prefix segment that happens to contain `_chunk0_` is never
/// rewritten. Within the file name the **rightmost** `_chunk0_` is the chunk
/// token: everything after it is a 16-hex nonce, an optional `_p<n>` rotation
/// suffix, and the extension — none of which can contain `_chunk0_`.
///
/// Returns `None` when there is nothing to do:
/// * the chunk index is already 0 (the name is already correct); or
/// * the file name carries no `_chunk0_` token. This is defensive: an
///   unexpected name shape is left untouched rather than mangled.
fn relabel_repair_chunk_index(path: &str, original_chunk_index: i64) -> Option<String> {
    const TOKEN: &str = "_chunk0_";

    if original_chunk_index == 0 {
        return None;
    }
    // Byte offset where the file-name component starts. Both separators are
    // single-byte ASCII, so `i + 1` is always a char boundary.
    let name_start = path.rfind(['/', '\\']).map_or(0, |i| i + 1);
    let at = name_start + path[name_start..].rfind(TOKEN)?;
    Some(format!(
        "{}_chunk{}_{}",
        &path[..at],
        original_chunk_index,
        &path[at + TOKEN.len()..],
    ))
}
398
399fn emit_plan(plan: &RepairPlan, format: &RepairOutputFormat) -> Result<()> {
400    match format {
401        RepairOutputFormat::Pretty => print_plan_pretty(plan),
402        RepairOutputFormat::Json(None) => println!("{}", plan.to_json_pretty()?),
403        RepairOutputFormat::Json(Some(path)) => {
404            std::fs::write(path, plan.to_json_pretty()?)
405                .map_err(|e| anyhow::anyhow!("cannot write repair plan '{}': {}", path, e))?;
406            println!("Repair plan written to: {}", path);
407        }
408    }
409    Ok(())
410}
411
412fn emit_report(report: &RepairReport, format: &RepairOutputFormat) -> Result<()> {
413    match format {
414        RepairOutputFormat::Pretty => print_report_pretty(report),
415        RepairOutputFormat::Json(None) => println!("{}", report.to_json_pretty()?),
416        RepairOutputFormat::Json(Some(path)) => {
417            std::fs::write(path, report.to_json_pretty()?)
418                .map_err(|e| anyhow::anyhow!("cannot write repair report '{}': {}", path, e))?;
419            println!("Repair report written to: {}", path);
420        }
421    }
422    Ok(())
423}
424
425fn print_plan_pretty(plan: &RepairPlan) {
426    println!();
427    println!("  Export            : {}", plan.export_name);
428    println!("  Reconcile run     : {}", plan.reconcile_run_id);
429    println!("  Actions           : {}", plan.actions.len());
430    for a in &plan.actions {
431        println!(
432            "    • chunk {} [{}..{}] — {}",
433            a.chunk_index, a.start_key, a.end_key, a.reason
434        );
435    }
436    if !plan.skipped.is_empty() {
437        println!("  Skipped           :");
438        for s in &plan.skipped {
439            println!("    • {s}");
440        }
441    }
442    if plan.is_empty() && plan.skipped.is_empty() {
443        println!("  (nothing to repair)");
444    }
445    println!();
446}
447
448fn print_report_pretty(report: &RepairReport) {
449    println!();
450    println!("  Export       : {}", report.plan.export_name);
451    println!("  Repair run   : {}", report.repair_run_id);
452    println!(
453        "  Summary      : planned {} · executed {} · skipped {} · failed {} · rows {}",
454        report.summary.planned,
455        report.summary.executed,
456        report.summary.skipped,
457        report.summary.failed,
458        report.summary.rows_written,
459    );
460    for (a, out) in &report.results {
461        let tag = match out {
462            RepairOutcome::Executed { rows_written } => format!("executed ({rows_written} rows)"),
463            RepairOutcome::Skipped { reason } => format!("skipped ({reason})"),
464            RepairOutcome::Failed { error } => format!("failed ({error})"),
465        };
466        println!(
467            "    • chunk {} [{}..{}] — {tag}",
468            a.chunk_index, a.start_key, a.end_key
469        );
470    }
471    println!();
472}
473
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{PartitionKind, PartitionResult, ReconcileReport};

    /// File name the writer emits for a single-chunk `Precomputed` source.
    const WRITTEN_CHUNK0: &str = "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8.parquet";

    #[test]
    fn plan_from_auto_would_derive_actions_from_reconcile() {
        // Exercise the public derivation path without touching the DB: one
        // chunk whose counts agree and one whose counts differ should yield
        // exactly one repair action, for chunk 1.
        let agreeing = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 0 [1..100]".into(),
            Some(100),
            Some(100),
        );
        let differing = PartitionResult::classify(
            PartitionKind::Chunk,
            "chunk 1 [101..200]".into(),
            Some(100),
            Some(90),
        );
        let report = ReconcileReport::new(
            "orders".into(),
            "rec-1".into(),
            "chunked".into(),
            vec![agreeing, differing],
        );

        let plan = RepairPlan::from_reconcile(&report);
        assert_eq!(plan.actions.len(), 1);
        assert_eq!(plan.actions[0].chunk_index, 1);
    }

    // ── L15: repair-written filename carries the ORIGINAL chunk index ─────────

    #[test]
    fn relabel_repair_chunk_index_rewrites_chunk0_to_original() {
        // Repairing logical chunk 2 must turn the writer's `_chunk0_` into
        // `_chunk2_`.
        let Some(renamed) = relabel_repair_chunk_index(WRITTEN_CHUNK0, 2) else {
            panic!("a non-zero chunk index must produce a renamed path");
        };
        assert_eq!(
            renamed,
            "orders_20260611_120000_chunk2_a1b2c3d4e5f6a7b8.parquet"
        );
        assert!(!renamed.contains("_chunk0_"), "no chunk0 token survives");
    }

    #[test]
    fn relabel_repair_chunk_index_handles_rotated_part_suffix() {
        // A max_file_size rotation appends `_p<n>` after the nonce; the chunk
        // token is still rewritten and the rotation suffix survives intact.
        assert_eq!(
            relabel_repair_chunk_index(
                "orders_20260611_120000_chunk0_a1b2c3d4e5f6a7b8_p1.parquet",
                3
            )
            .as_deref(),
            Some("orders_20260611_120000_chunk3_a1b2c3d4e5f6a7b8_p1.parquet")
        );
    }

    #[test]
    fn relabel_repair_chunk_index_is_noop_for_chunk_zero() {
        // Chunk 0 is already named correctly, so there is nothing to move.
        assert_eq!(relabel_repair_chunk_index(WRITTEN_CHUNK0, 0), None);
    }

    #[test]
    fn relabel_repair_chunk_index_leaves_unexpected_shapes_untouched() {
        // A name without the chunk0 token is left alone rather than mangled.
        assert_eq!(
            relabel_repair_chunk_index("orders_no_chunk_token.parquet", 5),
            None
        );
    }
}