Skip to main content

rivet/pipeline/
reconcile_cmd.rs

//! `rivet reconcile` — partition/window reconciliation (Epic F).
//!
//! Re-runs per-partition `COUNT(*)` against the source for a chunked export and
//! compares the result with the stored per-chunk row counts. Produces a
//! [`ReconcileReport`] — matches, mismatches, and repair candidates.
//!
//! v1 supports **chunked exports with `chunk_checkpoint: true`** (so per-chunk
//! row counts and ranges are persisted). Other modes return a clear
//! "not supported in v1" error — reconcile semantics for snapshot / incremental
//! / time-window differ (see roadmap: Epic F & Epic G).
11
12use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18    ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ReconcileSummary,
19    ResolvedRunPlan, build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// Output format for the reconcile report.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so the selected format can be
/// logged, copied into other structures, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35    config_path: &str,
36    export_name: &str,
37    params: Option<&HashMap<String, String>>,
38    format: ReconcileOutputFormat,
39) -> Result<()> {
40    let config = Config::load_with_params(config_path, params)?;
41    let config_dir = Path::new(config_path)
42        .parent()
43        .unwrap_or_else(|| Path::new("."));
44
45    let export = config
46        .exports
47        .iter()
48        .find(|e| e.name == export_name)
49        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51    let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53    let state_path = config_dir.join(".rivet_state.db");
54    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56    let report = match &plan.strategy {
57        ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58        ExtractionStrategy::TimeWindow { .. } => {
59            anyhow::bail!(
60                "reconcile: time-window mode is not supported in v1 (Epic F). \
61                 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62            );
63        }
64        ExtractionStrategy::Snapshot
65        | ExtractionStrategy::Incremental(_)
66        | ExtractionStrategy::Keyset(_) => {
67            anyhow::bail!(
68                "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69                plan.strategy.mode_label()
70            );
71        }
72    };
73
74    emit_report(&report, &format)?;
75    enforce_reconcile_exit(&report.summary)
76}
77
78/// Exit-code contract for `rivet reconcile`.
79///
80/// A detected **mismatch** fails the command (non-zero exit) so an operator can
81/// gate on it — `rivet reconcile && <next step>` must not proceed when the
82/// export disagrees with the source. This mirrors `rivet validate`, which
83/// already exits non-zero on a failed verdict; before this, `reconcile` printed
84/// the mismatch but returned `Ok`, so the gate silently passed.
85///
86/// **Unknown** partitions are *not* a failure: they mean reconcile could not
87/// obtain one of the two counts — an incomplete chunk (never recorded
88/// `rows_written`) or a non-integer keyset key it cannot re-count in the source.
89/// That is "could not verify", not "verified wrong"; a keyset export is
90/// structurally all-unknown and must not read as a hard failure. They are
91/// surfaced as a warning so "0 mismatches, N unknown" is not mistaken for a
92/// clean audit.
93fn enforce_reconcile_exit(summary: &ReconcileSummary) -> Result<()> {
94    if summary.unknown > 0 {
95        log::warn!(
96            "reconcile: {} of {} partition(s) could not be verified (incomplete chunk or \
97             non-integer keyset key — no source re-count); not counted as a mismatch",
98            summary.unknown,
99            summary.total_partitions
100        );
101    }
102    if summary.mismatches > 0 {
103        // A partition disagreeing with the source is a verified-wrong result, not
104        // a "could not verify" — it is the data-integrity class (exit 3) named in
105        // the `error::ExitClass` table. Typed marker so a scheduler branches on
106        // the code; `unknown` partitions (handled above) stay a warning, not 3.
107        return Err(crate::error::DataIntegrityError::new(format!(
108            "reconcile: {} of {} partition(s) disagree with the source — see the report above",
109            summary.mismatches, summary.total_partitions
110        ))
111        .into());
112    }
113    Ok(())
114}
115
116/// Run a reconcile pass against the latest chunk run and return the report.
117/// Exposed so `rivet repair --auto` can build a repair plan from a fresh reconcile
118/// without duplicating the logic.
119pub(crate) fn reconcile_chunked_fresh(
120    plan: &ResolvedRunPlan,
121    state: &StateStore,
122) -> Result<ReconcileReport> {
123    reconcile_chunked_inner(plan, state)
124}
125
126fn reconcile_chunked(
127    plan: &ResolvedRunPlan,
128    state: &StateStore,
129    _export: &ExportConfig,
130) -> Result<ReconcileReport> {
131    reconcile_chunked_inner(plan, state)
132}
133
134fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
135    let (run_id, _plan_hash, _status, _updated) = state
136        .get_latest_chunk_run(&plan.export_name)?
137        .ok_or_else(|| {
138            anyhow::anyhow!(
139                "reconcile: no chunk run recorded for export '{}'. \
140                 Enable `chunk_checkpoint: true` and run the export first.",
141                plan.export_name
142            )
143        })?;
144
145    let tasks = state.list_chunk_tasks_for_run(&run_id)?;
146    if tasks.is_empty() {
147        anyhow::bail!(
148            "reconcile: chunk run '{}' for export '{}' has no tasks",
149            run_id,
150            plan.export_name
151        );
152    }
153
154    let mut src = source::create_source(&plan.source)?;
155    let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
156        let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
157        let raw = src.query_scalar(&count_sql)?;
158        Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
159    })?;
160
161    let report = ReconcileReport::new(
162        plan.export_name.clone(),
163        run_id.clone(),
164        plan.strategy.mode_label().to_string(),
165        partitions,
166    );
167
168    // Epic G: a reconcile with no mismatches and no unknowns is a fresh verified boundary.
169    if report.summary.mismatches == 0 && report.summary.unknown == 0 {
170        let highest = tasks
171            .iter()
172            .filter(|t| t.status == "completed")
173            .map(|t| t.chunk_index)
174            .max();
175        if let Some(idx) = highest
176            && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
177        {
178            log::warn!(
179                "export '{}': verified boundary update failed: {:#}",
180                plan.export_name,
181                e
182            );
183        }
184    }
185
186    Ok(report)
187}
188
189/// Pure classification of `ChunkTaskInfo`s into `PartitionResult`s.
190///
191/// Abstracts the source round-trip via `count_source` so the logic can be
192/// exercised in unit tests without a live database.
193pub(crate) fn reconcile_chunked_tasks<F>(
194    plan: &ResolvedRunPlan,
195    tasks: &[ChunkTaskInfo],
196    mut count_source: F,
197) -> Result<Vec<PartitionResult>>
198where
199    F: FnMut(&str) -> Result<Option<i64>>,
200{
201    let cp = match &plan.strategy {
202        ExtractionStrategy::Chunked(cp) => cp,
203        _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
204    };
205
206    let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
207    for t in tasks {
208        let exported = if t.status == "completed" {
209            t.rows_written
210        } else {
211            None
212        };
213
214        let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
215            (Ok(s), Ok(e)) => (s, e),
216            _ => {
217                // Unparseable chunk keys: keep exported count but cannot re-count source.
218                out.push(PartitionResult::classify(
219                    PartitionKind::Chunk,
220                    format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
221                    None,
222                    exported,
223                ));
224                continue;
225            }
226        };
227
228        let chunk_query = build_chunk_query_sql(
229            &plan.base_query,
230            &cp.column,
231            start,
232            end,
233            cp.dense,
234            cp.by_days.is_some(),
235            plan.source.source_type,
236        );
237        let source_count = count_source(&chunk_query)?;
238
239        out.push(PartitionResult::classify(
240            PartitionKind::Chunk,
241            format!("chunk {} [{}..{}]", t.chunk_index, start, end),
242            source_count,
243            exported,
244        ));
245    }
246    Ok(out)
247}
248
249fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
250    match format {
251        ReconcileOutputFormat::Pretty => {
252            print_report_pretty(report);
253        }
254        ReconcileOutputFormat::Json(None) => {
255            println!("{}", report.to_json_pretty()?);
256        }
257        ReconcileOutputFormat::Json(Some(path)) => {
258            let json = report.to_json_pretty()?;
259            std::fs::write(path, &json)
260                .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
261            println!("Reconcile report written to: {}", path);
262        }
263    }
264    Ok(())
265}
266
267fn print_report_pretty(report: &ReconcileReport) {
268    println!();
269    println!("  Export    : {}", report.export_name);
270    println!("  Run       : {}", report.run_id);
271    println!("  Strategy  : {}", report.strategy);
272    println!(
273        "  Partitions: {} ({} match, {} mismatch, {} unknown)",
274        report.summary.total_partitions,
275        report.summary.matches,
276        report.summary.mismatches,
277        report.summary.unknown,
278    );
279    println!(
280        "  Rows      : source {} / exported {}",
281        report.summary.total_source_rows, report.summary.total_exported_rows,
282    );
283
284    let repair = report.repair_candidates();
285    if repair.is_empty() {
286        println!("  Status    : all partitions match");
287    } else {
288        println!("  Repair candidates:");
289        for p in repair {
290            println!("    • {} — {}", p.identifier, format_status_note(p));
291        }
292    }
293    println!();
294}
295
296fn format_status_note(p: &PartitionResult) -> String {
297    let s = match (p.source_count, p.exported_count) {
298        (Some(s), Some(e)) => format!("source={s}, exported={e}"),
299        (Some(s), None) => format!("source={s}, exported=n/a"),
300        (None, Some(e)) => format!("source=n/a, exported={e}"),
301        (None, None) => "no counts".to_string(),
302    };
303    if p.note.is_empty() {
304        s
305    } else {
306        format!("{s} ({})", p.note)
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Minimal chunked plan: `SELECT * FROM orders`, chunked on `id` with
    /// `chunk_size: 100`, `dense: false`, no `by_days`, Postgres source.
    fn chunked_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(ChunkedPlan {
                column: "id".into(),
                chunk_size: 100,
                chunk_count: None,
                parallel: 1,
                dense: false,
                by_days: None,
                checkpoint: true,
                max_attempts: 3,
            }),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Build a stored chunk task; `rows` is the persisted `rows_written`.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            attempts: 1,
            last_error: None,
            rows_written: rows,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        // Stub source: chunk 0 matches, chunk 1 undercounts on export side.
        let mut n = 0;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            n += 1;
            Ok(Some(if n == 1 { 42 } else { 33 }))
        })
        .unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Match);
        assert_eq!(parts[1].status, crate::plan::PartitionStatus::Mismatch);
        assert_eq!(parts[1].source_count, Some(33));
        assert_eq!(parts[1].exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "1", "100", "failed", None)];
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| Ok(Some(42))).unwrap();
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].source_count, Some(42));
        assert_eq!(parts[0].exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "alpha", "omega", "completed", Some(5))];
        let mut called = false;
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            called = true;
            Ok(Some(99))
        })
        .unwrap();
        assert!(
            !called,
            "reconcile must skip source count for unparseable chunk keys"
        );
        assert_eq!(parts[0].status, crate::plan::PartitionStatus::Unknown);
        assert_eq!(parts[0].exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let plan = chunked_plan();
        let tasks = vec![task(0, "10", "20", "completed", Some(5))];
        let mut captured = String::new();
        reconcile_chunked_tasks(&plan, &tasks, |q| {
            captured = q.to_string();
            Ok(Some(5))
        })
        .unwrap();
        // Must reuse the same WHERE predicate used during extraction (ADR-0001 shape).
        assert!(captured.contains("BETWEEN 10 AND 20"), "got: {captured}");
        assert!(
            captured.contains("\"id\""),
            "identifier must be quoted: {captured}"
        );
    }

    #[test]
    fn non_chunked_strategy_is_rejected() {
        // The classifier is only defined for chunked plans; any other strategy
        // must error instead of yielding an empty (vacuously clean) result.
        let mut plan = chunked_plan();
        plan.strategy = ExtractionStrategy::Snapshot;
        let err = reconcile_chunked_tasks(&plan, &[], |_q| Ok(None))
            .err()
            .expect("a non-chunked plan must be rejected");
        assert!(
            err.to_string().contains("requires Chunked strategy"),
            "got: {err}"
        );
    }

    #[test]
    fn source_count_error_aborts_reconcile() {
        // A failing source round-trip must propagate, not be reported as a
        // silent unknown, and must stop at the first failing chunk.
        let plan = chunked_plan();
        let tasks = vec![
            task(0, "1", "100", "completed", Some(1)),
            task(1, "101", "200", "completed", Some(1)),
        ];
        let mut calls = 0;
        let err = reconcile_chunked_tasks(&plan, &tasks, |_q| {
            calls += 1;
            Err(anyhow::anyhow!("source unavailable"))
        })
        .err()
        .expect("a source error must abort reconcile");
        assert_eq!(calls, 1, "must stop at the first failing source count");
        assert!(err.to_string().contains("source unavailable"), "got: {err}");
    }

    // ─── Exit-code contract (regression: reconcile gated silently passed) ──────

    /// Summary with the given partition counts; row totals are irrelevant to the exit contract.
    fn summary(matches: usize, mismatches: usize, unknown: usize) -> ReconcileSummary {
        ReconcileSummary {
            total_partitions: matches + mismatches + unknown,
            matches,
            mismatches,
            unknown,
            total_source_rows: 0,
            total_exported_rows: 0,
        }
    }

    #[test]
    fn reconcile_exit_fails_on_mismatch() {
        // The gate: a detected mismatch must be a non-zero exit so
        // `rivet reconcile && <next>` does not proceed on disagreeing data.
        // Before the fix this returned Ok and the audit silently passed.
        let err = enforce_reconcile_exit(&summary(3, 1, 0)).unwrap_err();
        assert!(
            err.to_string().contains("disagree with the source"),
            "got: {err}"
        );
        // And it carries the data-integrity class so a scheduler exits 3, not 1 —
        // the `error::ExitClass` table lists "reconcile mismatch" as exit 3.
        assert_eq!(
            crate::error::classify_exit(&err),
            3,
            "a reconcile mismatch must classify as data-integrity (exit 3)"
        );
    }

    #[test]
    fn reconcile_exit_passes_when_all_match() {
        assert!(enforce_reconcile_exit(&summary(4, 0, 0)).is_ok());
    }

    #[test]
    fn reconcile_exit_does_not_fail_on_unknown_only() {
        // Unknown = "could not verify" (incomplete chunk / non-integer keyset
        // key), not "verified wrong" — must NOT fail, else every keyset export
        // (structurally all-unknown) would error.
        assert!(enforce_reconcile_exit(&summary(2, 0, 3)).is_ok());
    }

    #[test]
    fn reconcile_exit_fails_when_mismatch_and_unknown_coexist() {
        // A real mismatch still gates even when other partitions are unverifiable.
        assert!(enforce_reconcile_exit(&summary(0, 1, 2)).is_err());
    }
}