Skip to main content

rivet/pipeline/
reconcile_cmd.rs

1//! `rivet reconcile` — partition/window reconciliation (Epic F).
2//!
3//! Re-runs per-partition `COUNT(*)` against the source for a chunked export and
4//! compares the result with the stored per-chunk row counts. Produces a
5//! [`ReconcileReport`] — matches, mismatches, and repair candidates.
6//!
7//! v1 supports **chunked exports with `chunk_checkpoint: true`** (so per-chunk
8//! row counts and ranges are persisted). Other modes return a clear
9//! "not supported in v1" error — reconcile semantics for snapshot / incremental
10//! / time-window differ (see roadmap: Epic F & Epic G).
11
12use std::collections::HashMap;
13use std::path::Path;
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::plan::{
18    ExtractionStrategy, PartitionKind, PartitionResult, ReconcileReport, ReconcileSummary,
19    ResolvedRunPlan, build_plan,
20};
21use crate::source;
22use crate::state::{ChunkTaskInfo, StateStore};
23
24use super::chunked::build_chunk_query_sql;
25
/// How `rivet reconcile` should emit its [`ReconcileReport`].
///
/// Derives the usual value traits so CLI code can log, compare and copy the
/// chosen format (e.g. in argument-parsing tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutputFormat {
    /// Human-readable summary printed to stdout.
    Pretty,
    /// Pretty-printed JSON written to the given path (or stdout if `None`).
    Json(Option<String>),
}
33
34pub fn run_reconcile_command(
35    config_path: &str,
36    export_name: &str,
37    params: Option<&HashMap<String, String>>,
38    format: ReconcileOutputFormat,
39) -> Result<()> {
40    let config = Config::load_with_params(config_path, params)?;
41    let config_dir = Path::new(config_path)
42        .parent()
43        .unwrap_or_else(|| Path::new("."));
44
45    let export = config
46        .exports
47        .iter()
48        .find(|e| e.name == export_name)
49        .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", export_name))?;
50
51    let plan = build_plan(&config, export, config_dir, false, false, false, params)?;
52
53    let state_path = config_dir.join(".rivet_state.db");
54    let state = StateStore::open(state_path.to_str().unwrap_or(".rivet_state.db"))?;
55
56    let report = match &plan.strategy {
57        ExtractionStrategy::Chunked(_) => reconcile_chunked(&plan, &state, export)?,
58        ExtractionStrategy::TimeWindow { .. } => {
59            anyhow::bail!(
60                "reconcile: time-window mode is not supported in v1 (Epic F). \
61                 Convert to chunked mode with `chunk_by_days` for partition-level reconcile."
62            );
63        }
64        ExtractionStrategy::Snapshot
65        | ExtractionStrategy::Incremental(_)
66        | ExtractionStrategy::Keyset(_) => {
67            anyhow::bail!(
68                "reconcile: '{}' mode has no natural partitions — use `rivet run --reconcile` for a whole-export count check",
69                plan.strategy.mode_label()
70            );
71        }
72    };
73
74    emit_report(&report, &format)?;
75    enforce_reconcile_exit(&report.summary)
76}
77
78/// Exit-code contract for `rivet reconcile`.
79///
80/// A detected **mismatch** fails the command (non-zero exit) so an operator can
81/// gate on it — `rivet reconcile && <next step>` must not proceed when the
82/// export disagrees with the source. This mirrors `rivet validate`, which
83/// already exits non-zero on a failed verdict; before this, `reconcile` printed
84/// the mismatch but returned `Ok`, so the gate silently passed.
85///
86/// **Unknown** partitions are *not* a failure: they mean reconcile could not
87/// obtain one of the two counts — an incomplete chunk (never recorded
88/// `rows_written`) or a non-integer keyset key it cannot re-count in the source.
89/// That is "could not verify", not "verified wrong"; a keyset export is
90/// structurally all-unknown and must not read as a hard failure. They are
91/// surfaced as a warning so "0 mismatches, N unknown" is not mistaken for a
92/// clean audit.
93fn enforce_reconcile_exit(summary: &ReconcileSummary) -> Result<()> {
94    if summary.unknown > 0 {
95        log::warn!(
96            "reconcile: {} of {} partition(s) could not be verified (incomplete chunk or \
97             non-integer keyset key — no source re-count); not counted as a mismatch",
98            summary.unknown,
99            summary.total_partitions
100        );
101    }
102    if summary.mismatches > 0 {
103        anyhow::bail!(
104            "reconcile: {} of {} partition(s) disagree with the source — see the report above",
105            summary.mismatches,
106            summary.total_partitions
107        );
108    }
109    Ok(())
110}
111
112/// Run a reconcile pass against the latest chunk run and return the report.
113/// Exposed so `rivet repair --auto` can build a repair plan from a fresh reconcile
114/// without duplicating the logic.
115pub(crate) fn reconcile_chunked_fresh(
116    plan: &ResolvedRunPlan,
117    state: &StateStore,
118) -> Result<ReconcileReport> {
119    reconcile_chunked_inner(plan, state)
120}
121
122fn reconcile_chunked(
123    plan: &ResolvedRunPlan,
124    state: &StateStore,
125    _export: &ExportConfig,
126) -> Result<ReconcileReport> {
127    reconcile_chunked_inner(plan, state)
128}
129
130fn reconcile_chunked_inner(plan: &ResolvedRunPlan, state: &StateStore) -> Result<ReconcileReport> {
131    let (run_id, _plan_hash, _status, _updated) = state
132        .get_latest_chunk_run(&plan.export_name)?
133        .ok_or_else(|| {
134            anyhow::anyhow!(
135                "reconcile: no chunk run recorded for export '{}'. \
136                 Enable `chunk_checkpoint: true` and run the export first.",
137                plan.export_name
138            )
139        })?;
140
141    let tasks = state.list_chunk_tasks_for_run(&run_id)?;
142    if tasks.is_empty() {
143        anyhow::bail!(
144            "reconcile: chunk run '{}' for export '{}' has no tasks",
145            run_id,
146            plan.export_name
147        );
148    }
149
150    let mut src = source::create_source(&plan.source)?;
151    let partitions = reconcile_chunked_tasks(plan, &tasks, |chunk_query| {
152        let count_sql = format!("SELECT COUNT(*) FROM ({chunk_query}) AS _rc");
153        let raw = src.query_scalar(&count_sql)?;
154        Ok(raw.and_then(|s| s.trim().parse::<i64>().ok()))
155    })?;
156
157    let report = ReconcileReport::new(
158        plan.export_name.clone(),
159        run_id.clone(),
160        plan.strategy.mode_label().to_string(),
161        partitions,
162    );
163
164    // Epic G: a reconcile with no mismatches and no unknowns is a fresh verified boundary.
165    if report.summary.mismatches == 0 && report.summary.unknown == 0 {
166        let highest = tasks
167            .iter()
168            .filter(|t| t.status == "completed")
169            .map(|t| t.chunk_index)
170            .max();
171        if let Some(idx) = highest
172            && let Err(e) = state.record_verified_chunked(&plan.export_name, idx, &run_id)
173        {
174            log::warn!(
175                "export '{}': verified boundary update failed: {:#}",
176                plan.export_name,
177                e
178            );
179        }
180    }
181
182    Ok(report)
183}
184
185/// Pure classification of `ChunkTaskInfo`s into `PartitionResult`s.
186///
187/// Abstracts the source round-trip via `count_source` so the logic can be
188/// exercised in unit tests without a live database.
189pub(crate) fn reconcile_chunked_tasks<F>(
190    plan: &ResolvedRunPlan,
191    tasks: &[ChunkTaskInfo],
192    mut count_source: F,
193) -> Result<Vec<PartitionResult>>
194where
195    F: FnMut(&str) -> Result<Option<i64>>,
196{
197    let cp = match &plan.strategy {
198        ExtractionStrategy::Chunked(cp) => cp,
199        _ => anyhow::bail!("reconcile_chunked_tasks requires Chunked strategy"),
200    };
201
202    let mut out: Vec<PartitionResult> = Vec::with_capacity(tasks.len());
203    for t in tasks {
204        let exported = if t.status == "completed" {
205            t.rows_written
206        } else {
207            None
208        };
209
210        let (start, end) = match (t.start_key.parse::<i64>(), t.end_key.parse::<i64>()) {
211            (Ok(s), Ok(e)) => (s, e),
212            _ => {
213                // Unparseable chunk keys: keep exported count but cannot re-count source.
214                out.push(PartitionResult::classify(
215                    PartitionKind::Chunk,
216                    format!("chunk {} [{}..{}]", t.chunk_index, t.start_key, t.end_key),
217                    None,
218                    exported,
219                ));
220                continue;
221            }
222        };
223
224        let chunk_query = build_chunk_query_sql(
225            &plan.base_query,
226            &cp.column,
227            start,
228            end,
229            cp.dense,
230            cp.by_days.is_some(),
231            plan.source.source_type,
232        );
233        let source_count = count_source(&chunk_query)?;
234
235        out.push(PartitionResult::classify(
236            PartitionKind::Chunk,
237            format!("chunk {} [{}..{}]", t.chunk_index, start, end),
238            source_count,
239            exported,
240        ));
241    }
242    Ok(out)
243}
244
245fn emit_report(report: &ReconcileReport, format: &ReconcileOutputFormat) -> Result<()> {
246    match format {
247        ReconcileOutputFormat::Pretty => {
248            print_report_pretty(report);
249        }
250        ReconcileOutputFormat::Json(None) => {
251            println!("{}", report.to_json_pretty()?);
252        }
253        ReconcileOutputFormat::Json(Some(path)) => {
254            let json = report.to_json_pretty()?;
255            std::fs::write(path, &json)
256                .map_err(|e| anyhow::anyhow!("cannot write reconcile report '{}': {}", path, e))?;
257            println!("Reconcile report written to: {}", path);
258        }
259    }
260    Ok(())
261}
262
263fn print_report_pretty(report: &ReconcileReport) {
264    println!();
265    println!("  Export    : {}", report.export_name);
266    println!("  Run       : {}", report.run_id);
267    println!("  Strategy  : {}", report.strategy);
268    println!(
269        "  Partitions: {} ({} match, {} mismatch, {} unknown)",
270        report.summary.total_partitions,
271        report.summary.matches,
272        report.summary.mismatches,
273        report.summary.unknown,
274    );
275    println!(
276        "  Rows      : source {} / exported {}",
277        report.summary.total_source_rows, report.summary.total_exported_rows,
278    );
279
280    let repair = report.repair_candidates();
281    if repair.is_empty() {
282        println!("  Status    : all partitions match");
283    } else {
284        println!("  Repair candidates:");
285        for p in repair {
286            println!("    • {} — {}", p.identifier, format_status_note(p));
287        }
288    }
289    println!();
290}
291
292fn format_status_note(p: &PartitionResult) -> String {
293    let s = match (p.source_count, p.exported_count) {
294        (Some(s), Some(e)) => format!("source={s}, exported={e}"),
295        (Some(s), None) => format!("source={s}, exported=n/a"),
296        (None, Some(e)) => format!("source=n/a, exported={e}"),
297        (None, None) => "no counts".to_string(),
298    };
299    if p.note.is_empty() {
300        s
301    } else {
302        format!("{s} ({})", p.note)
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, FormatType, MetaColumns, SourceConfig,
        SourceType,
    };
    use crate::plan::{ChunkedPlan, ExtractionStrategy, PartitionStatus};
    use crate::state::ChunkTaskInfo;
    use crate::tuning::SourceTuning;

    /// Postgres source config with a placeholder URL; the tests never connect.
    fn pg_source() -> SourceConfig {
        SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        }
    }

    /// Plan for `orders`, chunked on `id` in 100-row sparse chunks with checkpointing.
    fn chunked_plan() -> ResolvedRunPlan {
        let chunking = ChunkedPlan {
            column: "id".into(),
            chunk_size: 100,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            checkpoint: true,
            max_attempts: 3,
        };
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "orders".into(),
            base_query: "SELECT * FROM orders".into(),
            strategy: ExtractionStrategy::Chunked(chunking),
            format: FormatType::Parquet,
            compression: CompressionType::Zstd,
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: pg_source(),
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Build a chunk task record; only the fields reconcile reads are parameterised.
    fn task(idx: i64, start: &str, end: &str, status: &str, rows: Option<i64>) -> ChunkTaskInfo {
        ChunkTaskInfo {
            chunk_index: idx,
            start_key: start.into(),
            end_key: end.into(),
            status: status.into(),
            rows_written: rows,
            attempts: 1,
            last_error: None,
            file_name: None,
        }
    }

    #[test]
    fn matches_and_mismatches_are_classified() {
        let plan = chunked_plan();
        let tasks = [
            task(0, "1", "100", "completed", Some(42)),
            task(1, "101", "200", "completed", Some(30)),
        ];
        // The stub answers in call order: 42 for chunk 0 (agrees with the
        // export) and 33 for chunk 1 (the export is three rows short).
        let mut answers = [42, 33].into_iter();
        let parts = reconcile_chunked_tasks(&plan, &tasks, |_sql| Ok(answers.next())).unwrap();

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].status, PartitionStatus::Match);
        let second = &parts[1];
        assert_eq!(second.status, PartitionStatus::Mismatch);
        assert_eq!(second.source_count, Some(33));
        assert_eq!(second.exported_count, Some(30));
    }

    #[test]
    fn unfinished_task_is_unknown_and_does_not_hide_source_count() {
        let parts = reconcile_chunked_tasks(
            &chunked_plan(),
            &[task(0, "1", "100", "failed", None)],
            |_sql| Ok(Some(42)),
        )
        .unwrap();

        let only = &parts[0];
        assert_eq!(only.status, PartitionStatus::Unknown);
        assert_eq!(only.source_count, Some(42));
        assert_eq!(only.exported_count, None);
    }

    #[test]
    fn unparseable_chunk_keys_are_unknown_without_source_lookup() {
        let mut source_queried = false;
        let parts = reconcile_chunked_tasks(
            &chunked_plan(),
            &[task(0, "alpha", "omega", "completed", Some(5))],
            |_sql| {
                source_queried = true;
                Ok(Some(99))
            },
        )
        .unwrap();

        assert!(
            !source_queried,
            "reconcile must skip source count for unparseable chunk keys"
        );
        let only = &parts[0];
        assert_eq!(only.status, PartitionStatus::Unknown);
        assert_eq!(only.exported_count, Some(5));
    }

    #[test]
    fn chunk_query_passes_through_chunked_math() {
        let mut seen_sql = String::new();
        reconcile_chunked_tasks(
            &chunked_plan(),
            &[task(0, "10", "20", "completed", Some(5))],
            |sql| {
                seen_sql = sql.to_owned();
                Ok(Some(5))
            },
        )
        .unwrap();

        // The re-count must use the exact range predicate extraction used (ADR-0001 shape).
        assert!(seen_sql.contains("BETWEEN 10 AND 20"), "got: {seen_sql}");
        assert!(
            seen_sql.contains("\"id\""),
            "identifier must be quoted: {seen_sql}"
        );
    }

    // ─── Exit-code contract (regression: the reconcile gate used to pass silently) ───

    /// Summary with the given tallies; row totals are irrelevant to the exit contract.
    fn summary(matches: usize, mismatches: usize, unknown: usize) -> ReconcileSummary {
        ReconcileSummary {
            total_partitions: matches + mismatches + unknown,
            matches,
            mismatches,
            unknown,
            total_source_rows: 0,
            total_exported_rows: 0,
        }
    }

    #[test]
    fn reconcile_exit_fails_on_mismatch() {
        // A detected mismatch must produce a non-zero exit so that
        // `rivet reconcile && <next>` stops on disagreeing data.
        let err = enforce_reconcile_exit(&summary(3, 1, 0)).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("disagree with the source"), "got: {err}");
    }

    #[test]
    fn reconcile_exit_passes_when_all_match() {
        let outcome = enforce_reconcile_exit(&summary(4, 0, 0));
        assert!(outcome.is_ok());
    }

    #[test]
    fn reconcile_exit_does_not_fail_on_unknown_only() {
        // Unknown means "could not verify", not "verified wrong": on its own it
        // must not fail the gate.
        let outcome = enforce_reconcile_exit(&summary(2, 0, 3));
        assert!(outcome.is_ok());
    }

    #[test]
    fn reconcile_exit_fails_when_mismatch_and_unknown_coexist() {
        // One real mismatch still fails the gate, whatever else is unverifiable.
        let outcome = enforce_reconcile_exit(&summary(0, 1, 2));
        assert!(outcome.is_err());
    }
}